Apache Kafka for Developers #13: Setting Up a Kafka Consumer in Node.js using KafkaJS

Consumers are clients that read data from Kafka topics. They subscribe to one or more topics and process the data. Each consumer keeps track of its position in each partition using that partition's offset.

Kafka consumers follow a pull model: consumers actively request data from Kafka brokers rather than having the brokers push data to them.

Consumer Group

A consumer group is a set of consumers that work together to consume data from a topic. Each partition is read by only one consumer in the group, so the consumers process different partitions in parallel and share the load.

Kafka consumers are typically part of a consumer group. When multiple consumers in the same group subscribe to a topic, each consumer receives messages from a different subset of the topic's partitions. A consumer group is identified by its group ID, which must be unique within the cluster.

For additional details about Kafka consumers and consumer groups, please refer to the following article:
Apache Kafka for Developers #5: Kafka Consumer and Consumer Group

Prerequisites:
Create a separate listener called EXTERNAL_HOST for client apps (such as this consumer) that run outside the Docker environment:
# Single-node Kafka in KRaft mode: the same node acts as both broker and controller.
version: '3'
services:  
  kafka-broker-1:
    image: 'bitnami/kafka:3.3.1'
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=MY_CONTROLLER
      # Three listeners: PLAINTEXT (9092) for containers on the compose network,
      # MY_CONTROLLER (9094) for the controller quorum, EXTERNAL_HOST (29092) for host-side clients.
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,MY_CONTROLLER://:9094,EXTERNAL_HOST://:29092
      # Every listener, including the controller one, is unencrypted PLAINTEXT.
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=MY_CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL_HOST:PLAINTEXT
      - KAFKA_CFG_BROKER_ID=1
      # The only controller voter is this node (id 1) on its MY_CONTROLLER port.
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-broker-1:9094
      # Addresses handed back to clients: containers use kafka-broker-1:9092,
      # apps on the host (e.g. the Node.js consumer) use localhost:29092.
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker-1:9092,EXTERNAL_HOST://localhost:29092
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_KRAFT_CLUSTER_ID=d8zv92ecQk6ZrAAA35SDbw
    ports:
      # host:container - publishes the EXTERNAL_HOST listener on the host machine
      - 29092:29092

Install the KafkaJS library:
npm install kafkajs

Set up a Kafka consumer with auto commit enabled
// Create a consumer that reads from the beginning of the topic
const { Kafka } = require('kafkajs');

// Kafka client that reaches the broker through its host-facing listener (localhost:29092)
const kafka = new Kafka({ clientId: 'my-consumer-app', brokers: ['localhost:29092'] });

// Member of "consumer-group-1"; offsets are auto-committed (autoCommitInterval: 5000 ms)
const consumer = kafka.consumer({ groupId: 'consumer-group-1', autoCommit: true, autoCommitInterval: 5000 });

/**
 * Connects the module-level `consumer`, subscribes it to `test-topic1` and logs every
 * message it receives. Offsets are committed by the consumer's auto-commit settings.
 * @returns {Promise<void>} resolves when `consumer.run` resolves
 */
const ConsumerAutoCommitSingleTopic = async () => {
    // Connecting the consumer
    await consumer.connect();

    // fromBeginning only takes effect when the group has no committed offset for a partition
    await consumer.subscribe({ topic: 'test-topic1', fromBeginning: true });

    // Running the consumer
    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                topic,
                partition,
                offset: message.offset,
                key: (message.key || '').toString(),
                // message.value can be null (e.g. tombstone records); log null instead of throwing
                value: message.value == null ? null : message.value.toString(),
            });
        },
    });
};

// Run the consumer
ConsumerAutoCommitSingleTopic().catch(console.error);

// Fatal errors: log the error, try to leave the consumer group cleanly, then exit with
// status 1 so whatever launched the process (npm, Docker, Kubernetes) sees a failure.
const errorTypes = ['unhandledRejection', 'uncaughtException'];

errorTypes.forEach(type => {
    process.on(type, async e => {
        try {
            console.error(e);
            await consumer.disconnect();
        } finally {
            // Exit even when disconnect() itself rejects
            process.exit(1);
        }
    });
});

// Termination signals: disconnect first, then re-send the same signal. process.once has
// already removed this listener, so the re-sent signal gets the default behaviour
// (normally termination).
const signals = ['SIGTERM', 'SIGINT', 'SIGUSR2'];

signals.forEach(type => {
    process.once(type, async () => {
        try {
            console.log(`Received signal: ${type}`);
            await consumer.disconnect();
        } finally {
            process.kill(process.pid, type);
        }
    });
});

Add a "consumer-start" entry to the "scripts" section of package.json (this assumes the code above is saved as consumer.js):
  "scripts": {
    "consumer-start": "node consumer.js"
  },

Run the consumer:
$ npm run consumer-start

Since there is only one consumer in the group, Consumer 1 will be assigned all three partitions of the topic test-topic1 (assuming the topic was created with three partitions). This means Consumer 1 will read messages from Partition 0, Partition 1, and Partition 2.

View the consumer details using the kafka-ui Docker container.

Set up a Kafka consumer with Multiple Topics
// Create a consumer that reads test-topic1 and test-topic2 from the beginning
const { Kafka } = require('kafkajs');

// Kafka client that reaches the broker through its host-facing listener (localhost:29092)
const kafka = new Kafka({ clientId: 'my-consumer-app', brokers: ['localhost:29092'] });

// Member of "consumer-group-1"; offsets are auto-committed (autoCommitInterval: 5000 ms)
const consumer = kafka.consumer({ groupId: 'consumer-group-1', autoCommit: true, autoCommitInterval: 5000 });

/**
 * Connects the module-level `consumer`, subscribes it to both test-topic1 and
 * test-topic2, and logs every message (the `topic` field shows which topic it came from).
 * @returns {Promise<void>} resolves when `consumer.run` resolves
 */
const ConsumerAutoCommitMultiTopic = async () => {
    // Connecting the consumer
    await consumer.connect();

    // Several topics go in `topics` (an array). `topic` accepts only a single string or
    // RegExp, so passing an array there is rejected as an invalid topic.
    await consumer.subscribe({ topics: ['test-topic1', 'test-topic2'], fromBeginning: true });

    // Running the consumer
    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                topic,
                partition,
                offset: message.offset,
                key: (message.key || '').toString(),
                // message.value can be null (e.g. tombstone records); log null instead of throwing
                value: message.value == null ? null : message.value.toString(),
            });
        },
    });
};

// Start the consumer; errors raised while connecting, subscribing or starting are logged
ConsumerAutoCommitMultiTopic().catch((error) => console.error(error));

Set up a Kafka consumer with Manual Commit
// Create a consumer that reads from the beginning of the topic and commits offsets manually
const { Kafka } = require('kafkajs');

// Kafka client that reaches the broker through its host-facing listener (localhost:29092)
const kafka = new Kafka({ clientId: 'my-consumer-app', brokers: ['localhost:29092'] });

// autoCommit is off: the message handler below commits offsets itself via consumer.commitOffsets()
const consumer = kafka.consumer({ groupId: 'consumer-group-1', autoCommit: false });

/**
 * Connects the module-level `consumer` (configured in this example with autoCommit: false),
 * subscribes it to `test-topic1`, and commits each message's offset right after logging it.
 * @returns {Promise<void>} resolves when `consumer.run` resolves
 */
const ConsumerManualCommit = async () => {
    // Connecting the consumer
    await consumer.connect();

    // fromBeginning only takes effect when the group has no committed offset for a partition
    await consumer.subscribe({ topic: 'test-topic1', fromBeginning: true });

    // Running the consumer
    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                topic,
                partition,
                offset: message.offset,
                key: (message.key || '').toString(),
                // message.value can be null (e.g. tombstone records); log null instead of throwing
                value: message.value == null ? null : message.value.toString(),
            });

            // The committed offset is the offset of the *next* message to read, hence +1.
            // Offsets are 64-bit values delivered as strings; BigInt keeps them exact beyond
            // Number.MAX_SAFE_INTEGER, where parseInt would silently round.
            const nextOffset = (BigInt(message.offset) + 1n).toString();

            // Awaited: this handler does not resolve until the commit request completes
            await consumer.commitOffsets([{ topic, partition, offset: nextOffset }]);
        },
    });
};

// Start the consumer; errors raised while connecting, subscribing or starting are logged
ConsumerManualCommit().catch((error) => console.error(error));


Set up a Kafka consumer to consume Latest messages
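In this setting (fromBeginning: false), a consumer group with no committed offset starts reading from the latest offset, so it receives only messages produced after it subscribes. This setting is ideal when you only need to process new messages. Note that fromBeginning only applies when the group has no committed offset. This example reuses consumer-group-1, and the earlier examples have already committed offsets for that group, so it will resume from those committed offsets instead. Use a new group ID to see the latest-only behavior.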
const { Kafka } = require('kafkajs');

// Kafka client that reaches the broker through its host-facing listener (localhost:29092)
const kafka = new Kafka({ clientId: 'my-consumer-app', brokers: ['localhost:29092'] });

// NOTE: reuses "consumer-group-1" from the earlier examples. If that group already has committed
// offsets, fromBeginning: false has no effect and consumption resumes from those offsets.
const consumer = kafka.consumer({ groupId: 'consumer-group-1', autoCommit: true, autoCommitInterval: 5000 });

/**
 * Connects the module-level `consumer`, subscribes it to `test-topic1` without rewinding
 * to the beginning, and logs every message it receives.
 * @returns {Promise<void>} resolves when `consumer.run` resolves
 */
const ConsumerReadMessageFromLatest = async () => {
    // Connecting the consumer
    await consumer.connect();

    // fromBeginning: false starts at the latest offset only when the group has no committed
    // offset; a group that has already committed offsets resumes from them instead.
    await consumer.subscribe({ topic: 'test-topic1', fromBeginning: false });

    // Running the consumer
    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                topic,
                partition,
                offset: message.offset,
                key: (message.key || '').toString(),
                // message.value can be null (e.g. tombstone records); log null instead of throwing
                value: message.value == null ? null : message.value.toString(),
            });
        },
    });
};

// Start the consumer; errors raised while connecting, subscribing or starting are logged
ConsumerReadMessageFromLatest().catch((error) => console.error(error));

Set up a Consumer group with three consumers
With this setup, we create multiple consumer instances with the same group ID (here, three consumers inside a single Node.js process). Each consumer is part of the same group and shares the load of consuming messages from the topic.
const { Kafka } = require('kafkajs');

// One Kafka client shared by every consumer created below; it connects via localhost:29092
const kafka = new Kafka({ clientId: 'my-consumer-app', brokers: ['localhost:29092'] });

/**
 * Creates a consumer in `groupId`, subscribes it to `test-topic1`, and starts printing
 * each message value prefixed with `instanceId`.
 * @param {string} groupId - consumer group shared by all instances
 * @param {number|string} instanceId - label used only in the log output
 * @returns {Promise<object>} the connected, running KafkaJS consumer (for later disconnect)
 */
const createConsumer = async (groupId, instanceId) => {
    const consumer = kafka.consumer({ groupId, autoCommit: true, autoCommitInterval: 5000 });

    await consumer.connect();
    await consumer.subscribe({ topic: 'test-topic1', fromBeginning: true });

    await consumer.run({
        eachMessage: async ({ message }) => {
            // message.value can be null (e.g. tombstone records); print "null" instead of throwing
            const value = message.value == null ? null : message.value.toString();
            console.log(`Consumer ${instanceId} - ${value}`);
        },
    });

    return consumer;
};

// Consumers that have finished starting. Kept at module scope so the shutdown
// handler below can reach them (a variable local to multiConsumer would not be visible).
const consumers = [];

/**
 * Starts three consumers in the same group so the topic's partitions are shared among them.
 * Each consumer is recorded in `consumers` as soon as it is running.
 * @returns {Promise<void>} rejects if any consumer fails to start
 */
const multiConsumer = async () => {
    const groupId = 'consumer-group-1';
    const consumerCount = 3;

    // Store the resolved consumer (not the pending Promise) so it can be disconnected later
    const startConsumer = async (instanceId) => {
        const consumer = await createConsumer(groupId, instanceId);
        consumers.push(consumer);
    };

    await Promise.all(Array.from({ length: consumerCount }, (_, i) => startConsumer(i)));
};

/**
 * Graceful shutdown: disconnect every started consumer, then exit.
 * allSettled ensures one failing disconnect does not stop the others or the exit.
 * @param {string} signal - name of the received signal, used in the log line
 */
const shutdown = async (signal) => {
    console.log(`${signal} received, shutting down...`);
    await Promise.allSettled(consumers.map(consumer => consumer.disconnect()));
    process.exit(0);
};

process.on('SIGINT', () => shutdown('SIGINT'));
process.on('SIGTERM', () => shutdown('SIGTERM'));


multiConsumer().catch(console.error);
Since there are three partitions and three consumers, each consumer will be assigned exactly one partition. For example, Consumer 0 might be assigned Partition 0, Consumer 1 Partition 1, and Consumer 2 Partition 2 (instance IDs start at 0 in the code above).

In a production environment, it's recommended to use a Kubernetes deployment with multiple replicas to run each Kafka consumer in separate containers. This approach provides scalability and fault tolerance.

You can explore Kubernetes in detail at https://millionvisit.blogspot.com/search/label/kubernetes


Apache Kafka for Developers Journey:

Happy Coding :)

Comments