Kafka Topic Operations
Go to the Kafka bin folder before running any of the command
$ cd ~/kafka_2.11-1.1.0/bin
➠ Creating Kafka Topic
- Creating a kafka topic with a single partition & single replication factor.
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_test_topic
Output:
Created topic "kafka_test_topic".
- Try to create topic in single broker server with multiple replication factor, command will fail with error : replication factor: 2 larger than available brokers.
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic kafka_test_topic_withpartition
Output:
Error while executing topic command replication factor: 2 larger than available brokers: 1
kafka.admin.AdminOperationException: replication factor: 2 larger than available brokers: 1
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:155)
at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:86)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:50)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
- Creating a kafka topic with a multiple partitions.
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kafka_test_topic_withpartition
Output:
Created topic "kafka_test_topic_withpartition".
➠ Describing Kafka Topic
- Describing Kafka topic (Checking defined property of topic).
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka_test_topic
Output:
Topic: kafka_test_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic:kafka_test_topic PartitionCount:1 ReplicationFactor:1 Configs:
- Describing Kafka topic (Checking defined property of topic).
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka_test_topic_withpartition
Output:
Topic:kafka_test_topic_withpartition PartitionCount:3 ReplicationFactor:1 Configs:
Topic: kafka_test_topic_withpartition Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: kafka_test_topic_withpartition Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: kafka_test_topic_withpartition Partition: 2 Leader: 0 Replicas: 0 Isr: 0
➠ All Topics: List down all the topics available in the kafka cluster.
./kafka-topics.sh --list --zookeeper localhost:2181
Output:
kafka_test_topic
kafka_test_topic_withpartition
➠ Altering Kafka Topic: Change the current defined parameters/configuration for the topic.
- Adding more partiton to the existing Topic.
./kafka-topics.sh --alter --zookeeper localhost:2181 --topic kafka_test_topic --partitions 2
Output:
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
--Describing again
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka_test_topic
Output:
Topic:kafka_test_topic PartitionCount:2 ReplicationFactor:1 Configs:
Topic: kafka_test_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: kafka_test_topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
- Changing the configuration parameters
- Older Versions: Change in configuration can be done using 'kafka-topics.sh' script. Increasing the data/message retention period for Kafka.
./kafka-topics.sh --alter --zookeeper localhost:2181 --topic kafka_test_topic --config retention.ms=800000000
Output:
Updated config for topic "kafka_test_topic".
- Newer Versions: Change in configuration can be done using 'kafka-configs.sh' script. Increasing the data/message retention period for Kafka.
./kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name kafka_test_topic --entity-type topics --add-config retention.ms=800000001
Output:
Updated config for topic: "kafka_test_topic".
- Valid configs for entity_type 'topics' that can be changed with the new version.
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
index.interval.bytes
max.message.bytes
min.cleanable.dirty.ratio
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
➠ Deleting Kafka Topic: Dropping existing kafka topic from the Kafka server.
$ ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic kafka_test_topic
Output:
Topic kafka_test_topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.