Apache Kafka for Developers #12: Setting Up a Kafka Producer in Node.js using KafkaJS

Producers are clients that publish messages to Kafka topics, distributing them across various partitions. They send data to the broker, which then stores it in the corresponding partition of the topic.

Each message or record that a producer sends includes a Key (optional), Value, Header (optional), and Timestamp.

For additional details about Kafka Producer, please refer to the following link
Apache Kafka for Developers #4: Kafka Producer and Acknowledgements

Prerequisites:
create a separate listener called EXTERNAL_HOST for the producer app running outside the docker environment
version: '3'
services:  
  kafka-broker-1:
    image: 'bitnami/kafka:3.3.1'
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=MY_CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,MY_CONTROLLER://:9094,EXTERNAL_HOST://:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=MY_CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL_HOST:PLAINTEXT
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-broker-1:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker-1:9092,EXTERNAL_HOST://localhost:29092
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_KRAFT_CLUSTER_ID=d8zv92ecQk6ZrAAA35SDbw
    ports:
      - 29092:29092

install KafkaJs library using
npm install kafkajs

Publish messages to Kafka using a Message Key

Message key determines the partition to which a message is sent, ensuring that all messages with the same key are routed to the same partition. This helps maintain the order of related messages and allows for efficient processing.

const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
    clientId: 'my-test-app',
    brokers: ['localhost:29092']
});

// Create Kafka producer instance
const producer = kafka.producer();

const ProducerWithKey = async () => {
    try {
        // Connect to the Kafka broker
        await producer.connect();

        // Send a message to the topic 'test-topic1'
        await producer.send({
            topic: 'test-topic1',
            messages: [{ key: 'key1', value: 'first message' }],
        });
    } catch (error) {
        console.error('Error occurred while sending the message', error);
    } finally {
        // Disconnect the producer
        await producer.disconnect();
    }
};

// Run the producer
ProducerWithKey().catch(console.error);

modify "scripts" as below in the package.json
  "scripts": {
    "producer-start": "node producer.js"
  },

run the producer using
$ npm run producer-start

View messages using kafka-ui docker container




Publish messages to Kafka without Message Key

When publishing messages to Kafka without a key, they are distributed across partitions in a round-robin manner. This ensures an even load distribution but does not maintain the order of messages across partitions.

const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
    clientId: 'my-test-app',
    brokers: ['localhost:29092']
});

// Create Kafka producer instance
const producer = kafka.producer();


const ProducerWithoutKey = async () => {
    try {
        // Connect to the Kafka broker
        await producer.connect();

        // Send a message to the topic 'test-topic1'
        await producer.send({
            topic: 'test-topic1',
            messages: [{ value: 'first message without key' }],
        });
    } catch (error) {
        console.error('Error occurred while sending the message', error);
    } finally {
        // Disconnect the producer
        await producer.disconnect();
    }
};

// Run the producer
ProducerWithoutKey().catch(console.error);


Publish messages to Kafka with Specific Partition

When publishing messages to a specific partition in Kafka, you can specify the partition number directly. This ensures that all messages are sent to the chosen partition, maintaining their order and enabling focused processing.

const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
    clientId: 'my-test-app',
    brokers: ['localhost:29092']
});

// Create Kafka producer instance
const producer = kafka.producer();


const ProducerToSpecificPartition = async () => {
    try {
        // Connect to the Kafka broker
        await producer.connect();

        // Send a message to the topic 'test-topic1' with partition 0
        await producer.send({
            topic: 'test-topic1',
            messages: [{ value: 'first message partition 0', partition: 0 }],
        });
    } catch (error) {
        console.error('Error occurred while sending the message', error);
    } finally {
        // Disconnect the producer
        await producer.disconnect();
    }
};

// Run the producer
ProducerToSpecificPartition().catch(console.error);

Publish messages to Multiple Topics

When publishing messages to multiple Kafka topics, you can either use a single producer to send messages to different topics or create separate producers for each topic.

const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
    clientId: 'my-test-app',
    brokers: ['localhost:29092']
});

// Create Kafka producer instance
const producer = kafka.producer();

const ProducerToMultipleTopics = async () => {
    try {
        // Create an array of messages to send to multiple topics
        const messages = [
            { topic: 'test-topic1', messages: [{ value: 'message1' }, { value: 'message2' }] },
            { topic: 'test-topic2', messages: [{ value: 'message3' }, { value: 'message4' }] }
        ];

        // Connect to the Kafka broker
        await producer.connect();

        // Send the messages to the topics
        await producer.sendBatch({
            topicMessages: messages
        });
    } catch (error) {
        console.error('Error occurred while sending the message', error);
    } finally {
        // Disconnect the producer
        await producer.disconnect();
    }
};

// Run the producer
ProducerToMultipleTopics().catch(console.error);

Publish messages with Headers

When publishing messages to Kafka with headers, you can include additional metadata as key-value pairs.

const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
    clientId: 'my-test-app',
    brokers: ['localhost:29092']
});

// Create Kafka producer instance
const producer = kafka.producer();

const ProducerWithHeader = async () => {
    try {
        // Connect to the Kafka broker
        await producer.connect();

        // Send a message to the topic 'test-topic1' with headers
        await producer.send({
            topic: 'test-topic1',
            messages: [{
                value: 'first message partition 0',
                partition: 0,
                headers: {
                    'correlation-id': '1234',
                    'client-id': 'my-test-app'
                }
            }]

        });
    } catch (error) {
        console.error('Error occurred while sending the message', error);
    } finally {
        // Disconnect the producer
        await producer.disconnect();
    }
};

// Run the producer
ProducerWithHeader().catch(console.error);

Publish messages with Acknowledgements

Acknowledgements (acks) in Kafka are a mechanism to ensure that messages are reliably stored in the Kafka topic partition.

Kafka producers only write data to the leader partition in the broker.

Kafka producers must also set the acknowledgment level (acks) to indicate whether a message needs to be written to a minimum number of replicas before it is considered successfully written.

Control the number of required acks.

-1 = all insync replicas(min.insync.replicas property) must acknowledge (default)
 0 = no acknowledgments
 1 = only waits for the leader to acknowledge

const { Kafka } = require('kafkajs');

// Initialize a new Kafka instance with the client ID and broker address
const kafka = new Kafka({
    clientId: 'my-test-app',
    brokers: ['localhost:29092']
});

// Create Kafka producer instance
const producer = kafka.producer();

const ProducerWithAcks = async () => {
    try {
        // Connect to the Kafka broker
        await producer.connect();

        // Send a message to the topic 'test-topic1' with acks
        await producer.send({
            topic: 'test-topic1',
            messages: [{ key: 'key1', value: 'first message' }],
            acks: 1
        });
    } catch (error) {
        console.error('Error occurred while sending the message', error);
    } finally {
        // Disconnect the producer
        await producer.disconnect();
    }
};


// Run the producer
ProducerWithAcks().catch(console.error);

Apache Kafka for Developers Journey:

Happy Coding :)

Comments