Starting with version 2.2, Kafka provides the kafka-delete-records tool, which purges all records in a topic partition whose offsets are lower than a specified offset. This tutorial shows how data can be purged from Kafka topics.
Information needed to purge data from a topic:
./kafka-delete-records.sh --bootstrap-server <hostname:port> --offset-json-file <path_to_config_file>
*************delete_record_config.json***********
{"partitions":
[
{"topic": "kafka_test_topic", "partition": 0, "offset": 4}
],
"version":1
}
*************************************************
*************delete_record_config_partition.json***********
{"partitions":
[
{"topic": "kafka_test_topic_withpartition", "partition": 0, "offset": 2},
{"topic": "kafka_test_topic_withpartition", "partition": 1, "offset": 2},
{"topic": "kafka_test_topic_withpartition", "partition": 2, "offset": 3}
],
"version":1
}
*************************************************
$ cd ~/kafka_2.12-2.2.2/bin
*************delete_record_config.json***********
{"partitions":
[
{"topic": "kafka_test_topic", "partition": 0, "offset": 1}
],
"version":1
}
*************************************************
Lower Offset:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -2
Output:
kafka_test_topic:0:0
Upper Offset:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -1
Output:
kafka_test_topic:0:13
./kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ~/files/delete_record_config.json
Output:
Executing records delete operation
Records delete operation completed:
partition: kafka_test_topic-0 low_watermark: 1
Lower Offset:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -2
Output:
kafka_test_topic:0:1
Upper Offset:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -1
Output:
kafka_test_topic:0:13
*************delete_record_config.json***********
{"partitions":
[
{"topic": "kafka_test_topic", "partition": 0, "offset": 4}
],
"version":1
}
*************************************************
Lower Offset:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -2
Output:
kafka_test_topic:0:1
Upper Offset:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -1
Output:
kafka_test_topic:0:13
./kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ~/files/delete_record_config.json
Output:
Executing records delete operation
Records delete operation completed:
partition: kafka_test_topic-0 low_watermark: 4
Lower Offset:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -2
Output:
kafka_test_topic:0:4
Upper Offset:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -1
Output:
kafka_test_topic:0:13
*************delete_record_config_partition.json***********
{"partitions":
[
{"topic": "kafka_test_topic_withpartition", "partition": 0, "offset": 2},
{"topic": "kafka_test_topic_withpartition", "partition": 1, "offset": 2}
],
"version":1
}
*************************************************
Lower Offset:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic_withpartition --time -2
Output:
kafka_test_topic_withpartition:0:0
kafka_test_topic_withpartition:1:0
Upper Offset:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic_withpartition --time -1
Output:
kafka_test_topic_withpartition:0:4
kafka_test_topic_withpartition:1:3
./kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ~/files/delete_record_config_partition.json
Output:
Executing records delete operation
Records delete operation completed:
partition: kafka_test_topic_withpartition-0 low_watermark: 2
partition: kafka_test_topic_withpartition-1 low_watermark: 2
Lower Offset:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic_withpartition --time -2
Output:
kafka_test_topic_withpartition:0:2
kafka_test_topic_withpartition:1:2
Upper Offset:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic_withpartition --time -1
Output:
kafka_test_topic_withpartition:0:4
kafka_test_topic_withpartition:1:3
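If the offset JSON file refers to a partition that does not exist for the topic (for example, partition 1 of kafka_test_topic, which has only partition 0), the delete operation reports an error for that partition:
./kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ~/files/delete_record_config.json
Output:
Executing records delete operation
Records delete operation completed:
partition: kafka_test_topic-1 error: org.apache.kafka.common.errors.LeaderNotAvailableException: There is no leader for this topic-partition as we are in the middle of a leadership election.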