Committing offsets is important to ensure that a consumer can restart from the correct position after a failure or restart. Without committing offsets, the consumer might reprocess messages, causing duplicates, or miss messages, leading to data loss.
Kafka brokers use an internal, compacted topic called __consumer_offsets to keep track of the last committed offset for each consumer group. Because the topic is compacted, only the most recent offset metadata is retained for each consumer group, topic, and partition, ensuring efficient storage and quick access.
The __consumer_offsets topic stores data as key-value pairs. The key includes details about the consumer group, topic, and partition, while the value contains the offset and metadata.
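As a rough mental model, one such record can be sketched as a plain object. This is illustrative only: the real topic stores a compact binary format, and the field names below are assumptions for explanation, not the actual schema.

```javascript
// Illustrative sketch of an __consumer_offsets record.
// NOTE: field names are assumptions; the real topic uses a compact
// binary format, not JSON.
const offsetRecord = {
  // The key identifies WHICH consumer position this is
  key: { group: 'consumer-group-1', topic: 'topic-A', partition: 0 },
  // The value records WHERE that consumer will resume after a restart
  value: { offset: 42, metadata: '', commitTimestamp: 1700000000000 }
};

console.log(offsetRecord.key);
console.log(offsetRecord.value.offset);
```

Because compaction keeps only the latest record per key, a restarting consumer in the same group always sees a single, current offset per partition.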
Kafka provides several ways to commit consumer offsets:
Auto Commit
Kafka automatically commits the offsets of messages consumed by a consumer at regular intervals. By default, this interval is set to 5000 milliseconds (5 seconds).
To enable auto commit, configure the consumer with the following properties:
enable.auto.commit: true
auto.commit.interval.ms: 5000 // default: 5000 ms (5 seconds)
It is simple and easy to implement. However, if the consumer crashes before the next auto commit interval, messages that were processed but not yet committed may be reprocessed, leading to potential duplicates.
Example: Set up a Kafka consumer with auto commit enabled in Node.js using the kafkajs library.
// install kafkajs using 'npm install kafkajs'
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'test-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'consumer-group-1' });

const run = async () => {
  // Connect the consumer
  await consumer.connect();
  // Subscribe to the topic, reading from the beginning
  await consumer.subscribe({ topic: 'topic-A', fromBeginning: true });
  // Run the consumer; note that in kafkajs the auto-commit options
  // are passed to run(), not to kafka.consumer()
  await consumer.run({
    autoCommit: true,
    autoCommitInterval: 5000, // commit every 5 seconds
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        offset: message.offset,
        key: message.key && message.key.toString(), // key may be null
        value: message.value.toString()
      });
    },
  });
};

run().catch(console.error);
Manual Commit
Manual commit allows the consumer to commit offsets explicitly after processing messages. This gives greater control over when offsets are committed, reducing the risk of data loss or duplicate processing.
To enable manual commit, configure the consumer with the following property:
enable.auto.commit: false
Manual commit can be done either synchronously or asynchronously.
Synchronous Manual Commit
In this process, the consumer waits for the commit to complete before processing the next batch of messages, ensuring offsets are securely stored in Kafka before proceeding.
This can be implemented using the commitSync() method in Java, or by awaiting the commitOffsets promise in kafkajs.
Example: Set up a Kafka consumer with synchronous manual commit in Node.js using the kafkajs library.
// install kafkajs using 'npm install kafkajs'
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'test-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'consumer-group-1' });

const run = async () => {
  // Connect the consumer
  await consumer.connect();
  // Subscribe to the topic, reading from the beginning
  await consumer.subscribe({ topic: 'topic-A', fromBeginning: true });
  // Run the consumer; in kafkajs, auto commit is disabled via run()
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        offset: message.offset,
        key: message.key && message.key.toString(), // key may be null
        value: message.value.toString()
      });
      // Synchronous commit: wait for the commit to complete before
      // moving on (commit the NEXT offset to read, i.e. current + 1)
      await consumer.commitOffsets([
        { topic, partition, offset: (parseInt(message.offset, 10) + 1).toString() }
      ]);
    },
  });
};

run().catch(console.error);
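Note that the example commits message.offset + 1: by Kafka convention, the committed offset is the position of the next message to consume, not the last one processed. A tiny helper (hypothetical, not part of kafkajs) makes the arithmetic explicit:

```javascript
// The offset you commit is the NEXT offset to read (last processed + 1).
// kafkajs exposes offsets as strings, so parse and re-stringify.
function nextOffsetToCommit(lastProcessedOffset) {
  return (parseInt(lastProcessedOffset, 10) + 1).toString();
}

console.log(nextOffsetToCommit('41')); // '42'
```

After committing '42', a restarting consumer in the same group resumes at offset 42 rather than reprocessing offset 41.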
Asynchronous Manual Commit
In this process, the consumer doesn't wait for the commit to finish before processing the next batch of messages. This improves performance and throughput by removing the latency of waiting for each commit to complete. However, if the process crashes before an in-flight commit completes, more messages may be reprocessed than with synchronous commits.
This can be implemented using the commitAsync() method in Java, or by not awaiting the commitOffsets promise in kafkajs.
Example: Set up a Kafka consumer with asynchronous manual commit in Node.js using the kafkajs library.
// install kafkajs using 'npm install kafkajs'
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'test-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'consumer-group-1' });

const run = async () => {
  // Connect the consumer
  await consumer.connect();
  // Subscribe to the topic, reading from the beginning
  await consumer.subscribe({ topic: 'topic-A', fromBeginning: true });
  // Run the consumer; in kafkajs, auto commit is disabled via run()
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        offset: message.offset,
        key: message.key && message.key.toString(), // key may be null
        value: message.value.toString()
      });
      // Asynchronous commit: do not await the commit; catch errors so a
      // failed commit doesn't become an unhandled promise rejection
      consumer.commitOffsets([
        { topic, partition, offset: (parseInt(message.offset, 10) + 1).toString() }
      ]).catch(console.error);
    },
  });
};

run().catch(console.error);
Apache Kafka for Developers Journey:
- Apache Kafka for Developers #1: Introduction to Kafka and Comparison with RabbitMQ
- Apache Kafka for Developers #2: Kafka Architecture and Components
- Apache Kafka for Developers #3: Kafka Topic Replication
- Apache Kafka for Developers #4: Kafka Producer and Acknowledgements
- Apache Kafka for Developers #5: Kafka Consumer and Consumer Group
- Apache Kafka for Developers #6: Kafka Consumer Partition Rebalancing
- Apache Kafka for Developers #7: Kafka Consumer Commit Offset