Hey everyone,
Here is a small blog post regarding Kafka experiments and “foundations” I’ve started working on. I will also take the opportunity to thank Maxence S. (https://twitter.com/maxenceschmitt) for his precious help on this topic.
Instantiating the Jungle!
ZooKeeper and Kafka
First off, I instantiated a Kafka (and ZooKeeper) instance with the following docker-compose.yml:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    expose:
      - "2181"
  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
Then, I basically only had to apply some docker-compose-fu and... the job was pretty much done.
✗ docker-compose up kafka zookeeper
Starting zookeeper_1 ... done
Starting kafka_1 ... done
Attaching to zookeeper_1, kafka_1
zookeeper_1 | JMX enabled by default
kafka_1 | Excluding KAFKA_HOME from broker config
kafka_1 | [Configuring] 'port' in '/opt/kafka/config/server.properties'
zookeeper_1 | Using config: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
kafka_1 | [Configuring] 'advertised.listeners' in '/opt/kafka/config/server.properties'
kafka_1 | [Configuring] 'broker.id' in '/opt/kafka/config/server.properties'
kafka_1 | Excluding KAFKA_VERSION from broker config
kafka_1 | [Configuring] 'listeners' in '/opt/kafka/config/server.properties'
kafka_1 | [Configuring] 'zookeeper.connect' in '/opt/kafka/config/server.properties'
kafka_1 | [Configuring] 'log.dirs' in '/opt/kafka/config/server.properties'
zookeeper_1 | 2020-02-05 19:48:38,585 [myid:] - INFO [main:QuorumPeerConfig@103] - Reading configuration from: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
zookeeper_1 | 2020-02-05 19:48:38,597 [myid:] - INFO [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
zookeeper_1 | 2020-02-05 19:48:38,598 [myid:] - INFO [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
zookeeper_1 | 2020-02-05 19:48:38,601 [myid:] - WARN [main:QuorumPeerMain@113] - Either no config or no quorum defined in config, running in standalone mode
zookeeper_1 | 2020-02-05 19:48:38,609 [myid:] - INFO [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started.
zookeeper_1 | 2020-02-05 19:48:38,649 [myid:] - INFO [main:QuorumPeerConfig@103] - Reading configuration from: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
zookeeper_1 | 2020-02-05 19:48:38,651 [myid:] - INFO [main:ZooKeeperServerMain@95] - Starting server
zookeeper_1 | 2020-02-05 19:48:38,661 [myid:] - INFO [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed.
[... truncated ...]
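Before moving on, a quick way to check the broker is actually reachable from the host is a tiny kafka-python snippet (a sketch, assuming kafka-python is already installed locally):

from kafka import KafkaConsumer

# connect to the broker advertised on localhost:9092 and list known topics
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print(consumer.topics())
consumer.close()

If this returns (even an empty set) instead of hanging, the listeners are configured correctly.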
Producing the message…
The next part was simple: I just wanted to produce messages programmatically, and I decided to give Python a shot. I used the kafka-python
library, and it was pretty straightforward.
from kafka import KafkaProducer

# connect to the broker exposed on the host
producer = KafkaProducer(bootstrap_servers='0.0.0.0:9092')
# a value-only message, then a keyed one, both on the 'test' topic
producer.send('test', b'Hello, World!')
producer.send('test', key=b'message-two', value=b'This is Kafka-Python')
# block until every buffered message has actually been delivered
producer.flush()
print('We are done sending messages.')
This will basically send two messages on a specific topic called test
which are:
- “Hello, World!”
- and “This is Kafka-Python”
The line producer.flush()
is quite important: send() is asynchronous, so without it buffered messages may never actually go out... and you will wait a lot. Desperately (trust me :)).
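Side note: if you want per-message delivery confirmation instead of one final flush(), send() returns a future you can block on; a minimal sketch:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='0.0.0.0:9092')
# send() is asynchronous; block on the returned future to confirm delivery
future = producer.send('test', b'Hello, World!')
metadata = future.get(timeout=10)  # raises KafkaError on failure
print(metadata.topic, metadata.partition, metadata.offset)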
…and consuming it!
In order to consume them, the principle is pretty similar, using the same Python library as in the producer example.
The code snippet is as follows:
from kafka import KafkaConsumer

# subscribe to the 'test' topic and join the 'booya' consumer group
consumer = KafkaConsumer('test', auto_offset_reset='earliest', enable_auto_commit=True, group_id='booya', bootstrap_servers='0.0.0.0:9092')
# block forever, printing each record as it arrives
for message in consumer:
    print(message)
It will basically wait for messages to be sent on the test
topic.
The other arguments are:
- auto_offset_reset='earliest': starts from the earliest message it didn’t consume (yet!)
- enable_auto_commit=True: commits the processing of message(s) automatically
- group_id='booya': the group name you want to give (e.g. parser, consumer, extractor, …)
- bootstrap_servers='0.0.0.0:9092': basically the Kafka server it wants to communicate with
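As an aside, if your messages are structured rather than raw bytes, kafka-python lets you plug serializers into both ends; a minimal sketch using JSON (reusing the test topic from above):

import json
from kafka import KafkaProducer, KafkaConsumer

# producer side: turn dicts into JSON bytes automatically
producer = KafkaProducer(
    bootstrap_servers='0.0.0.0:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
producer.send('test', {'greeting': 'Hello, World!'})
producer.flush()

# consumer side: turn JSON bytes back into dicts
consumer = KafkaConsumer(
    'test',
    bootstrap_servers='0.0.0.0:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
)
for message in consumer:
    print(message.value)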
Dockerizing consumers and producers
This was one of the easiest parts, done with a simple Dockerfile
as follows:
FROM python:3.7
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
CMD ["python", "producer.py"]
The only tweak I had to make for the consumer
one was to pass an extra -u flag to python
(unbuffered output), so that logs show up right away through the docker-compose
command:
FROM python:3.7
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
CMD ["python", "-u", "consumer.py"]
Combining them all!
My final docker-compose.yml
looked like this:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    expose:
      - "2181"
  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  producer:
    build: ./producer/
    network_mode: host
  consumer:
    build: ./consumer/
    network_mode: host
Just to make sure the producer and consumer containers could reach Kafka (which is advertised on localhost:9092), I had to specify network_mode: host
so that they share the host's network.
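For reference, an alternative I didn't use here would be to drop network_mode: host, keep everything on the default compose network, and advertise the kafka service name instead (the bootstrap_servers in both Python scripts would then become 'kafka:9092'); a sketch showing only the fields that change:

kafka:
  environment:
    # advertise the compose service name so other containers can resolve it
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
producer:
  build: ./producer/
  depends_on:
    - kafka
consumer:
  build: ./consumer/
  depends_on:
    - kafka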
… And testing it!
We already started the kafka
and zookeeper
containers.
Let's now start the consumer
one like this:
docker-compose up --build consumer
Building consumer
Step 1/5 : FROM python:3.7
---> 894300ec3929
Step 2/5 : COPY . /app
---> e0f72c35d0cc
Step 3/5 : WORKDIR /app
---> Running in c81e60950ca5
[... truncated ...]
Step 5/5 : CMD ["python", "-u", "consumer.py"]
---> Running in 6933bac99474
Removing intermediate container 6933bac99474
---> 69a663dc20fd
Successfully built 69a663dc20fd
Successfully tagged consumer:latest
Recreating consumer_1 ... done
Attaching to consumer_1
Our container will now wait indefinitely, displaying (aka processing) messages as they arrive.
We can now start the producer
one:
✗ docker-compose up --build producer
Building producer
Step 1/5 : FROM python:3.7
---> 894300ec3929
Step 2/5 : COPY . /app
---> 726ccd261c92
Step 3/5 : WORKDIR /app
---> Running in a59f0fedaf46
[... truncated ...]
Successfully tagged producer:latest
Recreating producer_1 ... done
Attaching to producer_1
producer_1 | We are done sending messages.
producer_1 exited with code 0
And… if you go back to your consumer
window, magic should have happened (!):
Attaching to consumer_1
consumer_1 | <kafka.consumer.group.KafkaConsumer object at 0x7fb2f88a70d0>
consumer_1 | ConsumerRecord(topic='test', partition=0, offset=36, timestamp=1580931711739, timestamp_type=0, key=None, value=b'Hello, World!', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=13, serialized_header_size=-1)
consumer_1 | ConsumerRecord(topic='test', partition=0, offset=37, timestamp=1580931711739, timestamp_type=0, key=b'message-two', value=b'This is Kafka-Python', headers=[], checksum=None, serialized_key_size=11, serialized_value_size=20, serialized_header_size=-1)
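Each ConsumerRecord is a named tuple exposing topic, partition, offset, key and value; if you only care about the payloads, the loop can decode them directly:

for message in consumer:
    # value is raw bytes; decode it for display
    print(f"{message.offset}: {message.value.decode('utf-8')}")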
Conclusion
This was a pretty simple introduction to Kafka and to starting some processing with it. Again, thanks Maxence for your precious advice! I added a couple of external links for documentation and, obviously, the source code if you want to start experimenting.
External Links
If you’d like to play along, I created a repository with all those files, which will be handier than copying/pasting the code.
The code is available at https://github.com/PaulSec/kafka-boilerplate-docker
The kafka-python library documentation is available here: https://kafka-python.readthedocs.io/en/master/usage.html
Deploying Kafka broker with Docker was also a precious resource: https://www.kaaproject.org/kafka-docker