Kafka Consumer Groups

Go to the Kafka bin folder before running any of the commands.
$ cd ~/kafka_2.11-1.1.0/bin


Defining Consumer Group: A consumer group is defined by setting the group.id property to the group name (group.id=consumer_group_name), using either of the following methods.

  1. --consumer-property
    Command:
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=cg_name_1 --topic kafka_test_topic --from-beginning
    
    Output:
    Kafka Message 1   
    Kafka Message 2
    
  2. --consumer.config: Use a properties file to define the configuration for fetching the data.
    # config.properties file content
    group.id=cg_name_2
    
    
    Command:
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer.config ~/files/config.properties --topic kafka_test_topic --from-beginning
    
    Output:
    Kafka Message 1   
    Kafka Message 2
    
    Note: If both --consumer-property and --consumer.config are given in the same command, --consumer-property takes priority.
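The precedence rule in the note above can be pictured as a simple overlay. The following is a minimal Python sketch, not Kafka's actual implementation: the helper names parse_properties and effective_config are hypothetical, and the parser only handles plain key=value lines and # comments.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def effective_config(file_text, cli_properties):
    """Start from the --consumer.config file, then let --consumer-property win."""
    config = parse_properties(file_text)
    for pair in cli_properties:
        key, _, value = pair.partition("=")
        config[key.strip()] = value.strip()
    return config


# Illustrative inputs: the file sets cg_name_2, the command line sets cg_name_1.
file_text = "# config.properties file content\ngroup.id=cg_name_2\n"
config = effective_config(file_text, ["group.id=cg_name_1"])
print(config["group.id"])  # the command-line value is the one that survives
```

With no --consumer-property given, the same function simply returns the values from the file.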

Fetching Data using Consumer Group: By default, a new consumer group does not fetch older messages that were pushed to the Kafka topic (queue) before the consumer group was defined.

    → A new consumer group with --from-beginning reads all messages, starting from the earliest offset.
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_test_topic --consumer-property group.id=cg_name_3 --from-beginning
    
    Output:
    Kafka Message 1
    Kafka Message 2
    

    → A new consumer group without --from-beginning reads only new messages produced after the group is created.
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_test_topic --consumer-property group.id=cg_name_4
    
    Output:
    Processed a total of 0 messages
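The two cases above can be modeled without a broker. This is a simplified sketch under stated assumptions: starting_offset is a hypothetical helper, the topic is treated as a single partition held in a Python list, and --from-beginning is modeled as affecting only groups that have no committed offset yet.

```python
def starting_offset(committed, log_start, log_end, from_beginning):
    """Return the offset a consumer group would start reading from.

    committed is the group's committed offset, or None for a new group.
    """
    if committed is not None:
        # An existing group resumes where it left off.
        return committed
    # A new group starts at the earliest or the latest offset.
    return log_start if from_beginning else log_end


# Messages already in the topic before the group is created.
messages = ["Kafka Message 1", "Kafka Message 2"]
log_start, log_end = 0, len(messages)

with_flag = starting_offset(None, log_start, log_end, from_beginning=True)
without_flag = starting_offset(None, log_start, log_end, from_beginning=False)

print(messages[with_flag:])     # both existing messages are read
print(messages[without_flag:])  # nothing to read until new messages arrive
```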
    

List all the consumer groups: The command below lists all the defined consumer groups.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

Output:
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

cg_name_4
cg_name_3
cg_name_1
cg_name_2
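When the list is consumed by a script, the note line and the blank line have to be filtered out. A minimal sketch, assuming the note arrives in the same text as the group names (as in the output above); parse_group_list is a hypothetical helper name.

```python
list_output = """\
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

cg_name_4
cg_name_3
cg_name_1
cg_name_2
"""


def parse_group_list(text):
    """Return the group names, sorted, ignoring blank lines and the note."""
    groups = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("Note:"):
            continue
        groups.append(line)
    return sorted(groups)


groups = parse_group_list(list_output)
print(groups)
```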


Consumer groups Metadata: Kafka provides a way to describe a consumer group and retrieve the metadata that Kafka maintains internally to store the offsets of messages that have already been consumed. The information that can be retrieved includes the topic, partition, current offset, log end offset, lag, consumer ID, host and client ID, as in the --describe outputs later in this document.

Reset Current Offset: Kafka provides a way to reset the current offset of a consumer group in order to re-read or skip messages in the Kafka topic.

    Without --execute, the command below only shows what would be done (a dry run) and does not actually reset the consumer group's offsets. In future versions, --dry-run must be specified explicitly to do a dry run.
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --reset-offsets --to-earliest --topic kafka_test_topic
    
    Output:
    TOPIC              PARTITION  NEW-OFFSET     
    kafka_test_topic   0          0    
    
    
    =>Example with --dry-run
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --reset-offsets --to-earliest --topic kafka_test_topic --dry-run
    
    Output:
    TOPIC              PARTITION  NEW-OFFSET     
    kafka_test_topic   0          4              
    

    A consumer group can be reset to read from the beginning.
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --reset-offsets --to-earliest --topic kafka_test_topic --execute
    
    Output:
    TOPIC              PARTITION  NEW-OFFSET     
    kafka_test_topic   0          0              
    
    
    =>Checking current offset
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --describe
    
    Output:
    TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID HOST  CLIENT-ID
    kafka_test_topic   0          0               9               9      -           -     -
    

    A consumer group can be reset to read from the latest offset.
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --reset-offsets --to-latest --topic kafka_test_topic --execute
    
    Output: 
    TOPIC              PARTITION  NEW-OFFSET     
    kafka_test_topic   0          9              
    
    
    
    =>Checking current offset
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --describe
    
    Output:
    TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID HOST  CLIENT-ID
    kafka_test_topic   0          9               9               0      -           -     -
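The LAG column in the two --describe outputs is the difference between LOG-END-OFFSET and CURRENT-OFFSET. The sketch below recomputes it from the text shown above. The helpers parse_describe and lag are hypothetical names, and the parser assumes whitespace-separated columns with no spaces inside values.

```python
after_earliest = """\
TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID HOST  CLIENT-ID
kafka_test_topic   0          0               9               9      -           -     -
"""

after_latest = """\
TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID HOST  CLIENT-ID
kafka_test_topic   0          9               9               0      -           -     -
"""


def parse_describe(text):
    """Turn the --describe table into a list of dicts keyed by column name."""
    lines = [line for line in text.splitlines() if line.strip()]
    header = lines[0].split()
    return [dict(zip(header, line.split())) for line in lines[1:]]


def lag(row):
    """Messages not yet consumed, or None if an offset is shown as '-'."""
    current, end = row["CURRENT-OFFSET"], row["LOG-END-OFFSET"]
    if current == "-" or end == "-":
        return None
    return int(end) - int(current)


for text in (after_earliest, after_latest):
    for row in parse_describe(text):
        print(row["TOPIC"], row["PARTITION"], lag(row))
```

After the reset to the earliest offset all 9 messages are pending; after the reset to the latest offset none are.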
    

    The --all-topics option can be used to reset offsets to the beginning for all the topics related to the consumer group.
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --reset-offsets --to-earliest --all-topics --execute
    
    Output:
    TOPIC                          PARTITION  NEW-OFFSET     
    kafka_test_topic_withpartition 0          0              
    kafka_test_topic               0          0              
    kafka_test_topic_withpartition 2          0              
    kafka_test_topic_withpartition 1          0              
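The reset commands above differ only in a few options, so a script can assemble them from parameters. This is a rough sketch: reset_offsets_command is a hypothetical helper, it uses only the flags shown in this document, and it defaults to an explicit --dry-run so that nothing is changed unless execute=True is passed.

```python
import shlex


def reset_offsets_command(group, to, topic=None, execute=False,
                          bootstrap_server="localhost:9092"):
    """Build the argument list for a kafka-consumer-groups.sh offset reset."""
    if to not in ("earliest", "latest"):
        raise ValueError("to must be 'earliest' or 'latest'")
    args = [
        "./kafka-consumer-groups.sh",
        "--bootstrap-server", bootstrap_server,
        "--group", group,
        "--reset-offsets", "--to-" + to,
    ]
    # One topic if given, otherwise every topic related to the group.
    args += ["--topic", topic] if topic else ["--all-topics"]
    # Be explicit about the mode instead of relying on the default.
    args.append("--execute" if execute else "--dry-run")
    return args


dry_run = reset_offsets_command("cg_name_1", "earliest", topic="kafka_test_topic")
for_real = reset_offsets_command("cg_name_1", "earliest", execute=True)
print(shlex.join(dry_run))
print(shlex.join(for_real))
```

The returned list can be passed to subprocess.run from the Kafka bin folder. Reviewing the dry-run output first is a sensible habit before running the --execute variant.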
    

Delete Consumer groups: Kafka provides a way to delete existing consumer groups. A consumer group can also be deleted to skip all of its currently unread messages in the Kafka topic.

    Example of deleting a single consumer group (cg_name_1).
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group cg_name_1
    
    Output:
    Deletion of requested consumer groups ('cg_name_1') was successful.
    

    Example of deleting multiple consumer groups (cg_name_1, cg_name_3) in one command.
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group cg_name_1 --group cg_name_3
    
    Output:
    Deletion of requested consumer groups ('cg_name_3', 'cg_name_1') was successful.
    


Consumer group Members/Clients: You can check which clients are actively fetching data from the Kafka topic.

    Example when no client is currently active for the consumer group.
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group-test  --members
    
    Output:
    Consumer group 'my-group-test' has no active members.
    

    Example when a client is currently active for the consumer group. The output gives the consumer ID, host, client ID and the number of partitions assigned to this client.
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group-test  --members
    
    Output:
    CONSUMER-ID                                             HOST             CLIENT-ID          #PARTITIONS     
    kafka-python-2.0.1-fff2260c-3b19-490f-b623-dfd4472147d0 /0:0:0:0:0:0:0:1 kafka-python-2.0.1 2
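Both forms of the --members output can be handled by one small parser. A minimal sketch, assuming the two output shapes shown above; parse_members is a hypothetical helper name, and it returns an empty list when the group has no active members.

```python
active_output = """\
CONSUMER-ID                                             HOST             CLIENT-ID          #PARTITIONS
kafka-python-2.0.1-fff2260c-3b19-490f-b623-dfd4472147d0 /0:0:0:0:0:0:0:1 kafka-python-2.0.1 2
"""

inactive_output = "Consumer group 'my-group-test' has no active members.\n"


def parse_members(text):
    """Return one dict per active member, keyed by the column names."""
    lines = [line for line in text.splitlines() if line.strip()]
    if not lines or "has no active members" in lines[0]:
        return []
    header = lines[0].split()
    return [dict(zip(header, line.split())) for line in lines[1:]]


members = parse_members(active_output)
total_partitions = sum(int(m["#PARTITIONS"]) for m in members)
print(len(members), total_partitions)
print(parse_members(inactive_output))
```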