Purging Data from a Kafka Topic

Kafka Purge Operation

Kafka provides the kafka-delete-records.sh tool, which uses the DeleteRecords API to purge all records below a specified offset in the listed partitions of a topic. This tutorial, based on Kafka 2.2.2, shows how to purge data from Kafka topics with it.
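The offset in the config file acts as an exclusive bound. All records below it are deleted, and the offset itself becomes the partition's new low watermark, as the low_watermark values in the examples below show. The following sketch expresses that arithmetic as a small shell function. It is not part of Kafka: `purged_range` is a hypothetical name, and the function assumes that every offset holds exactly one record (no compaction or transaction markers). It also assumes that a request below the current lower offset deletes nothing and that a request beyond the upper offset is rejected.

```shell
# Hypothetical helper (not a Kafka tool): how many offsets a purge removes
# from one partition.
# Arguments: current lower offset (GetOffsetShell --time -2), current upper
# offset (--time -1), and the "offset" value placed in the JSON config file.
# Assumes one record per offset (no compaction or transaction markers).
purged_range() {
  lower=$1
  upper=$2
  target=$3
  if [ "$target" -gt "$upper" ]; then
    # Assumption: a target beyond the upper offset is rejected by the broker.
    echo "error: offset $target is beyond upper offset $upper" >&2
    return 1
  fi
  if [ "$target" -le "$lower" ]; then
    # Nothing below the target remains, so the low watermark stays at $lower.
    echo 0
  else
    # Offsets lower..target-1 are removed; target becomes the new low watermark.
    echo $((target - lower))
  fi
}

purged_range 0 13 1   # Example 1: prints 1 (offset 0 removed)
purged_range 1 13 4   # Example 2: prints 3 (offsets 1 to 3 removed)
```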

Information needed to purge data from a topic

Syntax:
./kafka-delete-records.sh --bootstrap-server <hostname:port> --offset-json-file <path_to_config_file>


➠ The topic, partition and offset for each partition to purge must be stored in a JSON file. Each example below includes its sample config file.
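Because the file format is simple, it can also be generated instead of written by hand. The sketch below assumes a POSIX shell. `build_delete_config` is a hypothetical helper that prints the same structure as the sample config files in the examples below, with every listed partition purged up to one offset. It does no JSON escaping. That is safe only because Kafka topic names are limited to ASCII letters, digits, `.`, `_` and `-`.

```shell
# Hypothetical helper: print an offset JSON file for kafka-delete-records.sh,
# one entry per partition, all purged up to the same offset.
# Usage: build_delete_config TOPIC OFFSET PARTITION [PARTITION ...]
build_delete_config() {
  topic=$1
  offset=$2
  shift 2
  printf '{"partitions": ['
  sep=''
  for partition in "$@"; do
    # One {"topic", "partition", "offset"} entry per partition argument.
    printf '%s{"topic": "%s", "partition": %s, "offset": %s}' \
      "$sep" "$topic" "$partition" "$offset"
    sep=', '
  done
  printf '], "version": 1}\n'
}

# Same content as delete_record_config_partition.json in Example 3:
build_delete_config kafka_test_topic_withpartition 2 0 1
```

To use it, redirect the output to the file passed to --offset-json-file, for example `build_delete_config kafka_test_topic 1 0 > ~/files/delete_record_config.json`. The whitespace differs from the hand-written samples, which does not matter for JSON.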


➠ Go to the Kafka bin folder before running any of the commands.
$ cd ~/kafka_2.12-2.2.2/bin


➠ Purging examples for both single-partition and multi-partition topics:
  1. Purging a single-partition topic (Example 1)
    • Configuration file for this example
      *************delete_record_config.json***********
      {"partitions":                        
        [
        {"topic": "kafka_test_topic", "partition": 0, "offset": 1}
         ],
       "version":1
      }  
      *************************************************
      
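    • Current minimum & maximum offset for the topic (GetOffsetShell with --time -2 returns the earliest available offset, and --time -1 returns the latest offset).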
      Lower Offset:
      ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -2
      
      Output:
      kafka_test_topic:0:0
      
      
      
      Upper Offset:
      ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -1
      
      Output:
      kafka_test_topic:0:13
      
    • Run kafka-delete-records.sh to purge all records with an offset lower than 1 (here, only offset 0).
      ./kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ~/files/delete_record_config.json
      
      Output:
      Executing records delete operation
      Records delete operation completed:
      partition: kafka_test_topic-0   low_watermark: 1
      
    • Checking the offsets again shows that the lower offset is now 1 instead of 0 (its value before the purge), while the upper offset is unchanged.
      Lower Offset:
      ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -2
      
      Output:
      kafka_test_topic:0:1
      
      
      
      Upper Offset:
      ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -1
      
      Output:
      kafka_test_topic:0:13
      

  2. Purging a single-partition topic (Example 2)
    • Configuration file for this example
      *************delete_record_config.json***********
      {"partitions":                        
        [
        {"topic": "kafka_test_topic", "partition": 0, "offset": 4}
         ],
       "version":1
      }  
      *************************************************
      
    • Current minimum & maximum offset for the topic.
      Lower Offset:
      ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -2
      
      Output:
      kafka_test_topic:0:1
      
      
      
      Upper Offset:
      ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -1
      
      Output:
      kafka_test_topic:0:13
      
    • Run kafka-delete-records.sh to purge all records with an offset lower than 4 (here, offsets 1 to 3).
      ./kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ~/files/delete_record_config.json
      
      Output:
      Executing records delete operation
      Records delete operation completed:
      partition: kafka_test_topic-0   low_watermark: 4
      
    • Checking the offsets again shows that the lower offset is now 4 instead of 1 (its value before the purge), while the upper offset is unchanged.
      Lower Offset:
      ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -2
      
      Output:
      kafka_test_topic:0:4
      
      
      
      Upper Offset:
      ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic --time -1
      
      Output:
      kafka_test_topic:0:13
      

  3. Purging a multi-partition topic
    • Configuration file for this example
      *************delete_record_config_partition.json***********
      {"partitions":                        
        [
        {"topic": "kafka_test_topic_withpartition", "partition": 0, "offset": 2},
        {"topic": "kafka_test_topic_withpartition", "partition": 1, "offset": 2}
        ],
       "version":1
      } 
      *************************************************
      
    • Current minimum & maximum offset for the topic.
      Lower Offset:
      ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic_withpartition --time -2
      
      Output:
      kafka_test_topic_withpartition:0:0
      kafka_test_topic_withpartition:1:0
      
      
      
      Upper Offset:
      ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic_withpartition --time -1
      
      Output:
      kafka_test_topic_withpartition:0:4
      kafka_test_topic_withpartition:1:3
      
    • Run kafka-delete-records.sh to purge all records with an offset lower than 2 (offsets 0 and 1) in both partitions.
      ./kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ~/files/delete_record_config_partition.json
      
      Output:
      Executing records delete operation
      Records delete operation completed:
      partition: kafka_test_topic_withpartition-0 low_watermark: 2
      partition: kafka_test_topic_withpartition-1 low_watermark: 2
      
    • Checking the offsets again shows that in both partitions the lower offset is now 2 instead of 0 (its value before the purge), while the upper offsets are unchanged.
      Lower Offset:
      ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic_withpartition --time -2
      
      Output:
      kafka_test_topic_withpartition:0:2
      kafka_test_topic_withpartition:1:2
      
      
      
      Upper Offset:
      ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic_withpartition --time -1
      
      Output:
      kafka_test_topic_withpartition:0:4
      kafka_test_topic_withpartition:1:3
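After a purge, the number of offsets still held by each partition is the upper offset minus the lower offset. In Example 3 that is 4 - 2 = 2 for partition 0 and 3 - 2 = 1 for partition 1. The sketch below does that subtraction for every partition in the two GetOffsetShell outputs. It assumes the topic:partition:offset output format shown above. Topic names cannot contain `:`, so splitting on it is safe. `remaining_offsets` is a hypothetical helper, not a Kafka tool.

```shell
# Hypothetical helper: offsets remaining per partition after a purge.
# $1: GetOffsetShell output with --time -2 (lower offsets)
# $2: GetOffsetShell output with --time -1 (upper offsets)
# Prints "<topic>-<partition> <upper - lower>" in the order of the $1 lines.
# Real usage (requires a running broker):
#   lower=$(./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic_withpartition --time -2)
#   upper=$(./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka_test_topic_withpartition --time -1)
#   remaining_offsets "$lower" "$upper"
remaining_offsets() {
  {
    printf '%s\n' "$1" | sed 's/^/lower:/'
    printf '%s\n' "$2" | sed 's/^/upper:/'
  } | awk -F: '
    NF != 4 { next }    # skip blank or unexpected lines
    $1 == "lower" { key = $2 "-" $3; low[key] = $4; order[++n] = key; next }
    $1 == "upper" { up[$2 "-" $3] = $4 }
    END {
      for (i = 1; i <= n; i++) {
        k = order[i]
        if (k in up) print k, up[k] - low[k]
      }
    }'
}
```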
      

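➠ Purging data from a partition that does not exist in the topic fails, as shown in the example below, where the config file references partition 1 of kafka_test_topic, which has only partition 0. Note that the reported error is a LeaderNotAvailableException rather than an error naming the missing partition.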
./kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ~/files/delete_record_config.json

Output:
Executing records delete operation
Records delete operation completed:

partition: kafka_test_topic-1   error: org.apache.kafka.common.errors.LeaderNotAvailableException: There is no leader for this topic-partition as we are in the middle of a leadership election.
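To make this kind of failure visible to a script, you can scan the output for partition lines that report an error instead of a low watermark. The sketch below assumes the output keeps the "partition: <topic>-<partition>   low_watermark: <n>" or "... error: ..." format shown above. `check_delete_output` is a hypothetical name, not part of Kafka. It prints one line per partition and returns a non-zero status if any partition failed.

```shell
# Hypothetical helper: read kafka-delete-records.sh output on stdin, summarise
# each partition line, and exit non-zero if any partition reported an error.
# Real usage (requires a running broker):
#   ./kafka-delete-records.sh --bootstrap-server localhost:9092 \
#     --offset-json-file ~/files/delete_record_config.json | check_delete_output
check_delete_output() {
  awk '
    /^partition:/ && / error: / { print "failed: " $2; failed = 1 }
    /^partition:/ && /low_watermark:/ { print "purged: " $2 " (low watermark " $NF ")" }
    END { exit failed }'
}
```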