How to Implement DLQ in Kafka Using Python

Apache Kafka is an open-source platform for storing, reading, and analyzing streaming data in real time. Businesses adopt it widely for its scalable, high-throughput infrastructure, which lets them store, analyze, and reprocess streaming data. In this article, we will cover Kafka's architecture, how to use Kafka from Python, and how to implement a dead letter queue (DLQ) in Kafka.

What is Kafka?

Kafka is among the most popular open-source frameworks for reading, storing, and analyzing streaming data in real time. Because it is open source, it is free to use and has a large community of developers who add new features and support new users.

Streaming data is data that many sources produce continuously, each sending records at the same time. A streaming platform has to ingest these simultaneous records and process them in sequence.

The following are the functions of Kafka:

  1. Publishing and subscribing to streams of records.
  2. Effectively storing streams of records in the order in which they were generated.
  3. Processing streams of records in real-time.

Why would you use Kafka?

Kafka is used to build real-time streaming data pipelines and streaming applications. A data pipeline processes and moves data from one system to another, while a streaming application consumes the stream of data received from data sources. Kafka is also used as a message broker that mediates communication between two applications.

How does Kafka work?

Kafka combines two messaging models, queuing and publish-subscribe, and gives consumers the key benefits of each. The queuing model distributes data processing across consumer instances, which makes it highly scalable. However, traditional queues do not support multiple subscribers. The publish-subscribe model supports multiple subscribers, but it cannot distribute work across multiple worker processes because every message goes to every subscriber.

To combine these two models, Kafka uses a partitioned log model. A log is an ordered sequence of records. Each log is divided into partitions, and the partitions are shared out among the subscribers in a consumer group. Thus, Kafka scales out while allowing multiple applications to read the same data streams independently, each at its own pace.
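The partitioned log idea can be illustrated with a small in-memory toy. This is only a sketch and not how Kafka is implemented: the class name PartitionedLog, the CRC32-based partition choice, and the group names are assumptions made for illustration (Kafka's own partitioner works differently).

```python
import zlib
from collections import defaultdict


class PartitionedLog:
    """Toy in-memory model of a partitioned log (illustration only)."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]
        # One read position per (group, partition), so that each
        # consumer group advances independently of the others.
        self.offsets = defaultdict(int)

    def append(self, key, value):
        # Records with the same key always land in the same partition,
        # which preserves their relative order.
        partition = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        self.partitions[partition].append(value)
        return partition

    def poll(self, group, partition):
        """Return the records of one partition that this group has not read yet."""
        start = self.offsets[(group, partition)]
        records = self.partitions[partition][start:]
        self.offsets[(group, partition)] = len(self.partitions[partition])
        return records
```

Two groups polling the same partition each receive every record, which mirrors publish-subscribe, while the partitions themselves are the unit that would be divided among the workers of a single group, which mirrors queuing.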

Figure: Queuing model (image source: AWS)

Figure: Publish-subscribe model (image source: AWS)

Benefits of Kafka

There are three major advantages of using Kafka:

1. Scalability: Kafka's partitioned log model distributes data across multiple servers, so it can scale beyond the capacity of a single server.

2. Speed: Kafka decouples data streams, which results in low latency.

3. Durability: Partitions are distributed across many servers and all data is written to disk. This protects against server failure and makes Kafka durable and fault tolerant.

Understanding DLQ in Kafka

A dead letter queue (DLQ) is a queue that receives messages that could not be delivered to their intended destination. It is also called an undelivered-message queue: it holds messages that were not delivered, for example because the destination queue does not exist or is full. In traditional message-queue systems, every queue manager in the network typically has a dead letter queue that holds such messages for later retrieval.

In those systems, queue managers, message channel agents (MCAs), and applications can all put undeliverable messages on the DLQ. If a queue manager has no dead letter queue defined, an MCA cannot put the message anywhere and leaves it on the transmission queue, which stops the channel. Moreover, if a non-persistent message cannot be delivered and the target system has no dead letter queue, the message is discarded. In Kafka, a DLQ is usually implemented as a separate topic to which a consumer forwards the messages it cannot process, which is the approach used later in this article.

Implementing Kafka using Python

Several client libraries can publish messages to Kafka topics and consume messages from them. Python, one of the most popular languages and a common glue language, is a natural choice for working with Kafka.

In Python, this can be done with kafka-python, a client library designed to work much like the Java client.

Using kafka-python

Install kafka-python with the ‘pip’ command:

pip install kafka-python

Implementation of Kafka Consumer:

from kafka import KafkaConsumer
import json
# To consume from techexplained-topic
consumer = KafkaConsumer('techexplained-topic',
                         group_id='myGroup', enable_auto_commit=False,
                         bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
    print(message.topic)
    print(message.partition)
    print(message.offset)
    print(message.key)

In the above code, the property ‘group_id’ specifies which consumer group the consumer belongs to. The property ‘enable_auto_commit’ is set to ‘False’ so that offsets are not committed automatically; the application has to commit them itself.
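With auto-commit disabled, one possible pattern is to commit after each record has been handled. The sketch below assumes a kafka-python KafkaConsumer is passed in and uses its commit() method; the function name consume_with_manual_commit and the handle callback are hypothetical names chosen for illustration.

```python
def consume_with_manual_commit(consumer, handle):
    """Process each record, then commit its offset.

    `consumer` is assumed to be a kafka-python KafkaConsumer created with
    enable_auto_commit=False; `handle` is a hypothetical callback.
    """
    processed = 0
    for message in consumer:
        handle(message)
        # Commit only after the handler returned without raising, so a
        # crash during processing leads to redelivery rather than loss.
        consumer.commit()
        processed += 1
    return processed
```

Committing after every record is simple but costs a broker round trip each time; committing every N records is a common trade-off.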

Implementation of Kafka Producer:

import json
import logging
from kafka import KafkaProducer
log = logging.getLogger(__name__)
# create a producer. broker is running on localhost
# values are serialized to JSON bytes before they are sent
producer = KafkaProducer(retries=5, bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# define the on success function
def on_success(record):
    print(record.topic)
    print(record.partition)
    print(record.offset)
# define the on error callback function
def on_error(excp):
    # log the failure instead of raising from inside the callback
    log.error('Failed to send message', exc_info=excp)
# send the message to techexplained-topic
producer.send('techexplained-topic', {'key': 'value'}).add_callback(on_success).add_errback(on_error)
# block until all async messages are sent
producer.flush()

Implementing confluent-kafka using Python client

Confluent has developed a Python client called ‘confluent-kafka-python’ for Apache Kafka. It provides a high-level Producer, Consumer, and AdminClient compatible with all Kafka brokers >= v0.8.

The following are the steps for its implementation:

Python client installation

For Python 3, run the following command in a terminal:

pip install confluent-kafka

Create a project using a virtual environment

a. Create a new directory anywhere using the following command:

mkdir kafka-python-getting-started && cd kafka-python-getting-started

b. Create and activate the virtual environment:

virtualenv env

source env/bin/activate

Initialize Kafka Producer

This can be initialized in the following way:

from confluent_kafka import Producer
import socket

conf = {'bootstrap.servers': "host1:9092,host2:9092", 'client.id': socket.gethostname()}

producer = Producer(conf)

Asynchronous writes: The ‘produce()’ method sends a message to Kafka asynchronously. You pass in the message value and, optionally, a key, a partition, and a callback. The call returns immediately and does not return a value. A BufferError is raised if the local produce queue is full and the message cannot be added to it; other failures raise a KafkaException.

The following code represents the call for the produce() method:

producer.produce(topic, key="key", value="value")
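A full local queue can be handled by serving pending delivery events and trying again. The following is a minimal sketch of that idea, assuming a confluent-kafka Producer is passed in; the helper name produce_with_backpressure and the max_attempts parameter are hypothetical.

```python
def produce_with_backpressure(producer, topic, value, key=None, max_attempts=3):
    """Try to enqueue a message, draining the local queue when it is full.

    `producer` is assumed to be a confluent_kafka.Producer.
    Returns True once the message has been enqueued, False otherwise.
    """
    for _ in range(max_attempts):
        try:
            producer.produce(topic, key=key, value=value)
            return True
        except BufferError:
            # The local queue is full: poll() serves delivery callbacks,
            # which frees space for the next attempt.
            producer.poll(1)
    return False
```

A True result only means the message was accepted into the local queue; delivery to the broker is still reported through the callback.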

A callback parameter can be passed to the produce() function to receive notification of each message's delivery success or failure. The produce() method immediately adds the message to the queue for batching, compression, and transmission to the broker.

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

producer.produce(topic, key="key", value="value", callback=acked)

# Wait up to 1 second for events. Callbacks will be invoked during
# this method call if the message is acknowledged.
producer.poll(1)

Synchronous writes: The confluent-kafka Python client provides a method called flush() which can be used to make writes synchronous. This is typically inefficient because throughput becomes limited by the broker round-trip time, but it can be useful in a few cases.

The following code illustrates the implementation of this method:

producer.produce(topic, key="key", value="value")
producer.flush()
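When writing synchronously, it can help to bound the wait and check the result. As a rough sketch: flush() accepts a timeout and returns the number of messages still waiting in the queue. The helper name flush_or_fail and the 10-second default are assumptions for illustration.

```python
def flush_or_fail(producer, timeout=10.0):
    """Wait for outstanding messages and fail loudly if some remain.

    `producer` is assumed to be a confluent_kafka.Producer.
    """
    remaining = producer.flush(timeout)
    if remaining > 0:
        raise RuntimeError(f"{remaining} message(s) were not delivered in time")
    return remaining
```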

Initialize Kafka Consumer

This is initialized as follows:

from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'host1:9092,host2:9092', 'group.id': "foo", 'enable.auto.commit': False, 'auto.offset.reset': 'earliest'}

consumer = Consumer(conf)
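Once the consumer exists, it still has to subscribe to a topic and poll for messages. The sketch below shows one polling step with a manual commit, which matches the 'enable.auto.commit': False setting above. The function name poll_once and the handle callback are hypothetical, and the consumer is assumed to have already called subscribe() with a list of topic names.

```python
def poll_once(consumer, handle, timeout=1.0):
    """Poll for a single message, process it, and commit its offset.

    `consumer` is assumed to be a subscribed confluent_kafka.Consumer.
    Returns True if a message was processed, False if none arrived.
    """
    msg = consumer.poll(timeout)
    if msg is None:
        return False  # nothing arrived within the timeout
    if msg.error():
        raise RuntimeError(f"Consumer error: {msg.error()}")
    handle(msg.value())
    # Commit synchronously, only after the message was handled.
    consumer.commit(message=msg, asynchronous=False)
    return True
```

In an application this would typically run in a loop, with consumer.close() called on shutdown.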


Python implementation for dead letter queue

Figure: DLQ implementation in Python

To understand the dead letter queue implementation in Python, we will use the kafka-python library to create Kafka producer and consumer clients and connect them to a Kafka cluster.

Let’s discuss the implementation step by step:

Step 1:

It is important to include all the required libraries or packages in the program. After importing the required libraries, we define the bootstrap servers along with the DLQ topic name and primary topic name. This will help create instances of Kafka Producer and Consumer.

The following code illustrates how to define bootstrap servers, DLQ, and primary topic name:

from kafka import KafkaProducer, KafkaConsumer
import json
bootstrap_servers = ['localhost:9093']
primary_topic = 'primary-topic-name-text'
dlq_topic = 'dlq-topic-name-text'

Step 2:

In this step, we will create a Producer for the DLQ topic which will be used to send the malformed messages.

The following code illustrates how to define a Producer with the given DLQ topic:

dlq_producer = KafkaProducer(
   bootstrap_servers=bootstrap_servers,
   value_serializer=lambda x: x.encode('utf-8'),
   acks='all'
)

Step 3:

Here, we will create the consumer, a KafkaConsumer instance that consumes the messages from the primary topic.

consumer = KafkaConsumer(
   primary_topic,
   bootstrap_servers=bootstrap_servers,
   auto_offset_reset='latest',
   enable_auto_commit=True,
   value_deserializer=lambda x: x.decode('utf-8')
)

Step 4:

Now, we will define a try and except block which will check if the received message is in JSON format or not. If it is not, the message will be routed to the DLQ topic defined above.

The code below illustrates its implementation:

for msg in consumer:
    print(f'\nReceived:\nPartition: {msg.partition} \tOffset: {msg.offset}\tValue: {msg.value}')
    try:
        data = json.loads(msg.value)
        print('Data Received:', data)
    except json.JSONDecodeError:
        print(f'Value {msg.value} not in JSON format')
        dlq_producer.send(dlq_topic, value=msg.value)
        print('Message sent to DLQ Topic')
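Because send() in kafka-python is asynchronous, the loop above reports success before the broker has confirmed the write. One way to make the DLQ write explicit is to wait on the future that send() returns. This is a minimal sketch: the helper name send_to_dlq and the 10-second timeout are assumptions, and the producer is assumed to be a kafka-python KafkaProducer such as dlq_producer.

```python
def send_to_dlq(dlq_producer, dlq_topic, value, timeout=10):
    """Send a value to the DLQ topic and wait for the broker's confirmation.

    `dlq_producer` is assumed to be a kafka-python KafkaProducer.
    Returns (topic, partition, offset) of the stored record.
    """
    future = dlq_producer.send(dlq_topic, value=value)
    # get() blocks until the send succeeds, and raises if it fails
    # or the timeout expires.
    metadata = future.get(timeout=timeout)
    return metadata.topic, metadata.partition, metadata.offset
```

Waiting on every message lowers throughput, so this fits best when messages reach the DLQ only occasionally.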


Kafka DLQ best practices

Figure: Error handling using DLQ in Kafka

Identifying and handling errors is crucial for a reliable and smooth data streaming pipeline. In this section, we will explore best practices for identifying and handling errors using a DLQ in Kafka.

1. Reprocess: Some messages in the DLQ have to be reprocessed. One solution is an automated script. Another is human intervention: an operator edits the message, or the producer is asked to resend it after receiving an error.

2. Drop bad messages (after further analysis): Depending on the setup, bad messages can be expected in a data streaming pipeline. Before dropping them, examine them through a business process. For example, an app with a dashboard can visualize the error messages.

3. Advanced analytics: Rather than inspecting and processing each message in the DLQ individually, we can apply analytics to the incoming data in the pipeline to get real-time insights.

For example, an SQL-based application can compute aggregates over the incoming messages, such as the average number of messages produced, and these insights can guide decisions about the errors in the DLQ.

4. Stop the workflow: If we know that bad messages can occur in an application, we can write an automation script that stops the workflow when one appears. Alternatively, the developer can decide to stop the workflow manually.

5. Ignore: Here the developer does nothing and simply lets the DLQ fill up. This is not the preferred option, but it can be useful in a few cases, for instance to monitor the overall behavior of the DLQ. Because a Kafka topic has a retention time, messages are removed from the DLQ once the retention period has passed.
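The automated reprocessing mentioned in the first item could look roughly like the sketch below. It works on plain iterables and callables so that it stays independent of any client library. The names reprocess_dlq, repair, send, and repair_single_quotes are hypothetical, and the single-quote repair is only an example of a fix, not a general one.

```python
import json


def repair_single_quotes(raw):
    """Example repair: treat single-quoted pseudo-JSON as JSON."""
    try:
        return json.loads(raw.replace("'", '"'))
    except json.JSONDecodeError:
        return None  # cannot be repaired automatically


def reprocess_dlq(dlq_records, repair, send):
    """Repair DLQ records and resend the ones that could be fixed.

    dlq_records: iterable of raw string values read from the DLQ topic
    repair:      function returning the fixed object, or None
    send:        function that publishes a JSON string to the primary topic
    """
    resent, dropped = 0, 0
    for raw in dlq_records:
        fixed = repair(raw)
        if fixed is None:
            dropped += 1  # left for manual inspection
            continue
        send(json.dumps(fixed))
        resent += 1
    return resent, dropped
```

In practice, dlq_records would come from a consumer subscribed to the DLQ topic and send would wrap a producer for the primary topic.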

Here are a few more practices to handle errors in DLQ in Kafka:

1. Define a business process for dealing with invalid messages. The infrastructure team and the data owners can be alerted that the data was bad and that they need to resend it from the system of record. Invalid messages can also simply be ignored at the point where they first enter the application, which saves time and money and reduces the load on the network.

2. Use a dashboard with a proper alert system and email integration for quick notification.

3. Avoid pushing retryable messages to the DLQ. Retry them instead, and send only non-retryable messages to the DLQ.

4. Keep the original message and store it in the DLQ along with the following additional information, which makes debugging and reprocessing easier:

a. The error message

b. The time of the error

c. The name of the application where the error occurred

5. Decide how many DLQ topics are required, since a single DLQ may not be sufficient in most cases.
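Practices 3 and 4 can be combined in a small helper that retries transient failures and wraps anything sent to the DLQ in an envelope carrying the original message, the error, the time, and the application name. This is a sketch under stated assumptions: RetryableError, build_dlq_record, process_with_retry, publish_dlq, and the envelope field names are all hypothetical, and sending a message to the DLQ once its retries are exhausted is a design choice rather than a rule.

```python
import json
import time
from datetime import datetime, timezone


class RetryableError(Exception):
    """Hypothetical marker for transient failures (e.g. a timeout)."""


def build_dlq_record(original, error, app_name):
    """Wrap the original message with debugging metadata as a JSON string."""
    return json.dumps({
        "original_message": original,
        "error": str(error),
        "failed_at": datetime.now(timezone.utc).isoformat(),
        "application": app_name,
    })


def process_with_retry(value, handler, publish_dlq, app_name,
                       max_attempts=3, backoff_s=1.0):
    """Run handler(value); retry transient errors, dead-letter the rest.

    Returns True if the handler succeeded, False if the message was
    sent to the DLQ through publish_dlq.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            handler(value)
            return True
        except RetryableError as exc:
            if attempt == max_attempts:
                # Retries exhausted: give up and dead-letter the message.
                publish_dlq(build_dlq_record(value, exc, app_name))
                return False
            time.sleep(backoff_s)
        except Exception as exc:
            # Non-retryable failure: dead-letter immediately.
            publish_dlq(build_dlq_record(value, exc, app_name))
            return False
    return False
```

Here publish_dlq could be a thin wrapper around a producer for the DLQ topic.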

In this article, we covered concepts related to Apache Kafka and its implementation in Python. We discussed the concept of DLQ in Kafka wherein undelivered messages are added to the dead letter queue. We also explored the best practices involved in dealing with these undelivered messages. Kafka has many advantages, including its highly scalable nature and high throughput. Owing to these benefits, it is widely adopted by organizations in their software development lifecycle.

Author

    Turing

    Author is a seasoned writer with a reputation for crafting highly engaging, well-researched, and useful content that is widely read by many of today's skilled programmers and developers.

Frequently Asked Questions

Kafka can be used from Python with confluent-kafka, PyKafka, and kafka-python.

The key features of Kafka are low latency, seamless messaging functionality, and high scalability.

Kafka with more than one broker is called a Kafka cluster. A cluster can be used without downtime.
