Consumers are clients that read data from Kafka topics. They subscribe to one or more topics and process the data. Each consumer keeps track of its position in each partition using the partition offset.
Kafka consumers follow a pull model: consumers actively request data from Kafka brokers rather than having the brokers push data to them.
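To make the pull model concrete, here is a minimal sketch that simulates it with an in-memory array standing in for a partition. It does not talk to a real broker; the `partitionLog` array and the `poll` function are hypothetical names used only for illustration.

```javascript
// Toy in-memory "partition": an append-only array of messages.
// Illustration only; no real Kafka broker is involved.
const partitionLog = ['m0', 'm1', 'm2', 'm3', 'm4'];

// The consumer decides when to ask for data and how much to take.
// It passes its own offset, so the position is tracked on the consumer side.
function poll(log, offset, maxMessages) {
  const records = log.slice(offset, offset + maxMessages);
  return { records, nextOffset: offset + records.length };
}

let offset = 0;        // position of the next message to read
const consumed = [];

while (true) {
  const { records, nextOffset } = poll(partitionLog, offset, 2);
  if (records.length === 0) break; // nothing new; a real consumer would wait and poll again
  consumed.push(...records);
  offset = nextOffset;             // advance only after the records were taken
}

console.log(consumed, offset);
```

Because the consumer controls the loop, it can slow down or pause without the broker overwhelming it, which is the main benefit of pulling rather than being pushed to.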
Consumer Group
A consumer group is a set of consumers that work together to consume data from a topic. Each consumer in the group processes data from different partitions, allowing for parallel processing and load balancing.
Kafka consumers are typically part of a consumer group. When multiple consumers subscribe to a topic and belong to the same group, each consumer receives messages from a different subset of the topic's partitions. Each consumer group is identified by a group ID, which must be unique within the cluster.
When there is only one consumer in the group, Consumer 1 is assigned all three partitions of the topic (test-topic1). This means Consumer 1 reads messages from Partition 0, Partition 1, and Partition 2.
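The way partitions are spread across the members of a group can be sketched with a simple round-robin function. This is a simplified illustration and not Kafka's actual assignment protocol; `assignPartitions` and the consumer names are hypothetical.

```javascript
// Simplified round-robin assignment of partitions to group members.
// Illustration only; the real assignment is negotiated through the group coordinator.
function assignPartitions(partitions, consumerIds) {
  const assignment = {};
  for (const id of consumerIds) assignment[id] = [];
  partitions.forEach((partition, index) => {
    const owner = consumerIds[index % consumerIds.length];
    assignment[owner].push(partition);
  });
  return assignment;
}

// One consumer in the group: it owns every partition.
console.log(assignPartitions([0, 1, 2], ['consumer-1']));

// Three consumers, three partitions: one partition each.
console.log(assignPartitions([0, 1, 2], ['consumer-1', 'consumer-2', 'consumer-3']));

// More consumers than partitions: the extra consumer stays idle.
console.log(assignPartitions([0, 1, 2], ['consumer-1', 'consumer-2', 'consumer-3', 'consumer-4']));
```

The last call shows why adding consumers beyond the partition count does not increase throughput: a partition is read by at most one consumer in a group.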
Set up a Consumer group with three consumers
With this setup, we can create multiple instances of consumers with the same group ID. Each consumer will be part of the same group and will share the load of consuming messages from the topic.
Since there are three partitions and three consumers, each consumer will be assigned exactly one partition. For example, Consumer 1 might be assigned Partition 0, Consumer 2 might be assigned Partition 1, and Consumer 3 might be assigned Partition 2.
In a production environment, it's recommended to use a Kubernetes deployment with multiple replicas so that each Kafka consumer runs in a separate container. This approach provides scalability and fault tolerance.
For additional details about Kafka Consumer, please refer to the following link
Apache Kafka for Developers #5: Kafka Consumer and Consumer Group
Prerequisites:
- Set up a Kafka Cluster by following the link Apache Kafka for Developers #10: Setting Up Kafka Locally with Docker
- Set up a Kafka Topic by following the link Apache Kafka for Developers #11: Creating and Managing Kafka Topics
- Produce messages into Kafka Topic by following the link Apache Kafka for Developers #12: Setting Up a Kafka Producer in Node.js using KafkaJS
Set up a Kafka consumer with auto commit enabled
Create a separate listener called EXTERNAL_HOST for the consumer app running outside the Docker environment:
version: '3'
services:
  kafka-broker-1:
    image: 'bitnami/kafka:3.3.1'
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=MY_CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,MY_CONTROLLER://:9094,EXTERNAL_HOST://:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=MY_CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL_HOST:PLAINTEXT
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-broker-1:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker-1:9092,EXTERNAL_HOST://localhost:29092
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_KRAFT_CLUSTER_ID=d8zv92ecQk6ZrAAA35SDbw
    ports:
      - 29092:29092
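The two advertised listeners mean that the broker address a client uses depends on where the client runs. The sketch below illustrates that choice, assuming the addresses from the compose file above; the `resolveBrokers` function and its `runsInDocker` flag are hypothetical and exist only for this example.

```javascript
// Addresses taken from KAFKA_CFG_ADVERTISED_LISTENERS in the compose file.
const ADVERTISED = {
  internal: 'kafka-broker-1:9092', // PLAINTEXT listener, reachable inside the Docker network
  external: 'localhost:29092',     // EXTERNAL_HOST listener, reachable from the host machine
};

// Hypothetical helper: pick the broker list based on where the client runs.
function resolveBrokers(runsInDocker) {
  return [runsInDocker ? ADVERTISED.internal : ADVERTISED.external];
}

// The consumer in this article runs on the host, so it uses the external listener.
console.log(resolveBrokers(false));
```

The returned array has the shape expected by the `brokers` option of the KafkaJS client.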
Install the KafkaJS library using:
$ npm install kafkajs
// Create a consumer that reads from the beginning of the topic
const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
  clientId: 'my-consumer-app',
  brokers: ['localhost:29092']
});

// Create a new consumer instance
const consumer = kafka.consumer({ groupId: 'consumer-group-1' });

const ConsumerAutoCommitSingleTopic = async () => {
  // Connecting the consumer
  await consumer.connect();
  // Subscribing to the topic and set fromBeginning to true
  await consumer.subscribe({ topic: 'test-topic1', fromBeginning: true });
  // Running the consumer.
  // In KafkaJS, autoCommit and autoCommitInterval are options of run(), not of kafka.consumer()
  await consumer.run({
    autoCommit: true,
    autoCommitInterval: 5000, // commit offsets every 5 seconds
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        topic,
        partition,
        offset: message.offset,
        key: (message.key || '').toString(),
        value: message.value.toString()
      });
    },
  });
};

// Run the consumer
ConsumerAutoCommitSingleTopic().catch(console.error);
// Disconnect the consumer before exiting on unexpected errors
process.on('unhandledRejection', async e => {
  try {
    console.error(e);
    await consumer.disconnect()
    process.exit(0)
  } catch (_) {
    process.exit(1)
  }
})
process.on('uncaughtException', async e => {
  try {
    console.error(e);
    await consumer.disconnect()
    process.exit(0)
  } catch (_) {
    process.exit(1)
  }
})

// Disconnect the consumer on termination signals, then re-raise the signal
const signals = ['SIGTERM', 'SIGINT', 'SIGUSR2']
signals.forEach(type => {
  process.once(type, async () => {
    try {
      console.log(`Received signal: ${type}`)
      await consumer.disconnect()
    } finally {
      process.kill(process.pid, type)
    }
  })
})
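The eachMessage handler above calls message.value.toString() directly. A message can have a null key, and a null value as well (for example a tombstone record on a compacted topic), in which case that call would throw. The following is a minimal sketch of a more defensive formatter; `describeMessage` is a hypothetical helper name, not part of KafkaJS.

```javascript
// Hypothetical helper: turn an eachMessage payload into a plain, printable object.
// Keys and values arrive as Buffers and may be null, so both are checked before decoding.
function describeMessage({ topic, partition, message }) {
  return {
    topic,
    partition,
    offset: message.offset,
    key: message.key ? message.key.toString() : null,
    value: message.value ? message.value.toString() : null,
  };
}

// Example payloads shaped like the argument KafkaJS passes to eachMessage.
const regular = describeMessage({
  topic: 'test-topic1',
  partition: 0,
  message: { offset: '7', key: Buffer.from('user-1'), value: Buffer.from('hello') },
});
const tombstone = describeMessage({
  topic: 'test-topic1',
  partition: 0,
  message: { offset: '8', key: Buffer.from('user-1'), value: null },
});

console.log(regular);
console.log(tombstone);
```

Inside a consumer, the handler body could then be reduced to `console.log(describeMessage(payload))`.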
Modify "scripts" in the package.json as below:
"scripts": {
  "consumer-start": "node consumer.js"
},
Run the consumer using:
$ npm run consumer-start
View consumer details using kafka-ui docker container
Set up a Kafka consumer with Multiple Topics
// Create a consumer that reads from the beginning of multiple topics
const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
  clientId: 'my-consumer-app',
  brokers: ['localhost:29092']
});

// Create a new consumer instance
const consumer = kafka.consumer({ groupId: 'consumer-group-1' });

const ConsumerAutoCommitMultiTopic = async () => {
  // Connecting the consumer
  await consumer.connect();
  // Subscribing to both topics and set fromBeginning to true.
  // A list of topics is passed through the `topics` option (KafkaJS v2 and later)
  await consumer.subscribe({ topics: ['test-topic1', 'test-topic2'], fromBeginning: true });
  // Running the consumer.
  // In KafkaJS, autoCommit and autoCommitInterval are options of run(), not of kafka.consumer()
  await consumer.run({
    autoCommit: true,
    autoCommitInterval: 5000,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        topic,
        partition,
        offset: message.offset,
        key: (message.key || '').toString(),
        value: message.value.toString()
      });
    },
  });
};

// Run the consumer
ConsumerAutoCommitMultiTopic().catch(console.error);
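When one consumer reads several topics, the same eachMessage handler receives messages from all of them, and the `topic` field tells them apart. A minimal sketch of routing by topic follows; the `handlers` map and `routeMessage` function are hypothetical names, and the handler bodies are placeholders.

```javascript
// Hypothetical per-topic handlers; replace the bodies with real processing logic.
const handlers = {
  'test-topic1': (value) => `topic1 handler got ${value}`,
  'test-topic2': (value) => `topic2 handler got ${value}`,
};

// Route a payload (shaped like the eachMessage argument) to the handler for its topic.
function routeMessage({ topic, message }) {
  const handler = handlers[topic];
  if (!handler) {
    throw new Error(`No handler registered for topic ${topic}`);
  }
  const value = message.value ? message.value.toString() : null;
  return handler(value);
}

console.log(routeMessage({ topic: 'test-topic1', message: { value: Buffer.from('a') } }));
console.log(routeMessage({ topic: 'test-topic2', message: { value: Buffer.from('b') } }));
```

Failing loudly on an unknown topic is a design choice for this sketch; depending on the application, logging and skipping the message may be more appropriate.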
Set up a Kafka consumer with Manual Commit
// Create a consumer that commits offsets manually
const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
  clientId: 'my-consumer-app',
  brokers: ['localhost:29092']
});

// Create a new consumer instance
const consumer = kafka.consumer({ groupId: 'consumer-group-1' });

const ConsumerManualCommit = async () => {
  // Connecting the consumer
  await consumer.connect();
  // Subscribing to the topic and set fromBeginning to true
  await consumer.subscribe({ topic: 'test-topic1', fromBeginning: true });
  // Running the consumer.
  // autoCommit must be disabled in run(); setting it in kafka.consumer() has no effect
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        topic,
        partition,
        offset: message.offset,
        key: (message.key || '').toString(),
        value: message.value.toString()
      });
      // Synchronous commit: Waits for the commit to complete.
      // The committed offset is the offset of the next message to read, hence + 1
      await consumer.commitOffsets([
        { topic, partition, offset: (parseInt(message.offset, 10) + 1).toString() }
      ]);
    },
  });
};

// Run the consumer
ConsumerManualCommit().catch(console.error);
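The manual commit above sends message.offset + 1 because the committed offset marks the next message to be read, not the last one processed. Kafka offsets are 64-bit integers delivered as strings, so arithmetic through parseInt loses precision beyond Number.MAX_SAFE_INTEGER. The sketch below shows one way to compute the value with BigInt instead; `nextOffsetToCommit` is a hypothetical helper name.

```javascript
// Hypothetical helper: given the offset of the message just processed (a string),
// return the offset to commit, i.e. the position of the next message to read.
function nextOffsetToCommit(processedOffset) {
  return (BigInt(processedOffset) + 1n).toString();
}

console.log(nextOffsetToCommit('41'));               // prints 42
console.log(nextOffsetToCommit('9007199254740993')); // prints 9007199254740994
```

In the handler, the commit entry would then read `{ topic, partition, offset: nextOffsetToCommit(message.offset) }`. For most topics the difference never shows up in practice, so this is a defensive choice rather than a required one.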
Set up a Kafka consumer to consume Latest messages
In this setting (fromBeginning: false), the consumer begins reading from the latest offset when it first subscribes to the topic partition, so it only receives messages produced after that point. This setting is ideal when you only need to process new messages. Note that fromBeginning only applies when the consumer group has no committed offset for a partition; otherwise the consumer resumes from the committed offset.
const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
  clientId: 'my-consumer-app',
  brokers: ['localhost:29092']
});

// Create a new consumer instance
const consumer = kafka.consumer({ groupId: 'consumer-group-1' });

const ConsumerReadMessageFromLatest = async () => {
  // Connecting the consumer
  await consumer.connect();
  // Subscribing to the topic and set fromBeginning to false
  await consumer.subscribe({ topic: 'test-topic1', fromBeginning: false });
  // Running the consumer.
  // In KafkaJS, autoCommit and autoCommitInterval are options of run(), not of kafka.consumer()
  await consumer.run({
    autoCommit: true,
    autoCommitInterval: 5000,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        topic,
        partition,
        offset: message.offset,
        key: (message.key || '').toString(),
        value: message.value.toString()
      });
    },
  });
};

// Run the consumer
ConsumerReadMessageFromLatest().catch(console.error);
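The effect of fromBeginning depends on whether the group already has a committed offset for the partition. The decision can be sketched as a small function; this is a simplified model of the behavior, not KafkaJS internals, and `resolveStartOffset` with its parameter names is hypothetical.

```javascript
// Simplified model of where a consumer starts reading a partition.
// committedOffset: the group's committed offset, or null if the group has none yet.
// earliestOffset / latestOffset: the first available offset and the end of the log.
function resolveStartOffset({ committedOffset, earliestOffset, latestOffset, fromBeginning }) {
  if (committedOffset !== null && committedOffset !== undefined) {
    return committedOffset; // an existing position always wins over fromBeginning
  }
  return fromBeginning ? earliestOffset : latestOffset;
}

// New group, fromBeginning: false -> starts at the end and sees only new messages.
console.log(resolveStartOffset({ committedOffset: null, earliestOffset: 0, latestOffset: 120, fromBeginning: false }));

// New group, fromBeginning: true -> replays the partition from the start.
console.log(resolveStartOffset({ committedOffset: null, earliestOffset: 0, latestOffset: 120, fromBeginning: true }));

// Group with a committed offset -> resumes there regardless of fromBeginning.
console.log(resolveStartOffset({ committedOffset: 57, earliestOffset: 0, latestOffset: 120, fromBeginning: false }));
```

This is why re-running an example with a group ID that has already consumed the topic may not replay old messages even with fromBeginning: true; using a fresh group ID is one way to observe the difference.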
The example below starts three consumers with the same group ID in a single process. Each consumer joins the same group and shares the load of consuming messages from the topic.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-consumer-app',
  brokers: ['localhost:29092']
});

// Connected consumers, kept at module scope so the shutdown handlers can reach them
const consumers = [];

const createConsumer = async (groupId, instanceId) => {
  const consumer = kafka.consumer({ groupId });
  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic1', fromBeginning: true });
  // autoCommit and autoCommitInterval are options of run(), not of kafka.consumer()
  await consumer.run({
    autoCommit: true,
    autoCommitInterval: 5000,
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`Consumer ${instanceId} - partition ${partition} - ${message.value.toString()}`);
    },
  });
  return consumer;
};

const multiConsumer = async () => {
  const groupId = 'consumer-group-1';
  const pending = [];
  // Create 3 consumers in the same group
  for (let i = 0; i < 3; i++) {
    pending.push(createConsumer(groupId, i));
  }
  // Wait until all consumers are running, then store the consumer instances
  consumers.push(...await Promise.all(pending));
};

// Graceful shutdown
const shutdown = async (signal) => {
  console.log(`${signal} received, shutting down...`);
  await Promise.all(consumers.map(consumer => consumer.disconnect()));
  process.exit(0);
};
process.on('SIGINT', () => shutdown('SIGINT'));
process.on('SIGTERM', () => shutdown('SIGTERM'));

multiConsumer().catch(console.error);
You can explore Kubernetes in detail at https://millionvisit.blogspot.com/search/label/kubernetes
Apache Kafka for Developers Journey:
- Apache Kafka for Developers #1: Introduction to Kafka and Comparison with RabbitMQ
- Apache Kafka for Developers #2: Kafka Architecture and Components
- Apache Kafka for Developers #3: Kafka Topic Replication
- Apache Kafka for Developers #4: Kafka Producer and Acknowledgements
- Apache Kafka for Developers #5: Kafka Consumer and Consumer Group
- Apache Kafka for Developers #6: Kafka Consumer Partition Rebalancing
- Apache Kafka for Developers #7: Kafka Consumer Commit Offset
- Apache Kafka for Developers #8: Kafka Consumer Auto Offset Reset
- Apache Kafka for Developers #9: Replacing ZooKeeper with KRaft
- Apache Kafka for Developers #10: Setting Up Kafka Locally with Docker
- Apache Kafka for Developers #11: Creating and Managing Kafka Topics
- Apache Kafka for Developers #12: Setting Up a Kafka Producer in Node.js using KafkaJS
- Apache Kafka for Developers #13: Setting Up a Kafka Consumer in Node.js using KafkaJS