Producers are clients that publish messages to Kafka topics, distributing them across various partitions. They send data to the broker, which then stores it in the corresponding partition of the topic.
Each message or record that a producer sends includes a Key (optional), Value, Header (optional), and Timestamp.
For additional details about the Kafka producer, refer to:
Apache Kafka for Developers #4: Kafka Producer and Acknowledgements
Prerequisites:
- Set up a Kafka cluster by following Apache Kafka for Developers #10: Setting Up Kafka Locally with Docker
- Set up a Kafka topic by following Apache Kafka for Developers #11: Creating and Managing Kafka Topics
Create a separate listener called EXTERNAL_HOST for the producer app running outside the Docker environment:
version: '3'
services:
  kafka-broker-1:
    image: 'bitnami/kafka:3.3.1'
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=MY_CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,MY_CONTROLLER://:9094,EXTERNAL_HOST://:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=MY_CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL_HOST:PLAINTEXT
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-broker-1:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker-1:9092,EXTERNAL_HOST://localhost:29092
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_KRAFT_CLUSTER_ID=d8zv92ecQk6ZrAAA35SDbw
    ports:
      - 29092:29092
Install the KafkaJS library using:
npm install kafkajs
Publish messages to Kafka using a Message Key
The message key determines the partition to which a message is sent, ensuring that all messages with the same key are routed to the same partition. This preserves the order of related messages and allows for efficient processing.
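Conceptually, key-based assignment just hashes the key and takes it modulo the partition count, so the same key always lands on the same partition. The sketch below uses a toy hash for illustration only; Kafka's real default partitioner hashes the key with murmur2:

```javascript
// Simplified illustration of key-based partition assignment.
// NOTE: this toy hash is NOT Kafka's actual algorithm (murmur2);
// it only demonstrates the principle: same key -> same partition.
function toyHash(key) {
  let h = 0;
  for (const ch of key) {
    h = (h * 31 + ch.charCodeAt(0)) >>> 0; // unsigned 32-bit rolling hash
  }
  return h;
}

function partitionForKey(key, numPartitions) {
  return toyHash(key) % numPartitions;
}

// 'key1' always maps to the same partition of the 3 available
console.log(partitionForKey('key1', 3));
console.log(partitionForKey('key1', 3)); // same value every time
```

Note that this mapping depends on the partition count: adding partitions to a topic changes which partition a given key maps to.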
const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
  clientId: 'my-test-app',
  brokers: ['localhost:29092']
});

// Create a Kafka producer instance
const producer = kafka.producer();

const ProducerWithKey = async () => {
  try {
    // Connect to the Kafka broker
    await producer.connect();

    // Send a message to the topic 'test-topic1'
    await producer.send({
      topic: 'test-topic1',
      messages: [{ key: 'key1', value: 'first message' }],
    });
  } catch (error) {
    console.error('Error occurred while sending the message', error);
  } finally {
    // Disconnect the producer
    await producer.disconnect();
  }
};

// Run the producer
ProducerWithKey().catch(console.error);
modify "scripts" as below in the package.json
"scripts": {
"producer-start": "node producer.js"
},
Run the producer using:
$ npm run producer-start
View the messages using the kafka-ui Docker container.
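The compose file above does not define a kafka-ui container. A minimal sketch of such a service (assuming the provectuslabs/kafka-ui image, pointed at the broker's internal listener) might look like:

```yaml
  kafka-ui:
    image: 'provectuslabs/kafka-ui:latest'
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-broker-1:9092
    ports:
      - 8080:8080
    depends_on:
      - kafka-broker-1
```

With this added under `services`, the UI is reachable at http://localhost:8080 and can browse topics, partitions, and message contents.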
Publish messages to Kafka without Message Key
When publishing messages to Kafka without a key, they are distributed across partitions in a round-robin manner. This ensures an even load distribution but does not maintain the order of messages across partitions.
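Round-robin assignment simply cycles a counter over the available partitions. The sketch below illustrates the idea only; it is not KafkaJS's internal implementation:

```javascript
// Illustrative round-robin partition assignment for keyless messages.
// (A sketch of the idea, not KafkaJS's actual partitioner code.)
function makeRoundRobinPartitioner(numPartitions) {
  let next = 0;
  return function nextPartition() {
    const partition = next;
    next = (next + 1) % numPartitions; // cycle through partitions evenly
    return partition;
  };
}

const pick = makeRoundRobinPartitioner(3);
console.log([pick(), pick(), pick(), pick()]); // [ 0, 1, 2, 0 ]
```

Because each successive message goes to the next partition, the load spreads evenly, but two related messages can end up in different partitions and be consumed out of order relative to each other.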
const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
  clientId: 'my-test-app',
  brokers: ['localhost:29092']
});

// Create a Kafka producer instance
const producer = kafka.producer();

const ProducerWithoutKey = async () => {
  try {
    // Connect to the Kafka broker
    await producer.connect();

    // Send a message to the topic 'test-topic1'
    await producer.send({
      topic: 'test-topic1',
      messages: [{ value: 'first message without key' }],
    });
  } catch (error) {
    console.error('Error occurred while sending the message', error);
  } finally {
    // Disconnect the producer
    await producer.disconnect();
  }
};

// Run the producer
ProducerWithoutKey().catch(console.error);
Publish messages to Kafka with Specific Partition
You can also specify the partition number directly on each message. Every such message is sent to the chosen partition, preserving order within it and enabling focused processing.
const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
  clientId: 'my-test-app',
  brokers: ['localhost:29092']
});

// Create a Kafka producer instance
const producer = kafka.producer();

const ProducerToSpecificPartition = async () => {
  try {
    // Connect to the Kafka broker
    await producer.connect();

    // Send a message to the topic 'test-topic1' with partition 0
    await producer.send({
      topic: 'test-topic1',
      messages: [{ value: 'first message partition 0', partition: 0 }],
    });
  } catch (error) {
    console.error('Error occurred while sending the message', error);
  } finally {
    // Disconnect the producer
    await producer.disconnect();
  }
};

// Run the producer
ProducerToSpecificPartition().catch(console.error);
Publish messages to Multiple Topics
When publishing messages to multiple Kafka topics, you can either use a single producer to send messages to different topics or create separate producers for each topic.
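The `topicMessages` payload that `producer.sendBatch()` expects is just an array grouping messages by topic. Building it from a flat list of records can be sketched with a hypothetical helper like this (not part of KafkaJS):

```javascript
// Group a flat list of { topic, value } records into the
// topicMessages shape expected by KafkaJS's producer.sendBatch().
function toTopicMessages(records) {
  const byTopic = new Map();
  for (const { topic, value } of records) {
    if (!byTopic.has(topic)) byTopic.set(topic, []);
    byTopic.get(topic).push({ value });
  }
  return [...byTopic.entries()].map(([topic, messages]) => ({ topic, messages }));
}

const batch = toTopicMessages([
  { topic: 'test-topic1', value: 'message1' },
  { topic: 'test-topic2', value: 'message3' },
  { topic: 'test-topic1', value: 'message2' },
]);
console.log(JSON.stringify(batch, null, 2));
```

The result could then be passed as `producer.sendBatch({ topicMessages: batch })`, as in the full example below.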
const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
  clientId: 'my-test-app',
  brokers: ['localhost:29092']
});

// Create a Kafka producer instance
const producer = kafka.producer();

const ProducerToMultipleTopics = async () => {
  try {
    // Create an array of messages to send to multiple topics
    const messages = [
      { topic: 'test-topic1', messages: [{ value: 'message1' }, { value: 'message2' }] },
      { topic: 'test-topic2', messages: [{ value: 'message3' }, { value: 'message4' }] }
    ];

    // Connect to the Kafka broker
    await producer.connect();

    // Send the messages to the topics
    await producer.sendBatch({
      topicMessages: messages
    });
  } catch (error) {
    console.error('Error occurred while sending the message', error);
  } finally {
    // Disconnect the producer
    await producer.disconnect();
  }
};

// Run the producer
ProducerToMultipleTopics().catch(console.error);
Publish messages with Headers
When publishing messages to Kafka, you can attach headers: additional metadata carried alongside the message as key-value pairs.
const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
  clientId: 'my-test-app',
  brokers: ['localhost:29092']
});

// Create a Kafka producer instance
const producer = kafka.producer();

const ProducerWithHeader = async () => {
  try {
    // Connect to the Kafka broker
    await producer.connect();

    // Send a message to the topic 'test-topic1' with headers
    await producer.send({
      topic: 'test-topic1',
      messages: [{
        value: 'first message partition 0',
        partition: 0,
        headers: {
          'correlation-id': '1234',
          'client-id': 'my-test-app'
        }
      }]
    });
  } catch (error) {
    console.error('Error occurred while sending the message', error);
  } finally {
    // Disconnect the producer
    await producer.disconnect();
  }
};

// Run the producer
ProducerWithHeader().catch(console.error);
Publish messages with Acknowledgements
Acknowledgements (acks) in Kafka are a mechanism to ensure that messages are reliably stored in the topic partition.
Kafka producers write data only to the leader replica of a partition.
The producer's acknowledgement level (acks) controls how many replicas must have a message before the write is considered successful:
-1 = all in-sync replicas must acknowledge, subject to the broker's min.insync.replicas setting (default)
0 = no acknowledgement; the producer does not wait for a response
1 = only the leader must acknowledge
const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
  clientId: 'my-test-app',
  brokers: ['localhost:29092']
});

// Create a Kafka producer instance
const producer = kafka.producer();

const ProducerWithAcks = async () => {
  try {
    // Connect to the Kafka broker
    await producer.connect();

    // Send a message to the topic 'test-topic1', waiting only for the leader (acks: 1)
    await producer.send({
      topic: 'test-topic1',
      messages: [{ key: 'key1', value: 'first message' }],
      acks: 1
    });
  } catch (error) {
    console.error('Error occurred while sending the message', error);
  } finally {
    // Disconnect the producer
    await producer.disconnect();
  }
};

// Run the producer
ProducerWithAcks().catch(console.error);
Apache Kafka for Developers Journey:
- Apache Kafka for Developers #1: Introduction to Kafka and Comparison with RabbitMQ
- Apache Kafka for Developers #2: Kafka Architecture and Components
- Apache Kafka for Developers #3: Kafka Topic Replication
- Apache Kafka for Developers #4: Kafka Producer and Acknowledgements
- Apache Kafka for Developers #5: Kafka Consumer and Consumer Group
- Apache Kafka for Developers #6: Kafka Consumer Partition Rebalancing
- Apache Kafka for Developers #7: Kafka Consumer Commit Offset
- Apache Kafka for Developers #8: Kafka Consumer Auto Offset Reset
- Apache Kafka for Developers #9: Replacing ZooKeeper with KRaft
- Apache Kafka for Developers #10: Setting Up Kafka Locally with Docker
- Apache Kafka for Developers #11: Creating and Managing Kafka Topics
- Apache Kafka for Developers #12: Setting Up a Kafka Producer in Node.js using KafkaJS
Happy Coding :)