Apache Kafka for Developers #7: Kafka Consumer Commit Offset

Committing offsets ensures that a consumer can resume from the correct position after a failure or restart. Without committed offsets, the consumer might reprocess messages, causing duplicates, or skip messages, leading to data loss.

Kafka brokers use an internal topic called __consumer_offsets to keep track of the last committed offset for each consumer group, per topic and partition. Because this topic is log-compacted, it retains only the most recent offset record for each group, topic, and partition, ensuring efficient storage and quick access.

The __consumer_offsets topic stores data as key-value pairs. The key includes details about the consumer group, topic, and partition, while the value contains the offset and metadata.
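For illustration, the committed offsets that end up in __consumer_offsets can be read back through the kafkajs admin client. The sketch below assumes a local broker, a consumer group named consumer-group-1, and a topic topic-A for which the group has already committed offsets; it uses the topics option from kafkajs 2.x (older versions take a single topic option instead).

// install kafkajs using 'npm install kafkajs'

// Inspect the offsets a consumer group has committed (sketch, assumptions noted above)
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
    clientId: 'offset-inspector',
    brokers: ['localhost:9092']
});

const admin = kafka.admin();

const run = async () => {
    await admin.connect();

    // Returns the committed offsets stored for the group, e.g.
    // [{ topic: 'topic-A', partitions: [{ partition: 0, offset: '42', metadata: null }] }]
    const offsets = await admin.fetchOffsets({
        groupId: 'consumer-group-1',
        topics: ['topic-A']
    });

    console.log(JSON.stringify(offsets, null, 2));

    await admin.disconnect();
};

run().catch(console.error);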


Kafka provides several ways to commit consumer offsets:

Auto Commit

Kafka automatically commits the offsets of messages consumed by a consumer at regular intervals. By default, this interval is set to 5000 milliseconds (5 seconds). 

To enable auto commit, configure the consumer with the following properties:
enable.auto.commit: true
auto.commit.interval.ms: 5000  // default: 5000 milliseconds (5 seconds)

It is simple and easy to implement. However, if the consumer crashes before the next auto-commit interval, messages that were processed but not yet committed may be reprocessed, leading to duplicates.

Example: Set up a Kafka consumer with auto commit enabled in Node.js using the kafkajs library. In kafkajs, auto commit is controlled by the autoCommit and autoCommitInterval options passed to consumer.run().

// install kafkajs using  'npm install kafkajs'

// Create a consumer that reads from the beginning of the topic
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
    clientId: 'test-app',
    brokers: ['localhost:9092']
});

const consumer = kafka.consumer({
    groupId: 'consumer-group-1'
});

const run = async () => {
    // Connecting the consumer
    await consumer.connect();

    // Subscribing to the topic
    await consumer.subscribe({ topic: 'topic-A', fromBeginning: true });

    // Running the consumer with auto commit enabled
    await consumer.run({
        autoCommit: true,            // commit offsets automatically
        autoCommitInterval: 5000,    // every 5000 ms (5 seconds)
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                offset: message.offset, key: message.key.toString(), value: message.value.toString()
            });
        },
    });
};

run().catch(console.error);


Manual Commit

Manual commit lets the consumer commit offsets explicitly after processing messages. This provides greater control over when offsets are committed, reducing the risk of data loss or duplicate message processing.

To enable manual commit, configure the consumer with the following property:
enable.auto.commit: false

Manual Commit can be done either synchronously or asynchronously.

Synchronous Manual Commit

In this process, the consumer waits for the commit to complete before processing the next batch of messages, ensuring offsets are securely stored in Kafka before proceeding.

This can be implemented using the commitSync() method in the Java client, or by awaiting the commitOffsets() promise in kafkajs.

Example: Set up a Kafka consumer with synchronous manual commit in Node.js using the kafkajs library. In kafkajs, auto commit is disabled by passing autoCommit: false to consumer.run(), and offsets are then committed explicitly with consumer.commitOffsets().

// install kafkajs using  'npm install kafkajs'


// Create a consumer that reads from the beginning of the topic
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
    clientId: 'test-app',
    brokers: ['localhost:9092']
});

const consumer = kafka.consumer({
    groupId: 'consumer-group-1'
});

const run = async () => {
    // Connecting the consumer
    await consumer.connect();

    // Subscribing to the topic
    await consumer.subscribe({ topic: 'topic-A', fromBeginning: true });

    // Running the consumer with auto commit disabled
    await consumer.run({
        autoCommit: false,   // offsets are committed manually below
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                offset: message.offset, key: message.key.toString(), value: message.value.toString()
            });

            // Synchronous commit: Waits for the commit to complete            
            await consumer.commitOffsets([
                { topic, partition, offset: (parseInt(message.offset, 10) + 1).toString() }
            ]);
        },
    });
};

run().catch(console.error);


Asynchronous Manual Commit

In this process, the consumer doesn't wait for the commit to finish before processing the next messages. This improves performance and throughput by removing the latency of waiting for each commit operation. However, if a commit fails or the consumer crashes before a commit completes, messages may be reprocessed, so it is slightly less safe than synchronous commits.

This can be implemented using the commitAsync() method in the Java client, or by not awaiting the commitOffsets() promise in kafkajs.

Example: Set up a Kafka consumer with asynchronous manual commit in Node.js using the kafkajs library.

// install kafkajs using  'npm install kafkajs'


// Create a consumer that reads from the beginning of the topic
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
    clientId: 'test-app',
    brokers: ['localhost:9092']
});

const consumer = kafka.consumer({
    groupId: 'consumer-group-1'
});

const run = async () => {
    // Connecting the consumer
    await consumer.connect();

    // Subscribing to the topic
    await consumer.subscribe({ topic: 'topic-A', fromBeginning: true });

    // Running the consumer with auto commit disabled
    await consumer.run({
        autoCommit: false,   // offsets are committed manually below
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                offset: message.offset, key: message.key.toString(), value: message.value.toString()
            });

            // Asynchronous commit: the promise is not awaited, so processing continues immediately
            consumer.commitOffsets([
                { topic, partition, offset: (parseInt(message.offset, 10) + 1).toString() }
            ]);
        },
    });
};

run().catch(console.error);
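
Because the commitOffsets() promise is not awaited, a failed commit would otherwise surface as an unhandled promise rejection. A small variation of the commit call above attaches an error handler (same topic, partition, and message variables as in the eachMessage handler):

// Fire-and-forget commit with an error handler, so a failed commit is logged
// instead of becoming an unhandled promise rejection
consumer
    .commitOffsets([
        { topic, partition, offset: (parseInt(message.offset, 10) + 1).toString() }
    ])
    .catch((err) => console.error('Failed to commit offset', err));

A single failed asynchronous commit is usually harmless, because a later commit for a higher offset supersedes it; it only causes reprocessing if the consumer stops before another commit succeeds.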