Kafka Experiments

Hey everyone,

Here is a small blog post about the Kafka experiments and “foundations” I’ve started working on. I’ll also take the opportunity to thank Maxence S. (https://twitter.com/maxenceschmitt) for his precious help on this topic.

Instantiating the Jungle!

ZooKeeper and Kafka

First off, I instantiated a Kafka (and ZooKeeper) instance with the following docker-compose.yml:

version: '2'

services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    expose:
    - "2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

Then I basically only had to apply some docker-compose-fu and… the job was pretty much done.

✗ docker-compose up kafka zookeeper
Starting zookeeper_1 ... done
Starting kafka_1     ... done
Attaching to zookeeper_1, kafka_1
zookeeper_1  | JMX enabled by default
kafka_1      | Excluding KAFKA_HOME from broker config
kafka_1      | [Configuring] 'port' in '/opt/kafka/config/server.properties'
zookeeper_1  | Using config: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
kafka_1      | [Configuring] 'advertised.listeners' in '/opt/kafka/config/server.properties'
kafka_1      | [Configuring] 'broker.id' in '/opt/kafka/config/server.properties'
kafka_1      | Excluding KAFKA_VERSION from broker config
kafka_1      | [Configuring] 'listeners' in '/opt/kafka/config/server.properties'
kafka_1      | [Configuring] 'zookeeper.connect' in '/opt/kafka/config/server.properties'
kafka_1      | [Configuring] 'log.dirs' in '/opt/kafka/config/server.properties'
zookeeper_1  | 2020-02-05 19:48:38,585 [myid:] - INFO  [main:QuorumPeerConfig@103] - Reading configuration from: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
zookeeper_1  | 2020-02-05 19:48:38,597 [myid:] - INFO  [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
zookeeper_1  | 2020-02-05 19:48:38,598 [myid:] - INFO  [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
zookeeper_1  | 2020-02-05 19:48:38,601 [myid:] - WARN  [main:QuorumPeerMain@113] - Either no config or no quorum defined in config, running  in standalone mode
zookeeper_1  | 2020-02-05 19:48:38,609 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started.
zookeeper_1  | 2020-02-05 19:48:38,649 [myid:] - INFO  [main:QuorumPeerConfig@103] - Reading configuration from: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
zookeeper_1  | 2020-02-05 19:48:38,651 [myid:] - INFO  [main:ZooKeeperServerMain@95] - Starting server
zookeeper_1  | 2020-02-05 19:48:38,661 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed.
[... truncated ...]
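
Before writing any Python, a quick sanity check is to ask the broker for its topic list with kafka-python. This little snippet is my own sketch (not part of the original setup); localhost:9092 matches the KAFKA_ADVERTISED_LISTENERS value configured above:

from kafka import KafkaConsumer

# Connect without subscribing to anything, purely to probe the broker
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print(consumer.topics())  # set of existing topic names
consumer.close()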

Producing the message…

The next part was simple: I just wanted to create messages programmatically, and I chose Python to give it a shot. I used the kafka-python library, which made it pretty straightforward.

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='0.0.0.0:9092')
# send() is asynchronous and only queues the records
producer.send('test', b'Hello, World!')
producer.send('test', key=b'message-two', value=b'This is Kafka-Python')
# flush() blocks until every queued record has actually been delivered
producer.flush()
print('We are done sending messages.')

This will basically send two messages to a specific topic called test:

  • “Hello, World!”
  • and “This is Kafka-Python”

The line producer.flush() is really important: send() is asynchronous, so without it the messages may never actually be sent… and you will wait a lot. Desperately (trust me :)).
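
Under the hood, send() returns a future you can block on if you want to confirm delivery of a single record. Here is a small sketch of that, using the same kafka-python API:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='0.0.0.0:9092')

# send() only queues the record and immediately returns a future
future = producer.send('test', b'Hello, World!')

# Block until the broker acknowledges this one record;
# flush() does the same for everything still queued
record_metadata = future.get(timeout=10)
print(record_metadata.topic, record_metadata.partition, record_metadata.offset)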

…and consuming it!

To consume them, the principle is pretty much the same, using the same Python library as in the producer example.

The code snippet is as follows:

from kafka import KafkaConsumer

consumer = KafkaConsumer('test', auto_offset_reset='earliest',
                         enable_auto_commit=True, group_id='booya',
                         bootstrap_servers='0.0.0.0:9092')
for message in consumer:
    print(message)  # blocks forever, printing each record as it arrives

It will basically wait for messages to be sent on the test topic. The other arguments are:

  • auto_offset_reset='earliest' starts from the earliest message the group hasn’t consumed (yet!)
  • enable_auto_commit=True commits the processing of message(s) automatically (a manual-commit variant is sketched right after this list).
  • group_id='booya' is the consumer group name you want to give (e.g. parser, consumer, extractor, …)
  • bootstrap_servers='0.0.0.0:9092' is basically the Kafka server it wants to communicate with.
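
And, as promised, if you want tighter control over when a message counts as processed, here is a sketch of the same consumer with auto-commit disabled (my own variant, not from the original setup):

from kafka import KafkaConsumer

consumer = KafkaConsumer('test', auto_offset_reset='earliest',
                         enable_auto_commit=False,  # we commit ourselves
                         group_id='booya', bootstrap_servers='0.0.0.0:9092')
for message in consumer:
    print(message.value)
    consumer.commit()  # mark everything consumed so far as processed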

Dockerizing consumers and producers

This was one of the easiest parts, done with a simple Dockerfile as follows:

FROM python:3.7
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
CMD ["python", "producer.py"]

The only tweak I had to make for the consumer was to pass the -u flag to python, which disables output buffering so the logs actually show up through the docker-compose command (setting the PYTHONUNBUFFERED environment variable would do the same):

FROM python:3.7
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
CMD ["python", "-u", "consumer.py"]

Combining them all!

My final docker-compose.yml looked like this:

version: '2'

services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    expose:
    - "2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  producer:
    build: ./producer/
    network_mode: host
  consumer:
    build: ./consumer/
    network_mode: host

Since the broker advertises itself as localhost:9092 (the KAFKA_ADVERTISED_LISTENERS value), the producer and consumer have to reach it at that exact address. Specifying network_mode: host puts both containers on the host network, so localhost inside them points at the machine that actually exposes port 9092.

… And testing it!

We already started the kafka and zookeeper containers. Let’s now start our consumer like this:

docker-compose up --build consumer
Building consumer
Step 1/5 : FROM python:3.7
 ---> 894300ec3929
Step 2/5 : COPY . /app
 ---> e0f72c35d0cc
Step 3/5 : WORKDIR /app
 ---> Running in c81e60950ca5
[... truncated ...]
Step 5/5 : CMD ["python", "-u", "consumer.py"]
 ---> Running in 6933bac99474
Removing intermediate container 6933bac99474
 ---> 69a663dc20fd
Successfully built 69a663dc20fd
Successfully tagged consumer:latest
Recreating consumer_1 ... done
Attaching to consumer_1

Our container will now loop indefinitely, displaying (a.k.a. processing) messages as they arrive.

We can now start the producer one:

✗ docker-compose up --build producer
Building producer
Step 1/5 : FROM python:3.7
 ---> 894300ec3929
Step 2/5 : COPY . /app
 ---> 726ccd261c92
Step 3/5 : WORKDIR /app
 ---> Running in a59f0fedaf46
 [... truncated ...]
Successfully tagged producer:latest
Recreating producer_1 ... done
Attaching to producer_1
producer_1   | We are done sending messages.
producer_1 exited with code 0

And… if you go back to your consumer window, magic should have happened(!):

Attaching to consumer_1
consumer_1   | <kafka.consumer.group.KafkaConsumer object at 0x7fb2f88a70d0>
consumer_1   | ConsumerRecord(topic='test', partition=0, offset=36, timestamp=1580931711739, timestamp_type=0, key=None, value=b'Hello, World!', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=13, serialized_header_size=-1)
consumer_1   | ConsumerRecord(topic='test', partition=0, offset=37, timestamp=1580931711739, timestamp_type=0, key=b'message-two', value=b'This is Kafka-Python', headers=[], checksum=None, serialized_key_size=11, serialized_value_size=20, serialized_header_size=-1)
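
Each ConsumerRecord carries the raw payload plus its metadata. If you only care about the text itself, a variation on the consumer loop (again, my own sketch) could unpack it like this:

from kafka import KafkaConsumer

consumer = KafkaConsumer('test', group_id='booya',
                         bootstrap_servers='0.0.0.0:9092')
for message in consumer:
    # key and value are raw bytes unless you pass a deserializer
    print(f"{message.topic}[{message.partition}]@{message.offset}: "
          f"{message.value.decode('utf-8')}")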

Conclusion

This was a pretty simple introduction to Kafka and a way to start doing some processing with it. Thanks again, Maxence, for your precious advice! I added a couple of external links for documentation and, obviously, the source code if you want to start experimenting.

If you’d like to play around, I created a repository with all those files, which will be handier than copying/pasting the code.

Code is available at https://github.com/PaulSec/kafka-boilerplate-docker

The kafka-python library documentation is available here: https://kafka-python.readthedocs.io/en/master/usage.html

“Deploying Kafka broker with Docker” was also a precious resource: https://www.kaaproject.org/kafka-docker