
17 Useful Apache Kafka Examples on Linux (RedHat/CentOS 7/8)


In this article, I will take you through 17 useful Apache Kafka examples on Linux (RedHat/CentOS 7/8). For decades, organizations have used databases to store objects and entities, in other words, object state. But a separate class of engineers started thinking in terms of object events rather than object state. Events can be stored in a log, and Apache Kafka is a system for managing such logs; in Kafka architecture these logs are called topics.

Apache Kafka is a fast, fault-tolerant distributed messaging system. It was initially developed at LinkedIn and later became an open source project under the Apache Software Foundation.


Some Important Terminologies

Topics : An ordered, persistent sequence of events stored in a durable way.

Producer : It is responsible for writing data (events) to Kafka topics.


Consumer : It is responsible for reading data from the Kafka topics it subscribes to. You can find more about consumers in Explaining Apache Kafka Architecture in 3 Popular Steps. A short producer/consumer sketch follows this list.

Kafka Streams : It is a client library for building applications and microservices where the input and output data are stored in Kafka clusters.


Kafka Broker : It stores and manages messages from producers and allows consumers to read messages from topics.

Partitions : A partition is the basic unit of a topic. Each topic consists of one or more partitions. Each partition holds messages and can be distributed across the cluster so that multiple consumers can read from the topic in parallel.
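To see the producer and consumer roles in action, below is a minimal sketch using the console tools that ship with Kafka. It assumes a broker listening on localhost:9092 and an existing topic named testTopic1 (the topic created in Example 2 below); adjust the host, port, and topic name to your setup.

# Write events to the topic from standard input (type messages, Ctrl+C to exit)
# Assumes a broker on localhost:9092 and an existing topic testTopic1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic1

# In a second terminal, read the events back from the beginning of the topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic1 --from-beginning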



Also Read: Apache Kafka Tutorial for Beginners with 3 Best Examples

Example 1: Check Kafka Version

To check the Kafka version, you need to use the --version option as shown below. As you can see from the output below, the current Kafka version is 2.4.1.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --version
2.4.1 (Commit:c57222ae8cd7866b)

--version : Display Kafka version.

NOTE:

Please note that I am running all the scripts here as the root user. You can use any user with sudo access or with appropriate permissions to run these scripts. You can check How to add User to Sudoers to know more about granting sudo access to a user.

 

Example 2: Create an Apache Kafka Topic

In this Apache Kafka example, you will learn how to create a Kafka topic. Here we are creating a topic called testTopic1 using the --create option of the kafka-topics.sh script, with 1 partition and a replication factor of 1. A variant for newer Kafka versions follows the option descriptions below.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic1
Created topic testTopic1.

--create : Creates a new topic

--zookeeper : The connection string for the zookeeper connection in the form host:port

--replication-factor : The replication factor for each partition of the topic being created.

--partitions : The number of partitions of the topic being created or altered

--topic : A topic or topics to be altered, created, deleted, or listed and described
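Note that the --zookeeper flag matches the Kafka 2.4.1 setup used throughout this article. On Kafka 2.2 and later, kafka-topics.sh can also talk to a broker directly; a minimal sketch, assuming a broker listening on localhost:9092:

# Same topic creation, but connecting to the broker instead of Zookeeper (Kafka 2.2+)
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic testTopic1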

Example 3. List All Apache Kafka Topics

If you want to check the list of all Apache Kafka topics, then you need to use the --list option as shown below. As you can see from the output below, 3 topics are currently available in the system.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
sampleTopic
testTopic1

--list : List all available topics.

Example 4: Describe a Kafka Topic

If you want to see all the information about a Kafka topic, then you need to use --describe as shown in the command below. Here we are checking the partition details, replica details, replication factor, and other information about the topic testTopic1.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --describe --topic testTopic1 --zookeeper localhost:2181
Topic: testTopic1 PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: testTopic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0

 

Example 5: Delete a Topic

If you want to delete a Kafka topic, then you need to use the --delete option as shown below. In this Apache Kafka example, we are deleting the testTopic1 topic using the kafka-topics.sh script.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic testTopic1
Topic testTopic1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

--delete : Delete a topic
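As the note in the output above says, the deletion only takes effect if topic deletion is enabled on the broker. A minimal sketch of the relevant broker property, assuming the default config/server.properties layout (in recent Kafka releases it already defaults to true, so you normally only need to add it if it was explicitly disabled):

# config/server.properties on each broker; restart the broker for the change to apply
delete.topic.enable=true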

Example 6: Start Kafka Server

To start the Kafka server, you need to run the kafka-server-start.sh script with the configuration properties file passed as an argument, as shown below.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-server-start.sh config/server.properties
[2020-04-03 17:28:46,764] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-04-03 17:28:47,519] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-04-03 17:28:47,520] INFO starting (kafka.server.KafkaServer)
[2020-04-03 17:28:47,521] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2020-04-03 17:28:47,557] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2020-04-03 17:28:47,566] INFO Client environment:zookeeper.version=3.5.7-f0fdd52973d373ffd9c86b81d99842dc2c7f660e, built on 02/10/2020 11:30 GMT (org.apache.zookeeper.ZooKeeper)
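Started this way, the broker stays attached to your terminal and the logs stream to the console. If you prefer to run it in the background, kafka-server-start.sh also accepts a -daemon flag; a minimal sketch using the same properties file:

# Start the broker as a background (daemon) process
bin/kafka-server-start.sh -daemon config/server.properties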

 

Example 7: Stop Kafka Server

If you want to stop the Kafka server, then you need to use the kafka-server-stop.sh script as shown below.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-server-stop.sh

 

Example 8: Check Configuration of All Topics

If you want to check the configurations of all topics, then you need to use the --entity-type and --describe options with the kafka-configs.sh script as shown below.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --describe
Configs for topic 'sampleTopic' are
Configs for topic 'testTopic1' are
Configs for topic '__consumer_offsets' are segment.bytes=104857600,cleanup.policy=compact,compression.type=producer

--entity-type : Type of entity(topics/clients/users/brokers/broker-loggers)

--describe : List configs for the given entity.
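If you only want the configuration of a single topic rather than all of them, you can add --entity-name to the same command; a minimal sketch for testTopic1:

# Describe configuration overrides for one topic only
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name testTopic1 --describe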

Example 9: Dump Kafka Logs

Sometimes you may want to debug an error you are facing in your Kafka cluster. To do that, you can dump the Kafka log segments using the kafka-run-class.sh script as shown below. This will help you in analyzing the various Kafka segments for possible errors.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files /tmp/kafka-logs/testTopic1-0/00000000000000000000.log
Dumping /tmp/kafka-logs/testTopic1-0/00000000000000000000.log
Starting offset: 0

--deep-iteration : if set, uses deep instead of shallow iteration

--files : The comma separated list of data and index log files to be dumped
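By default, DumpLogSegments prints mostly batch and offset metadata. If you also want to see the message payloads while debugging, the tool supports a --print-data-log option; a minimal sketch using the same segment file as above:

# Also decode and print the message payloads stored in the segment
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /tmp/kafka-logs/testTopic1-0/00000000000000000000.log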

Example 10: Display API Versions of the Cluster Node

If you want to display the API versions supported by a cluster node, then you need to use the kafka-broker-api-versions.sh script as shown below.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
localhost:9092 (id: 0 rack: null) -> (
Produce(0): 0 to 8 [usable: 8],
Fetch(1): 0 to 11 [usable: 11],
ListOffsets(2): 0 to 5 [usable: 5],
Metadata(3): 0 to 9 [usable: 9],
LeaderAndIsr(4): 0 to 4 [usable: 4],
StopReplica(5): 0 to 2 [usable: 2],
UpdateMetadata(6): 0 to 6 [usable: 6],
ControlledShutdown(7): 0 to 3 [usable: 3],
OffsetCommit(8): 0 to 8 [usable: 8],
OffsetFetch(9): 0 to 6 [usable: 6],
FindCoordinator(10): 0 to 3 [usable: 3],
JoinGroup(11): 0 to 6 [usable: 6],
Heartbeat(12): 0 to 4 [usable: 4],
LeaveGroup(13): 0 to 4 [usable: 4],
SyncGroup(14): 0 to 4 [usable: 4],
DescribeGroups(15): 0 to 5 [usable: 5],
ListGroups(16): 0 to 3 [usable: 3],
SaslHandshake(17): 0 to 1 [usable: 1],
ApiVersions(18): 0 to 3 [usable: 3],
CreateTopics(19): 0 to 5 [usable: 5],
DeleteTopics(20): 0 to 4 [usable: 4],
DeleteRecords(21): 0 to 1 [usable: 1],
InitProducerId(22): 0 to 2 [usable: 2],
OffsetForLeaderEpoch(23): 0 to 3 [usable: 3],
AddPartitionsToTxn(24): 0 to 1 [usable: 1],
AddOffsetsToTxn(25): 0 to 1 [usable: 1],
EndTxn(26): 0 to 1 [usable: 1],
WriteTxnMarkers(27): 0 [usable: 0],
TxnOffsetCommit(28): 0 to 2 [usable: 2],
DescribeAcls(29): 0 to 1 [usable: 1],
CreateAcls(30): 0 to 1 [usable: 1],
DeleteAcls(31): 0 to 1 [usable: 1],
DescribeConfigs(32): 0 to 2 [usable: 2],
AlterConfigs(33): 0 to 1 [usable: 1],
AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
DescribeLogDirs(35): 0 to 1 [usable: 1],
SaslAuthenticate(36): 0 to 1 [usable: 1],
CreatePartitions(37): 0 to 1 [usable: 1],
CreateDelegationToken(38): 0 to 2 [usable: 2],
RenewDelegationToken(39): 0 to 1 [usable: 1],
ExpireDelegationToken(40): 0 to 1 [usable: 1],
DescribeDelegationToken(41): 0 to 1 [usable: 1],
DeleteGroups(42): 0 to 2 [usable: 2],
ElectLeaders(43): 0 to 2 [usable: 2],
IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
AlterPartitionReassignments(45): 0 [usable: 0],
ListPartitionReassignments(46): 0 [usable: 0],
OffsetDelete(47): 0 [usable: 0]
)

--bootstrap-server : The server to connect to.

Example 11: Increase Partition for a Topic

Sometimes you might want to increase the number of partitions of a topic. You can alter the topic by using the --alter option with the kafka-topics.sh script and increase the number of partitions using the --partitions option as shown below.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic testTopic1 --partitions 4
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

--alter : Alter the number of partitions,replica assignment, and/or configuration for the topic.

--partitions : The number of partitions for the topic being created or altered
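To confirm that the change took effect, you can describe the topic again as in Example 4; the PartitionCount field should now read 4:

# Verify the new partition count of testTopic1
bin/kafka-topics.sh --describe --topic testTopic1 --zookeeper localhost:2181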

Example 12: Update Topic Config

If you want to update a topic configuration, for example the testTopic1 configuration in this case, then you need to use the --add-config option with the kafka-configs.sh script as shown below.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name testTopic1 --entity-type topics --add-config cleanup.policy=compact
Completed Updating config for entity: topic 'testTopic1'.

--entity-name : Name of entity (topic name/client id/user principal name/broker id)

--entity-type : Type of entity (topics/clients/users/brokers/broker-loggers)

--add-config : Key Value pairs of configs to add.
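To undo such an override later, kafka-configs.sh also provides a --delete-config option, which removes the key so the topic falls back to the broker default; a minimal sketch for the config added above:

# Remove the cleanup.policy override from testTopic1
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name testTopic1 --entity-type topics --delete-config cleanup.policy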

Example 13: Leader Election

If you want to rebalance partition leadership for specific topics using a JSON file, then you can use the kafka-leader-election.sh script and pass the JSON file as an argument as shown below.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-leader-election.sh --path-to-json-file elect-leader.json --election-type preferred --bootstrap-server localhost:9092
Valid replica already elected for partitions

--path-to-json-file : The JSON file with the list of partitions for which leader elections should be performed.

--election-type : Type of election to attempt. Possible values are "preferred" for preferred leader election or "unclean" for unclean leader election.

You can find the elect-leader.json file below.

[root@localhost kafka_2.13-2.4.1]# vi elect-leader.json
{ "partitions":
[
{ "topic": "testTopic3", "partition": 1 },
{ "topic": "testTopic4", "partition": 2 }
]
}

 

Example 14: Push Messages through a File

If you want to push messages from a file, then you can use the kafka-console-producer.sh script to read the messages from a file, say pushMessage.log, and push them to the topic specified with the --topic option, as shown below.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic4 < pushMessage.log
>>
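To confirm that the messages from the file actually landed on the topic, you can read them back with the console consumer; a minimal sketch, assuming the same broker and topic as above:

# Read all messages on testTopic4 from the beginning (Ctrl+C to stop)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic4 --from-beginning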

 

Example 15: Set Message Retention Time in a Topic

If you want to set the retention time of messages in a topic, then you can alter the topic configuration by using the kafka-configs.sh script and pass the retention time in the retention.ms parameter as shown below. Please note that the value here is in milliseconds.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name testTopic4 --add-config retention.ms=800
Completed Updating config for entity: topic 'testTopic4'.
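You can verify that the override was applied by describing the topic configuration as in Example 8; retention.ms=800 should appear in the output. Note that 800 milliseconds is an aggressively short retention used here only for demonstration. A minimal verification sketch:

# Check the configuration overrides currently set on testTopic4
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name testTopic4 --describe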

 

Example 16: Get the Earliest Offset in a Topic

To get the earliest offset of each partition in a topic, you need to use the GetOffsetShell class with the kafka-run-class.sh script as shown below. The output is in topic:partition:offset form. A sketch for fetching the latest offsets follows the option descriptions below.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic testTopic3 --time -2
testTopic3:0:0
testTopic3:1:0
testTopic3:2:0

--time :  <Long: timestamp in milliseconds / -1(latest) / -2 (earliest) timestamp; offsets will come before this timestamp, as in getOffsetsBefore >

--offsets : number of offsets returned (default: 1)
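To get the latest offsets instead of the earliest, pass -1 to --time; a minimal sketch with the same topic:

# Latest (end) offsets for each partition of testTopic3
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic testTopic3 --time -1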

Example 17: Check Other Options with --help

If you want to check all the options that can be used with the kafka-topics.sh script, then you need to use the --help option as shown below.

[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --help
This tool helps to create, delete, describe, or change a topic.
Option                         Description
------                        -----------
--alter                  Alter the number of partitions,replica assignment, and/or configuration for the topic.
--at-min-isr-partitions  if set when describing topics, only show partitions whose isr count is equal to the configured minimum. Not supported with the --zookeeper option.
--bootstrap-server <String: server to connect to>    REQUIRED: The Kafka server to connect to. In case of providing this, a direct Zookeeper connection won't be required.
--command-config <String: command config property file>    Property file containing configs to be passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs.

 

 
