Apache Kafka for Developers #11: Creating and Managing Kafka Topics

Topic
A topic is a named category to which records are published and in which they are stored in the Kafka cluster. Topics are divided into multiple partitions, enabling parallel data processing.
Kafka topic names can be at most 249 characters long and may contain only alphanumeric characters, periods (.), underscores (_), and hyphens (-), so keep them concise and meaningful.

An effective Kafka topic name can be constructed using the following components:
  • Domain or Entity: Identifies the origin of the data, such as logs, product, user, hr, sales
  • Data Type or Action: Specifies the nature of the data or event, such as update, click
  • Environment or Region: Indicates the environment or geographic location, for example, "prod" or "dev" for environments, or "us-east" and "eu-west" for regions
  • Version: Indicates the version of the topic, such as v1, v2
Examples:

sales.orders.us-east.v1, sales.refunds.dev.v1, user.profile.update.eu-west.v1, user.login.failed, hr.leave.requests, logs.application.error
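
The naming rules above can be checked mechanically. The following is a minimal Python sketch, not part of Kafka itself: the helper names `is_valid_topic_name` and `build_topic_name` are made up for illustration. It applies the 249-character limit and the allowed character set, and also rejects the names "." and "..", which Kafka does not accept.

```python
import re

MAX_TOPIC_NAME_LENGTH = 249
# Alphanumerics, periods, underscores, and hyphens only.
LEGAL_TOPIC_NAME = re.compile(r"[A-Za-z0-9._-]+")


def is_valid_topic_name(name: str) -> bool:
    """Check a name against the length and character rules described above."""
    if name in ("", ".", ".."):
        return False
    if len(name) > MAX_TOPIC_NAME_LENGTH:
        return False
    return LEGAL_TOPIC_NAME.fullmatch(name) is not None


def build_topic_name(*components: str) -> str:
    """Join non-empty components (domain, data type, region, version) with periods."""
    name = ".".join(part for part in components if part)
    if not is_valid_topic_name(name):
        raise ValueError(f"invalid topic name: {name!r}")
    return name


print(build_topic_name("sales", "orders", "us-east", "v1"))
print(is_valid_topic_name("sales/orders"))
```

Running a check like this in CI or in a provisioning script is one way to keep names consistent across teams before a topic is ever created.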

Partition

A topic is divided into multiple partitions for scalability and, together with replication, fault tolerance. Each partition is an ordered, immutable sequence of records, stored as a commit log to which new records are continuously appended. Partitions allow Kafka to scale horizontally by distributing data across multiple servers.

Example
  • If a topic has 3 partitions and there are 3 brokers, each broker will typically have one partition.
  • If a topic has 3 partitions and there are 5 brokers, 3 of the brokers will each have one partition, while the remaining 2 brokers will not have any partitions for that specific topic.
  • If a topic has 3 partitions and there are 2 brokers, at least one broker will hold more than one partition (for example, two on one broker and one on the other).
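
The three cases above can be reproduced with a simple round-robin sketch in Python. This is a simplification under stated assumptions: it assumes a replication factor of 1 and always starts at the first broker, whereas real Kafka placement may start at a different broker and take racks into account. The function name `assign_partitions` is hypothetical.

```python
from collections import defaultdict


def assign_partitions(num_partitions: int, broker_ids: list[int]) -> dict[int, list[int]]:
    """Spread partitions over brokers round-robin (replication factor 1 assumed)."""
    assignment = defaultdict(list)
    for partition in range(num_partitions):
        broker = broker_ids[partition % len(broker_ids)]
        assignment[broker].append(partition)
    # Include brokers that received nothing so the idle ones are visible.
    return {broker: assignment.get(broker, []) for broker in broker_ids}


print(assign_partitions(3, [1, 2, 3]))        # one partition per broker
print(assign_partitions(3, [1, 2, 3, 4, 5]))  # two brokers stay empty
print(assign_partitions(3, [1, 2]))           # one broker holds two partitions
```
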
Determining the right number of partitions for a Kafka topic generally depends on the total throughput for the topic, as well as the throughput per partition for both production and consumption.

Formula to calculate the minimum number of partitions

Number of partitions = max(t/p, t/c), rounded up to the next whole number

t = target throughput for the topic (the total amount of data you want to handle per second)
p = measured throughput on a single partition for production (i.e. how much data can be written to a partition per second)
c = measured throughput on a single partition for consumption (i.e. how much data can be read from a partition per second)

Example 1:

Let’s say, t=1000 messages/second, p=100 messages/second, c=200 messages/second

Number of partitions = max(1000/100, 1000/200) = max(10, 5) = 10

Example 2:

Let’s say, t= 1000 MB/sec, p= 200 MB/sec, c= 150 MB/sec

Number of partitions = max(1000/200, 1000/150) = max(5, 6.67) = 6.67, rounded up to 7
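
The same calculation can be written as a small Python helper. This is only a sketch of the formula above; the function name `min_partitions` is made up for illustration, and the result is a lower bound based on measured throughput, not a sizing recommendation.

```python
import math


def min_partitions(t: float, p: float, c: float) -> int:
    """Return max(t/p, t/c) rounded up, using the symbols defined above."""
    if t <= 0 or p <= 0 or c <= 0:
        raise ValueError("throughput values must be positive")
    return math.ceil(max(t / p, t / c))


print(min_partitions(1000, 100, 200))  # Example 1
print(min_partitions(1000, 200, 150))  # Example 2
```

All three values must use the same unit (messages/second or MB/sec), as in the two examples.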

Replication Factor

The replication factor is set at the topic level when the topic is created. It specifies how many copies of each partition will be stored across different brokers in a Kafka cluster for fault tolerance and high availability.

Make sure the replication factor is less than or equal to the number of available brokers; otherwise, topic creation fails with an "Invalid Replication Factor" error.

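A replication factor of 3 is the typical choice for production Kafka topics, as it balances redundancy against resource utilization. With three replicas, a partition's data survives the failure of up to two brokers, as long as a surviving replica was in sync. Availability for writes is a separate question: with min.insync.replicas=2 and producers using acks=all, writes continue through the failure of one broker but are rejected once two are down.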
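
The arithmetic behind these rules can be sketched in Python. The function `check_replication` and its result keys are hypothetical names for illustration. It is also stricter than Kafka in one respect: it rejects a min.insync.replicas value larger than the replication factor up front, which Kafka does not check when the topic is created.

```python
def check_replication(replication_factor: int, broker_count: int,
                      min_insync_replicas: int = 1) -> dict[str, int]:
    """Validate the settings and report how many broker failures they tolerate."""
    if replication_factor < 1:
        raise ValueError("replication factor must be at least 1")
    if replication_factor > broker_count:
        raise ValueError(
            f"replication factor {replication_factor} exceeds "
            f"the {broker_count} available broker(s)"
        )
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("min.insync.replicas must be between 1 and the replication factor")
    return {
        # Data survives as long as one in-sync replica is left.
        "failures_without_data_loss": replication_factor - 1,
        # acks=all writes need min.insync.replicas replicas to stay in sync.
        "failures_while_accepting_writes": replication_factor - min_insync_replicas,
    }


print(check_replication(replication_factor=3, broker_count=3, min_insync_replicas=2))
```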


Create the following docker-compose.yml to set up a single Kafka broker in KRaft mode
version: '3'
services:  
  kafka-broker-1:
    image: 'bitnami/kafka:3.3.1'
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=MY_CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,MY_CONTROLLER://:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=MY_CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-broker-1:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker-1:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_KRAFT_CLUSTER_ID=d8zv92ecQk6ZrAAA35SDbw
    ports:
      - 9092:9092

Run the command below to build and start the Docker container
$ docker compose -f "docker-compose.yml" up -d --build

// View list of docker processes
$ docker ps
CONTAINER ID   IMAGE                           COMMAND                  CREATED       STATUS       PORTS                    
bf76f525a3b8   bitnami/kafka:3.3.1             "/opt/bitnami/script…"   4 hours ago   Up 4 hours   0.0.0.0:9092->9092/tcp


Enter the Docker container to create a Kafka topic using the CLI
// enter the container
// Syntax: docker exec -it <container_id> <command>
$ docker exec -it bf76f525a3b8 /bin/sh


Create Kafka Topic
// create kafka topic

/*
Syntax: kafka-topics.sh --create --topic <topic_name> --partitions <no_of_partitions> --replication-factor <replication_factor> --bootstrap-server <kafka_broker>

--create: flag to create a topic
--topic: name of the topic
--partitions: number of partitions for the topic
--replication-factor: number of replicas for each partition
--bootstrap-server: kafka broker address to connect
*/
$ kafka-topics.sh --create --topic test-topic1 --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092

Created topic test-topic1.

// With only one broker running, the replication factor must be 1. A replication factor larger than the number of brokers results in an error
$ kafka-topics.sh --create --topic test-topic2 --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

Error while executing topic command : Unable to replicate the partition 2 time(s): The target replication factor of 2 cannot be reached
because only 1 broker(s) are registered.

Create Kafka Topic With Custom Configuration

$ kafka-topics.sh --create --topic test-topic3 --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092 \
  --config retention.ms=604800000 \
  --config cleanup.policy=compact \
  --config min.insync.replicas=1 \
  --config segment.bytes=1073741824 \
  --config compression.type=gzip \
  --config max.message.bytes=1048576 \
  --config message.timestamp.type=CreateTime

  Explanation of Settings:

    topic: The name of the topic
    partitions: The number of partitions for the topic
    replication-factor: The number of replicas for each partition across different brokers
    bootstrap-server: The Kafka broker address to connect to
    retention.ms: How long Kafka retains messages before they become eligible for deletion under the delete cleanup policy (default is 7 days). Size-based retention is configured separately with retention.bytes
    cleanup.policy: Determines how old data is cleaned up. Options are delete, compact, or both as "compact,delete" (default is delete)
    min.insync.replicas: Minimum number of in-sync replicas that must acknowledge a write when the producer uses acks=all (default is 1). It should not exceed the replication factor, otherwise such writes fail
    segment.bytes: Size of the log segment files, e.g. 1073741824 (default is 1 GB)
    compression.type: Compression codec for the topic, such as gzip, snappy, lz4, zstd, or uncompressed (default is producer, which keeps whatever codec the producer used)
    max.message.bytes: The maximum size of a record batch that can be sent to the topic (default is about 1 MB)
    message.timestamp.type: Specifies whether the timestamp in the message is set by the producer (CreateTime)
                            or the broker (LogAppendTime) (default is CreateTime)
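
The large numbers in the command above are easier to trust when derived rather than typed by hand. The Python sketch below computes them and assembles the kafka-topics.sh argument list from a dictionary of settings. The helper `build_create_command` is hypothetical; it only builds the command using the flags shown above and does not contact a broker.

```python
from datetime import timedelta


def build_create_command(topic: str, partitions: int, replication_factor: int,
                         bootstrap_server: str, configs: dict[str, object]) -> list[str]:
    """Assemble the kafka-topics.sh --create arguments, one --config per setting."""
    command = [
        "kafka-topics.sh", "--create",
        "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
        "--bootstrap-server", bootstrap_server,
    ]
    for key, value in configs.items():
        command += ["--config", f"{key}={value}"]
    return command


configs = {
    "retention.ms": timedelta(days=7) // timedelta(milliseconds=1),  # 7 days in ms
    "segment.bytes": 1024 ** 3,                                      # 1 GB
    "max.message.bytes": 1024 ** 2,                                  # 1 MB
}

command = build_create_command("test-topic3", 3, 1, "localhost:9092", configs)
print(" ".join(command))
```

The resulting list could be passed to `subprocess.run` inside the container, assuming kafka-topics.sh is on the PATH there.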

List all Topics

// list all topics
/*
Syntax: kafka-topics.sh --list --bootstrap-server <kafka_broker>

--list: flag to list all topics
--bootstrap-server: kafka broker address to connect
*/
$ kafka-topics.sh --list --bootstrap-server localhost:9092

test-topic1

Describe a Topic

// describe a topic to get details about a specific topic
/*
Syntax: kafka-topics.sh --describe --topic <topic_name> --bootstrap-server <kafka_broker>

--describe: flag to describe a topic
--topic: name of the topic
--bootstrap-server: kafka broker address to connect
*/
$ kafka-topics.sh --describe --topic test-topic1 --bootstrap-server localhost:9092

Topic: test-topic1      TopicId: K-1z_KKtQfqb5TRSSHIHyA PartitionCount: 3       ReplicationFactor: 1    Configs:
        Topic: test-topic1      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: test-topic1      Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: test-topic1      Partition: 2    Leader: 1       Replicas: 1     Isr: 1
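
When the describe output needs to be checked in a script, it can be split into fields. The following Python sketch parses the text shown above; it assumes the "Name: value" layout of this output, and the function names are made up for illustration. Other Kafka versions may print additional fields.

```python
import re

# Matches "Name: value" pairs; the value may be empty, as with a bare "Configs:".
FIELD = re.compile(r"(\w+):\s*(\S*)")


def to_int_list(value: str) -> list[int]:
    """Turn a comma-separated broker list such as "1,2,3" into integers."""
    return [int(item) for item in value.split(",") if item]


def parse_describe(output: str) -> dict:
    """Split --describe output into a topic summary and per-partition rows."""
    summary: dict[str, str] = {}
    partitions: list[dict] = []
    for line in output.splitlines():
        fields = dict(FIELD.findall(line))
        if "Partition" in fields:
            partitions.append({
                "partition": int(fields["Partition"]),
                "leader": fields["Leader"],  # kept as text; may not be numeric
                "replicas": to_int_list(fields["Replicas"]),
                "isr": to_int_list(fields["Isr"]),
            })
        elif "Topic" in fields:
            summary = fields
    return {"summary": summary, "partitions": partitions}


SAMPLE = """\
Topic: test-topic1      TopicId: K-1z_KKtQfqb5TRSSHIHyA PartitionCount: 3       ReplicationFactor: 1    Configs:
        Topic: test-topic1      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: test-topic1      Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: test-topic1      Partition: 2    Leader: 1       Replicas: 1     Isr: 1
"""

parsed = parse_describe(SAMPLE)
print(parsed["summary"]["PartitionCount"], len(parsed["partitions"]))
```

A partition whose "isr" list is shorter than its "replicas" list has replicas that are out of sync, which is usually the first thing worth checking in this output.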

Alter a Topic

// alter a topic to increase partitions (the partition count can only be increased, never decreased)
/*
Syntax: kafka-topics.sh --alter --topic <topic_name> --partitions <no_of_partitions> --bootstrap-server <kafka_broker>

--alter: flag to alter a topic
--topic: name of the topic
--partitions: number of partitions for the topic
--bootstrap-server: kafka broker address to connect
*/
$ kafka-topics.sh --alter --topic test-topic1 --partitions 5 --bootstrap-server localhost:9092

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
partitions for topic test-topic1 increased to 5.  
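
The warning in the output above follows from how keyed records are routed: the partition is chosen as hash(key) modulo the number of partitions, so changing the partition count changes where many keys land. The Python sketch below illustrates the effect. It uses CRC32 as a stand-in hash, which is an assumption for illustration only; Kafka's default partitioner uses murmur2, so the actual partition numbers will differ.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Pick a partition from the key's hash (CRC32 stand-in, not Kafka's murmur2)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


keys = [f"user-{i}" for i in range(1000)]
moved = [key for key in keys if partition_for(key, 3) != partition_for(key, 5)]
print(f"{len(moved)} of {len(keys)} keys map to a different partition after going from 3 to 5")
```

Records written for a moved key before the change stay in the old partition, while new records go to the new one, which is why per-key ordering is no longer guaranteed across the change.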


Delete a Topic

// delete a topic
/*
Syntax: kafka-topics.sh --delete --topic <topic_name> --bootstrap-server <kafka_broker>

--delete: flag to delete a topic
--topic: name of the topic
--bootstrap-server: kafka broker address to connect
*/
$ kafka-topics.sh --delete --topic test-topic1 --bootstrap-server localhost:9092
Topic test-topic1 is marked for deletion.



Happy Coding :)
