The Consumer Auto Offset Reset configuration is mainly used in the following scenarios:
- When a new consumer group is created and reads from a topic for the first time, so its consumer instances need to determine where to begin reading from the topic's partitions (a quick way to check whether a group already has committed offsets is sketched right after this list).
- When the last committed offset has been deleted from the __consumer_offsets topic due to the retention policy.
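As a quick illustration of the first scenario, the sketch below uses the kafkajs admin client (assuming the v2-style fetchOffsets signature and the same broker, group, and topic names used in the examples later in this post) to check whether a consumer group already has committed offsets; a reported offset of '-1' means nothing has been committed yet, so Auto Offset Reset decides where the consumer starts.
// A minimal sketch: check whether 'consumer-group-1' has committed offsets for 'topic-A'
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'offset-check',
  brokers: ['localhost:9092']
});

const admin = kafka.admin();

const checkCommittedOffsets = async () => {
  await admin.connect();
  // fetchOffsets returns the committed offset per partition;
  // '-1' means the group has not committed anything for that partition yet
  const [topicOffsets] = await admin.fetchOffsets({
    groupId: 'consumer-group-1',
    topics: ['topic-A']
  });
  topicOffsets.partitions.forEach(({ partition, offset }) => {
    console.log(`partition ${partition}: committed offset ${offset}`);
  });
  await admin.disconnect();
};

checkCommittedOffsets().catch(console.error);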
When a consumer starts without a previously committed offset, the `auto.offset.reset` setting determines where it should begin reading messages. The possible values are:
earliest
In this setting, the consumer starts reading from the earliest available message in the partition. This is useful when you want to process all messages from the beginning.
This setting is well suited to applications that need to process all historical data, such as data analytics or batch processing jobs. However, it can cause high latency when the partition holds a large number of messages, since the consumer starts from the very beginning.
Use the settings below to configure Auto Offset Reset:
auto.offset.reset: earliest // in Java
fromBeginning: true // in node.js using kafkajs library
Example: Set up a Kafka consumer with auto.offset.reset to earliest in Node.js using the kafkajs library
// install kafkajs using 'npm install kafkajs'
// Create a consumer that reads from the beginning of the topic
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'test-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'consumer-group-1' });

const run = async () => {
  // Connect the consumer
  await consumer.connect();
  // Subscribe to the topic and set fromBeginning to true
  await consumer.subscribe({ topic: 'topic-A', fromBeginning: true });
  // Run the consumer; the auto-commit options belong to run() in kafkajs
  await consumer.run({
    autoCommit: true,
    autoCommitInterval: 5000,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        offset: message.offset,
        key: message.key ? message.key.toString() : null,
        value: message.value.toString()
      });
    },
  });
};

run().catch(console.error);
latest
In this setting, the consumer begins reading from the latest message once it subscribes to the topic partition. This is the default setting and is appropriate when you only need to process new messages.
It is ideal for real-time applications such as monitoring systems or real-time analytics. However, if the consumer goes down while processing and committing its first message, it might miss messages produced during its downtime.
Use the settings below to configure Auto Offset Reset:
auto.offset.reset: latest // in Java
fromBeginning: false // in node.js using kafkajs library
Example: Set up a Kafka consumer with auto.offset.reset to latest in Node.js using the kafkajs library
// install kafkajs using 'npm install kafkajs'
// Create a consumer that reads only new messages from the topic
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'test-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'consumer-group-1' });

const run = async () => {
  // Connect the consumer
  await consumer.connect();
  // Subscribe to the topic and set fromBeginning to false
  await consumer.subscribe({ topic: 'topic-A', fromBeginning: false });
  // Run the consumer; the auto-commit options belong to run() in kafkajs
  await consumer.run({
    autoCommit: true,
    autoCommitInterval: 5000,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        offset: message.offset,
        key: message.key ? message.key.toString() : null,
        value: message.value.toString()
      });
    },
  });
};

run().catch(console.error);
none
In this setting, the consumer throws an exception if no previously committed offset is found for the given consumer group. This is useful when you want to ensure that the consumer only processes messages when an offset has already been committed.
It is ideal for strict processing requirements where the consumer should only start from a previously committed offset. However, it may lead to application errors if no offset is found.
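Use the setting below to configure Auto Offset Reset. Note that this value only exists in the Java client configuration; kafkajs does not expose a direct equivalent of none, since its fromBeginning flag only chooses between earliest- and latest-style behavior, so the example below simulates the check in application code.
auto.offset.reset: none // in Java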
Example: Set up a Kafka consumer with auto.offset.reset to none in Node.js using the kafkajs library
// install kafkajs using 'npm install kafkajs'
// Create a consumer that should only process messages when a previously committed offset exists
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'test-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'consumer-group-1' });

const run = async () => {
  await consumer.connect();
  // fromBeginning defaults to false, so without a committed offset the consumer starts at the latest offset
  await consumer.subscribe({ topic: 'topic-A' });
  await consumer.run({
    autoCommit: true,
    autoCommitInterval: 5000,
    eachMessage: async ({ topic, partition, message }) => {
      // kafkajs has no native 'none' mode; this simulates it by treating
      // offset 0 as "no previously committed offset" and failing fast
      if (message.offset === '0') {
        throw new Error('No previous offset found');
      }
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      });
    },
  });
};

run().catch(console.error);

Once a consumer reads a message and commits its offset, the `auto.offset.reset` setting is no longer relevant, as the committed offset now serves as the new starting point.
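If you later need to reprocess a topic after offsets have already been committed, the committed offsets themselves have to be changed. As a minimal sketch (assuming the kafkajs admin client and the same group and topic names used above), a reset back to the earliest offset might look like this; the consumer group must have no running members while its offsets are reset.
// A minimal sketch: rewind 'consumer-group-1' to the earliest offset of 'topic-A'
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'offset-reset',
  brokers: ['localhost:9092']
});

const admin = kafka.admin();

const rewindGroup = async () => {
  await admin.connect();
  // resetOffsets requires the consumer group to have no active members
  await admin.resetOffsets({ groupId: 'consumer-group-1', topic: 'topic-A', earliest: true });
  await admin.disconnect();
};

rewindGroup().catch(console.error);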
Happy Coding :)

Apache Kafka for Developers Journey:
- Apache Kafka for Developers #1: Introduction to Kafka and Comparison with RabbitMQ
- Apache Kafka for Developers #2: Kafka Architecture and Components
- Apache Kafka for Developers #3: Kafka Topic Replication
- Apache Kafka for Developers #4: Kafka Producer and Acknowledgements
- Apache Kafka for Developers #5: Kafka Consumer and Consumer Group
- Apache Kafka for Developers #6: Kafka Consumer Partition Rebalancing
- Apache Kafka for Developers #7: Kafka Consumer Commit Offset
- Apache Kafka for Developers #8: Kafka Consumer Auto Offset Reset