Getting started with Apache Kafka and Python

28/12/2020
In this lesson, we will see how we can use Apache Kafka with Python and make a sample application using the Python client for Apache Kafka.

To complete this lesson, you need a running Kafka installation on your machine; the examples below connect to a broker at localhost:9092. Read Install Apache Kafka on Ubuntu to learn how to set one up.

Installing Python client for Apache Kafka

Before we can start working with Apache Kafka in a Python program, we need to install the Python client for Apache Kafka. We can do this with pip, which installs packages from the Python Package Index (PyPI). Here is the command:

pip3 install kafka-python

The installation only takes a moment in the terminal:

[Figure: installing the Python Kafka client with pip]
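Note that the package is installed under the name kafka-python but imported in code as kafka. If you want to confirm the installation from Python itself, a check along these lines should work. It is only a sketch: it assumes Python 3.8 or newer, where importlib.metadata is part of the standard library, and kafka_python_version is our own hypothetical helper name, not part of the client.

```python
# Sketch: report which kafka-python version is installed (Python 3.8+).
# kafka_python_version is a hypothetical helper, not part of kafka-python.
from importlib.metadata import PackageNotFoundError, version


def kafka_python_version():
    """Return the installed kafka-python version string, or None if it is missing."""
    try:
        # Look the distribution up by its pip name, "kafka-python".
        return version("kafka-python")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    installed = kafka_python_version()
    if installed is None:
        print("kafka-python is not installed; run: pip3 install kafka-python")
    else:
        print("kafka-python", installed, "is installed")
```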

Now that we have an active installation for Apache Kafka and we have also installed the Python Kafka client, we’re ready to start coding.

Making a Producer

To publish messages on Kafka, we first need a producer: an application that sends messages to Kafka topics.

Note that the Kafka producer is asynchronous: send() does not wait for the broker. It adds the message to an internal buffer and returns immediately, and a background thread delivers the buffered messages to the topic partitions. To keep things simple, we will write a JSON publisher for this lesson.

To start, create a KafkaProducer instance:

from kafka import KafkaProducer
import json
import pprint

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

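The bootstrap_servers parameter gives the host and port of the Kafka broker that the client contacts first to discover the cluster. The value_serializer parameter is a function that converts each message value to bytes before it is sent; here it turns the value into a JSON string and encodes it as UTF-8.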
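To see what the value_serializer lambda does on its own, here is the same logic as a named function, together with the inverse step a consumer could apply. This is a minimal sketch: the names json_serializer and json_deserializer are our own, and it only needs the standard json module.

```python
import json


def json_serializer(value):
    """Same logic as the lambda above: Python object -> UTF-8 encoded JSON bytes."""
    return json.dumps(value).encode("utf-8")


def json_deserializer(raw):
    """Inverse step for the consumer side: UTF-8 JSON bytes -> Python object."""
    return json.loads(raw.decode("utf-8"))


payload = {"topic": "kafka"}
encoded = json_serializer(payload)
print(encoded)                     # b'{"topic": "kafka"}'
print(json_deserializer(encoded))  # {'topic': 'kafka'}
```

Kafka stores message values as bytes, which is why the serializer must return bytes rather than a string. kafka-python accepts plain functions like these, so you could pass value_serializer=json_serializer to KafkaProducer and value_deserializer=json_deserializer to KafkaConsumer.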

To try out the producer, let's print the metrics it collects about itself and its connections to the Kafka cluster:

metrics = producer.metrics()
pprint.pprint(metrics)

We will see output similar to the following:

[Figure: Kafka producer metrics output]
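The full dictionary is long. In kafka-python, metrics() returns by default a nested dictionary keyed first by metric group and then by metric name, so one way to pull out a single metric is a small lookup like the sketch below. The helper name find_metric and the sample values are made up for illustration; the groups and names actually reported depend on your client version, so check the printed output for the ones available to you.

```python
def find_metric(metrics, name):
    """Return {group: value} for every metric group that reports `name`.

    `metrics` is assumed to have the shape {group: {metric_name: value}},
    as returned by producer.metrics() in kafka-python.
    """
    return {group: values[name]
            for group, values in metrics.items()
            if name in values}


# Made-up sample data in the assumed shape; with a live producer you would
# pass producer.metrics() instead.
sample = {
    "producer-metrics": {"record-send-rate": 0.5, "request-rate": 1.2},
    "producer-topic-metrics.linuxhint": {"record-send-rate": 0.5},
}
print(find_metric(sample, "record-send-rate"))
# {'producer-metrics': 0.5, 'producer-topic-metrics.linuxhint': 0.5}
```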

Now, let's finally send a message to a Kafka topic. A simple JSON object makes a good example:

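producer.send('linuxhint', {'topic': 'kafka'})
producer.flush()  # block until the buffered message has actually been sent

Here, linuxhint is the name of the topic the JSON object is sent to; the producer decides which partition of that topic receives it. Because send() only buffers the message, the flush() call makes the script wait until the message has been delivered before it exits. If the topic does not exist yet, a broker running with the default auto.create.topics.enable=true setting creates it on first use. When you run the script, you won't see any output, because the message is simply written to the topic. It's time to write a consumer so that we can test our application.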
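If you want the producer script to confirm delivery instead of staying silent, you can block on the future that send() returns. In kafka-python, calling get() on that future waits for the broker's acknowledgement and returns record metadata that includes the topic, partition and offset, or raises an exception if the send failed or timed out. A minimal sketch, where send_and_confirm is our own hypothetical helper name:

```python
def send_and_confirm(producer, topic, value, timeout=10.0):
    """Send one record and wait up to `timeout` seconds for the broker to acknowledge it.

    Returns (topic, partition, offset) from the record metadata; get() raises
    an exception if the send fails or times out.
    """
    future = producer.send(topic, value)    # returns immediately
    metadata = future.get(timeout=timeout)  # blocks until acknowledged
    return metadata.topic, metadata.partition, metadata.offset


# Usage with the producer created earlier (needs a running broker):
# topic, partition, offset = send_and_confirm(producer, 'linuxhint', {'topic': 'kafka'})
# print(f"Delivered to {topic}[{partition}] at offset {offset}")
```

Waiting on every message this way gives up the throughput benefit of asynchronous sends, so it suits small scripts and tests better than high-volume producers.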

Making a Consumer

Now we're ready to write a consumer application that connects to Kafka and reads the messages from the topic. Start by creating a KafkaConsumer instance:

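from kafka import KafkaConsumer
from kafka import TopicPartition

print('Making connection.')
# auto_offset_reset='earliest' starts from the oldest stored message when no
# committed offset exists; the default ('latest') would skip messages that
# were published before the consumer started.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')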

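Now, assign a topic partition to this consumer. A TopicPartition names a topic and one of its partitions. A topic created automatically with the default broker settings has a single partition, numbered 0, so that is the partition we read from:

print('Assigning Topic.')
consumer.assign([TopicPartition('linuxhint', 0)])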
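Assigning a partition does not by itself choose an offset. After assign(), kafka-python's KafkaConsumer lets you move the read position of an assigned partition with seek(partition, offset), or jump to the oldest stored message with seek_to_beginning(). A minimal sketch, where start_at is a hypothetical helper that takes an already created consumer and TopicPartition:

```python
def start_at(consumer, partition, offset=None):
    """Assign a single partition and choose where reading starts.

    `partition` is a TopicPartition; if `offset` is None, reading starts at
    the oldest message still stored in that partition.
    """
    consumer.assign([partition])
    if offset is None:
        consumer.seek_to_beginning(partition)
    else:
        consumer.seek(partition, offset)


# Usage with the consumer from this lesson (needs a running broker):
# start_at(consumer, TopicPartition('linuxhint', 0), offset=5)
```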

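Finally, we're ready to print the messages:

print('Getting message.')
for message in consumer:
    # Each message is a ConsumerRecord with named fields such as offset and value
    # (message[0] would be the topic name, not the offset).
    print(f"OFFSET: {message.offset}\t MSG: {message.value.decode('utf-8')}")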

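This prints each message stored in the assigned topic partition. The loop does not end on its own: it keeps waiting for new messages until you stop the script, for example with Ctrl+C. The output for this program will be:

[Figure: Kafka consumer output]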
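Because the loop never finishes by itself, a test script may prefer to read a fixed number of messages and stop. A minimal sketch, with take_values as our own hypothetical helper name; it works with any iterable of records that have a bytes value attribute holding JSON, such as the consumer above. kafka-python also accepts a consumer_timeout_ms argument on KafkaConsumer, which ends the iteration after that many milliseconds without a new message.

```python
import itertools
import json


def take_values(records, limit):
    """Decode the JSON value of at most `limit` records, then stop iterating."""
    # islice stops pulling from the iterator once `limit` records are taken.
    return [json.loads(record.value.decode("utf-8"))
            for record in itertools.islice(records, limit)]


# Usage with the consumer from this lesson (needs a running broker):
# print(take_values(consumer, limit=10))
```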

For quick reference, here is the complete producer script:

from kafka import KafkaProducer
import json
import pprint

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

producer.send('linuxhint', {'topic': 'kafka'})
producer.flush()  # wait until the message is sent before the script exits

# metrics = producer.metrics()
# pprint.pprint(metrics)

And here is the complete Consumer program we used:

from kafka import KafkaConsumer
from kafka import TopicPartition

print('Making connection.')
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')

print('Assigning Topic.')
consumer.assign([TopicPartition('linuxhint', 0)])

print('Getting message.')
for message in consumer:
    print(f"OFFSET: {message.offset}\t MSG: {message.value.decode('utf-8')}")

Conclusion

In this lesson, we looked at how to install the Python client for Apache Kafka and use it in our Python programs. We showed how to publish JSON messages to a topic with a producer and read them back with a consumer using the kafka-python client.
