Leverage Turing Intelligence capabilities to integrate AI into your operations, enhance automation, and optimize cloud migration for scalable impact.
Advance foundation model research and improve LLM reasoning, coding, and multimodal capabilities with Turing AGI Advancement.
Access a global network of elite AI professionals through Turing Jobs—vetted experts ready to accelerate your AI initiatives.
Apache Kafka is a valuable open-source software that is used to store, read, and evaluate streaming data in real-time. It is widely adopted by businesses owing to its scalable and high-throughput infrastructure which can be used to store, analyze, and reprocess streaming data. In this article, we will cover dead letter queue or DLQ in Kafka, the architecture of Kafka, and a lot more.
Kafka is among the most popular open-source frameworks capable of reading, storing, and analyzing streaming data in real-time. Since it is open-source, it is free of cost and has many developers who contribute towards adding new features and providing support to new users.
Streaming data can be defined as the data produced by many data sources with the data record sent simultaneously. The job of the streaming platform is crucial as it has to handle the incoming data simultaneously and process it sequentially.
The following are the functions of Kafka:
Kafka is used to build real-time streaming data pipelines and streaming applications. A data pipeline is responsible for processing and moving data from one system to another, while a streaming pipeline can be defined as an application that consumes the stream of data received from data sources. However, Kafka is also used as a message broker, helping communicate and mediate between two applications.
Kafka is a combination of two messaging models: queuing and publish-subscribe. It provides the key benefits of each of these models to consumers. Queuing model provides the data processing to be distributed across consumer instances to make it highly scalable. However, the problem with traditional queues is that they don’t allow multi-subscribers. The publish-subscriber is a multi-subscriber model but it can’t be used to distribute work across multiple worker processes as every message goes to every subscriber.
To combine these two solutions, Kafka uses a partition log model. A log is the ordered sequence of records. They are divided into partitions corresponding to different subscribers. Thus, Kafka allows scalability and multiple applications to fetch data from the data streams and work independently at their own pace.
Image source: AWS
Image source: AWS
There are three major advantages of using Kafka:
1. Scalability: Kafka uses a log partition model that distributes the data across multiple servers which makes it highly scalable compared to a single server.
2. Fast: It is extremely fast as it decouples the data streams which results in lower latency.
3. Durability: It distributes partitions across many servers and all the data is written to disks. Thus, it protects against system failure which makes it durable and fault tolerant.
The dead letter queue is defined as a queue to which messages are sent if they fail to reach the correct destination. It is also referred to as an undelivered message holding messages - that are not delivered to the destination - in the form of a queue because the destination queue either doesn't exist or is full. Every manager in the network architecture has a dead letter queue that is responsible for holding messages that weren't delivered at the destination and can be stored for later retrieval.
Queue managers, message channel agents (MCAs), and applications are responsible for putting messages that fail to be delivered to DLQ. If one does not define the dead letter queue for each manager, MCAs are unable to put a message and it is left on the transmission queue, resulting in the breakdown of the channel. Moreover, if non-persistent messages cannot be delivered and the dead letter queue is missing on the target system, the messages are discarded.
There are many API libraries that can be used to publish messages to Kafka topics and consume messages from Kafka topics. Python, being one of the most popular languages - and a glue programming language at that - can be used to implement Kafka.
Implementation using Python script can be done with the help of kafka-python which is designed to work similarly to a Java client.
Installing Kafka using Python script with ‘pip’ command:
pip install kafka-python
Implementation of Kafka Consumer:
To consume from techexplained-topic consumer = KafkaConsumer('techexplained-topic', group_id='myGroup', enable_auto_commit=False, bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii'))) for message in consumer: print (message.topic) print (message.partition) print (message.offset) print(message.key)
In the above code, the property ‘group_id’ is essential as it specifies which consumer group the consumer is a member of. The property ‘enable_auto_commit’ is set to ‘False’ to ensure that it doesn’t commit offsets automatically.
Implementation of Kafka Producer:
from kafka import KafkaProducer from kafka.errors import KafkaError # create a producer. broker is running on localhost producer = KafkaProducer(retries=5, bootstrap_servers=['localhost:9092']) # define the on success function def on_success(record): print(record.topic) print(record.partition) print(record.offset) # define the on error callback function def on_error(excp): log.error(excp) raise Exception(excp) # send the message to techexplained-topic producer.send('techexplained-topic', {'key': 'value'}).add_callback(on_success).add_errback(on_error) # block until all async messages are sent producer.flush()
Confluent has developed a Python client called ‘confluent-kafka-python’ for Apache Kafka. It provides a high-level Producer, Consumer, and AdminClient compatible with all Kafka brokers >= v0.8.
The following are the steps for its implementation:
For Python version greater than 3x, run the following command in cmd:
pip install confluent-kafka
a. Create a new directory anywhere using the following command:
mkdir kafka-python-getting-started && cd kafka-python-getting-started
b. Activate the virtual environment:
virtualenv envsource env/bin/activate
This can be initialized in the following way:
from confluent_kafka import Producer import socketconf = {'bootstrap.servers': "host1:9092,host2:9092", 'client.id': socket.gethostname()}
producer = Producer(conf)
Asynchronous writes: The ‘produce()’ method is called to initiate a message to Kafka by passing in the message value. Additionally, we can also pass a key, partition, and call back. The produce call will be completed immediately and will not return a value. An exception called 'Kafka exception' will be thrown if the local produce queue is full and the message cannot be added to the queue.
The following code represents the call for the produce() method:
producer.produce(topic, key="key", value="value")
The call back parameter can be passed in the produce() function to receive notification about the messages’ delivery success or failure. The messages are immediately included in the queue by the produce() method for batching compression and transmission to the broker.
def acked (err, msg): if err is not None: print("Failed to deliver message: %s: %s" % (str(msg), str(err))) else: print("Message produced: %s" % (str(msg)))producer.produce(topic, key="key", value="value", callback=acked)
Wait up to 1 second for events. Callbacks will be invoked during
this method call if the message is acknowledged.
producer.poll(1)
Synchronous writes: The confluent-kafka's Python client provides a method called flash() which is widely used to make synchronous writes. It is not effective as it limits the throughput of the broker round trip time but can be useful in a few cases.
The following code illustrates the implementation of this method:
producer.produce(topic, key="key", value="value") producer.flush()
This is initialized as follows:
from confluent_kafka import Consumerconf = {'bootstrap.servers': 'host1:9092,host2:9092', 'group.id': "foo", 'enable.auto.commit': False, 'auto.offset.reset': 'earliest'}
consumer = Consumer(conf)
To understand the dead letter queue implementation in Python, we will use the kafka-python library to create Kafka Producer and Consumer Clients and connect to Kafka Cluster.
Let’s discuss the implementation step by step:
Step 1:
It is important to include all the required libraries or packages in the program. After importing the required libraries, we define the bootstrap servers along with the DLQ topic name and primary topic name. This will help create instances of Kafka Producer and Consumer.
The following code illustrates how to define bootstrap servers, DLQ, and primary topic name:
from kafka import KafkaProducer, KafkaConsumer import json bootstrap_servers = ['localhost:9093'] primary_topic = 'primary-topic-name-text' dlq_topic = 'dlq-topic-name-text'
Step 2:
In this step, we will create a Producer for the DLQ topic which will be used to send the malformed messages.
The following code illustrates how to define a Producer with the given DLQ topic:
dlq_producer = KafkaProducer( bootstrap_servers=bootstrap_servers, value_serializer=lambda x: x.encode('utf-8'), acks='all' )
Step 3:
Here, we will define the consumer() method which plays an important role in consuming the messages from the primary topic.
consumer = KafkaConsumer( primary_topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='latest', enable_auto_commit=True, value_deserializer=lambda x: x.decode('utf-8') )
Step 4:
Now, we will define a try and except block which will check if the received message is in JSON format or not. If it is not, the message will be routed to the DLQ topic defined above.
The code below illustrates its implementation:
for msg in consumer: print(f'\nReceived:\nPartition: {msg.partition} \tOffset: {msg.offset}\tValue: {msg.value}')try: data = json.loads(msg.value) print('Data Received:', data)
except: print(f'Value {msg.value} not in JSON format') dlq_producer.send(dlq_topic, value=msg.value) print('Message sent to DLQ Topic')
Identification and handling of errors is crucial for a reliable and smooth data streaming pipeline. In this section, we will explore the best practices for recognizing and handling errors before they occur using DLQ in Kafka.
1. Reprocess: There are some messages in DLQ in Kafka that have to be reprocessed. One solution is to develop an automatic script. This entails human intervention to edit the message or directly asking the producer to resend the message after returning an error.
2. Drop bad messages (after further analysis): In a data streaming pipeline, there is a possibility for bad messages to appear depending upon the setup. Before excluding them, it is important to examine them through a business process. For example, you can consider an app with a dashboard to visualize error messages.
3. Advanced analytics: Rather than identifying and processing each message in DLQ in Kafka, we can use advanced analytical methods to analyze the incoming data in data streaming pipelines to get real-time insights.
For example, consider an SQL database application that can process incoming messages in a data streaming pipeline, such as the average number of messages produced or any other analytical insights, which can be used to decide on the errors in the DLQ in Kafka applications.
4. Stop the workflow: If we know that bad messages will occur in an application, we can stop the workflow by writing an automation script. The developer can also make the decision to stop the workflow.
5. Ignore: In this method, the developer need not do anything but let the DLQ in Kafka fill up completely. However, it is not a preferred option though it can be useful in a few use cases. For instance, it can be used to monitor the overall behavior of DLQ in Kafka such that the Kafka topic has a retention time. The removal of messages takes place after this retention period.
Here are a few more practices to handle errors in DLQ in Kafka:
1. We can define a business process that can deal with invalid messages. The infrastructure team and the data owners can be notified via alerts about invalid messages. They can be notified that the data was bad and that they need to resend the data from the system of record. Messages can also be ignored during the initial application. It can save time and money and reduces the load on the network.
2. Use a development dashboard with a proper alert system and integration via email for quick notification.
3. Avoid pushing retryable messages to DLQ in Kafka and try retryable messages only.
4. Keep the original messages and store them in DLQ in Kafka along with the following additional information. This will make it easier to debug and reprocess.
a. Message occurred during error
b. Time of the error
c. Name of the application where the error occurred.
5. Identify the number of DLQs in Kafka required since a single DLQ may not be useful in most cases.
In this article, we covered concepts related to Apache Kafka and its implementation in Python. We discussed the concept of DLQ in Kafka wherein undelivered messages are added to the dead letter queue. We also explored the best practices involved in dealing with these undelivered messages. Kafka has many advantages, including its highly scalable nature and high throughput. Owing to these benefits, it is widely adopted by organizations in their software development lifecycle.
Author is a seasoned writer with a reputation for crafting highly engaging, well-researched, and useful content that is widely read by many of today's skilled programmers and developers.