Apache Kafka for Developers #8: Kafka Consumer Auto Offset Reset

The consumer Auto Offset Reset configuration applies mainly in the following scenarios:
  • When a new consumer group reads from a topic partition for the first time and its consumer instances need to determine where to begin reading.
  • When the last committed offset has been deleted from the __consumer_offsets topic due to the offset retention policy.
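
Both scenarios come down to one rule: the reset policy is consulted only when the group has no committed offset to resume from. The sketch below models that decision for a single partition. It is a simplified illustration, not the client's actual implementation, and `resolveStartOffset` and its parameter names are hypothetical.

```javascript
// Simplified model of how a consumer picks its starting offset for one partition.
// resolveStartOffset and its parameter names are hypothetical, not a Kafka API.
function resolveStartOffset({ committed, logStart, logEnd, policy }) {
    // A committed offset always wins; the reset policy is not consulted.
    if (committed !== null) {
        return committed;
    }
    switch (policy) {
        case 'earliest':
            return logStart; // oldest message still retained in the partition
        case 'latest':
            return logEnd;   // offset that the next produced message will receive
        case 'none':
            throw new Error('No committed offset found for this consumer group');
        default:
            throw new Error(`Unknown auto.offset.reset value: ${policy}`);
    }
}

// New consumer group: no committed offset yet
console.log(resolveStartOffset({ committed: null, logStart: 0, logEnd: 120, policy: 'earliest' })); // 0
console.log(resolveStartOffset({ committed: null, logStart: 0, logEnd: 120, policy: 'latest' }));   // 120

// Existing group: the committed offset is used regardless of the policy
console.log(resolveStartOffset({ committed: 42, logStart: 0, logEnd: 120, policy: 'latest' }));     // 42
```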

When a consumer starts without a previously committed offset, the `auto.offset.reset` setting determines where it should begin reading messages. The possible values are:

earliest

In this setting, the consumer starts reading from the earliest available message in the partition. This is useful when you want to process all messages from the beginning.

This setting suits applications that need to process all historical data, such as data analytics or batch processing jobs. However, if the partition holds a large number of messages, the consumer has to work through the entire backlog from the very beginning, so new messages can be delayed considerably.
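
To gauge how much a consumer starting from the earliest offset has to read, you can sum the distance between the low and high offsets of each partition. The helper below is a sketch: `backlogSize` is a hypothetical name, and the input shape (`high` and `low` as strings per partition) is an assumption modelled on what the kafkajs admin client's `fetchTopicOffsets` returns.

```javascript
// Hypothetical helper: number of messages an 'earliest' consumer must read
// before it catches up. Each entry is assumed to look like
// { partition, high, low }, with offsets given as strings.
function backlogSize(partitionOffsets) {
    return partitionOffsets.reduce(
        // BigInt avoids precision loss, since Kafka offsets are 64-bit integers
        (total, { high, low }) => total + (BigInt(high) - BigInt(low)),
        0n
    );
}

// Sample data for illustration only
const sample = [
    { partition: 0, high: '31004', low: '421' },
    { partition: 1, high: '54312', low: '3102' },
];

console.log(backlogSize(sample).toString()); // 81793
```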

Use the settings below to configure Auto Offset Reset:
auto.offset.reset: earliest // in Java
fromBeginning: true   // in Node.js using the kafkajs library

Example: Set up a Kafka consumer with auto.offset.reset to earliest in Node.js using the kafkajs library

// install kafkajs using  'npm install kafkajs'

// Create a consumer that reads from the beginning of the topic
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
    clientId: 'test-app',
    brokers: ['localhost:9092']
});

// In kafkajs the autoCommit options belong to consumer.run(), not kafka.consumer()
const consumer = kafka.consumer({
    groupId: 'consumer-group-1'
});

const run = async () => {
    // Connecting the consumer
    await consumer.connect();

    // Subscribing to the topic and setting fromBeginning to true
    await consumer.subscribe({ topic: 'topic-A', fromBeginning: true });

    // Running the consumer, committing offsets automatically every 5 seconds
    await consumer.run({
        autoCommit: true,
        autoCommitInterval: 5000,
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                offset: message.offset,
                // key and value can be null, so guard before calling toString()
                key: message.key?.toString(),
                value: message.value?.toString()
            });
        },
    });
};
};

run().catch(console.error);

latest

In this setting, the consumer begins reading at the end of the partition when it subscribes, so it receives only messages produced after that point. This is the default setting and is ideal when you only need to process new messages.

This setting is ideal for real-time applications such as monitoring systems or real-time analytics. However, if the consumer goes down before it has committed its first offset, it starts from the latest position again when it restarts and misses the messages produced during its downtime.
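
The missed-message case can be reproduced with a small in-memory model. This is an illustrative sketch only: the array stands in for one partition, and `startPosition` is a hypothetical function, not part of any Kafka client.

```javascript
// In-memory model of one partition and one consumer group (not a Kafka API).
// With 'latest', a consumer without a committed offset starts at the log end.
const startPosition = (committed, logEnd) => (committed !== null ? committed : logEnd);

const log = ['m0', 'm1', 'm2']; // messages already in the partition

// First start: the group has never committed, so the consumer starts at offset 3.
const firstStart = startPosition(null, log.length);

// The consumer crashes before committing anything,
// and two messages are produced while it is down.
log.push('m3', 'm4');

// Restart: there is still no committed offset, so 'latest' applies again.
const secondStart = startPosition(null, log.length);

const missed = log.slice(firstStart, secondStart);
console.log(missed); // [ 'm3', 'm4' ]

// Had the consumer committed offset 3 before crashing, it would resume there.
const delivered = log.slice(startPosition(firstStart, log.length));
console.log(delivered); // [ 'm3', 'm4' ]
```

In the first case the two messages are skipped. In the second they are delivered, because the committed offset takes precedence over the reset policy.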

Use the settings below to configure Auto Offset Reset:
auto.offset.reset: latest // in Java
fromBeginning: false // in Node.js using the kafkajs library
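
Because the Java property and the kafkajs flag express the same choice, a small helper can translate one into the other. This is a sketch: `toSubscribeOptions` is a hypothetical name, and it rejects any other value on the assumption that kafkajs offers only the `fromBeginning` flag on `subscribe`.

```javascript
// Hypothetical helper: translate a Java-style auto.offset.reset value
// into the options object passed to kafkajs consumer.subscribe().
function toSubscribeOptions(topic, autoOffsetReset = 'latest') {
    switch (autoOffsetReset) {
        case 'earliest':
            return { topic, fromBeginning: true };
        case 'latest':
            return { topic, fromBeginning: false };
        default:
            // Assumption: kafkajs has no subscribe option matching other values such as 'none'
            throw new Error(`Unsupported auto.offset.reset value for kafkajs: ${autoOffsetReset}`);
    }
}

console.log(toSubscribeOptions('topic-A', 'earliest')); // { topic: 'topic-A', fromBeginning: true }
console.log(toSubscribeOptions('topic-A'));             // { topic: 'topic-A', fromBeginning: false }
```

The result can be passed straight to the consumer, for example `await consumer.subscribe(toSubscribeOptions('topic-A', 'earliest'))`.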

Example: Set up a Kafka consumer with auto.offset.reset to latest in Node.js using the kafkajs library


// install kafkajs using  'npm install kafkajs'

// Create a consumer that reads only new messages from the topic
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
    clientId: 'test-app',
    brokers: ['localhost:9092']
});

// In kafkajs the autoCommit options belong to consumer.run(), not kafka.consumer()
const consumer = kafka.consumer({
    groupId: 'consumer-group-1'
});

const run = async () => {
    // Connecting the consumer
    await consumer.connect();

    // Subscribing to the topic and setting fromBeginning to false
    await consumer.subscribe({ topic: 'topic-A', fromBeginning: false });

    // Running the consumer, committing offsets automatically every 5 seconds
    await consumer.run({
        autoCommit: true,
        autoCommitInterval: 5000,
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                offset: message.offset,
                // key and value can be null, so guard before calling toString()
                key: message.key?.toString(),
                value: message.value?.toString()
            });
        },
    });
};
};

run().catch(console.error);

none

In this setting, the consumer throws an exception if no previous offset is found for the given consumer group. This is useful when you want to ensure that the consumer only processes messages if there is a previously committed offset.

This setting is ideal for strict processing needs where the consumer should only start from a previously committed offset. However, the application must be prepared to handle the error raised when no offset is found.

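Use the setting below to configure Auto Offset Reset:
auto.offset.reset: none // in Java

Example: Approximate auto.offset.reset set to none in Node.js using the kafkajs library. kafkajs exposes only the fromBeginning flag on subscribe and has no direct equivalent of none, so this example checks the group's committed offsets through the admin client before starting the consumer.

// install kafkajs using  'npm install kafkajs'

// Create a consumer that refuses to start unless the group already has committed offsets.
// Assumption: kafkajs v2 admin.fetchOffsets({ groupId, topics }) reports '-1'
// for a partition that has no committed offset.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
    clientId: 'test-app',
    brokers: ['localhost:9092']
});

const groupId = 'consumer-group-1';
const topic = 'topic-A';
const consumer = kafka.consumer({ groupId });

// Throw an error if any partition has no previously committed offset
const assertCommittedOffsets = async () => {
    const admin = kafka.admin();
    await admin.connect();
    try {
        const [{ partitions }] = await admin.fetchOffsets({ groupId, topics: [topic] });
        if (partitions.some(({ offset }) => Number(offset) < 0)) {
            throw new Error('No previous offset found');
        }
    } finally {
        await admin.disconnect();
    }
};

const run = async () => {
    // Fail fast before the consumer joins the group
    await assertCommittedOffsets();

    await consumer.connect();
    await consumer.subscribe({ topic });

    await consumer.run({
        autoCommit: true,
        autoCommitInterval: 5000,
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                partition,
                offset: message.offset,
                value: message.value?.toString(),
            });
        },
    });
};

run().catch(console.error);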



Once a consumer has read a message and committed its offset, the `auto.offset.reset` setting no longer applies, because the committed offset serves as the starting point on subsequent starts for as long as that offset is retained.