Kafka Consumer Groups
- Kafka can be used as a virtual point-to-point messaging system instead of the publisher/subscriber model.
- To use a Kafka topic (or a set of topics) as a point-to-point messaging system, consumers must specify a consumer group.
- A consumer group reads messages incrementally without specifying an offset; Kafka internally keeps track of the group's last offset.
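The points above can be illustrated with a toy in-memory model. This is only a sketch under stated assumptions, not how Kafka is implemented: `TinyTopic` and its `publish`/`poll` methods are hypothetical names, and a single partition is assumed. It shows that each group keeps its own offset, so a group resumes where it stopped while a different group reads the same messages independently.

```python
from collections import defaultdict


class TinyTopic:
    """Toy single-partition topic (hypothetical; not a Kafka API)."""

    def __init__(self):
        self.log = []                      # messages, indexed by offset
        self.committed = defaultdict(int)  # group.id -> next offset to read

    def publish(self, message):
        self.log.append(message)

    def poll(self, group_id, max_records=1):
        """Return up to max_records unread messages for the group and
        advance that group's stored offset."""
        start = self.committed[group_id]
        batch = self.log[start:start + max_records]
        self.committed[group_id] = start + len(batch)
        return batch


topic = TinyTopic()
for n in range(1, 5):
    topic.publish(f"Kafka Message {n}")

# Two polls by the same group (for example, before and after a restart):
# the second poll continues from the stored offset, so nothing is re-read.
first_poll = topic.poll("cg_name_1")
second_poll = topic.poll("cg_name_1")

# A different group has its own offset and sees the log from the start.
other_group = topic.poll("cg_name_2", max_records=10)

print(first_poll)        # ['Kafka Message 1']
print(second_poll)       # ['Kafka Message 2']
print(len(other_group))  # 4
```

Within one group each message is handed out once, which is the point-to-point behaviour; across groups every group receives every message, which is the publisher/subscriber behaviour.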
Go to the Kafka bin folder before running any of the commands:
$ cd ~/kafka_2.11-1.1.0/bin
➠
Defining Consumer Group: A consumer group is defined by specifying the key/value pair group.id=consumer_group_name using either of the following methods:
- --consumer-property: Pass the property directly on the command line.
Command:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=cg_name_1 --topic kafka_test_topic --from-beginning
Output:
Kafka Message 1
Kafka Message 2
- --consumer.config: Use a properties file to define the configuration for fetching the data.
# config.properties file content
group.id=cg_name_2
Command:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer.config ~/files/config.properties --topic kafka_test_topic --from-beginning
Output:
Kafka Message 1
Kafka Message 2
Note: If both --consumer-property and --consumer.config are given in the same command, --consumer-property takes priority.
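The precedence described in the note can be sketched as a simple merge: values from the properties file are loaded first, then command-line properties overwrite them. The helper name `effective_consumer_props` is hypothetical and the parsing is deliberately minimal (plain key=value lines and `#` comments only); it is an illustration, not the console consumer's actual code.

```python
def effective_consumer_props(config_file_text, command_line_props):
    """Merge consumer properties: file values first, then values given
    via --consumer-property override them (hypothetical helper)."""
    props = {}
    for line in config_file_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    props.update(command_line_props)  # command line wins on conflicts
    return props


file_text = "# config.properties file content\ngroup.id=cg_name_2\n"

both = effective_consumer_props(file_text, {"group.id": "cg_name_1"})
file_only = effective_consumer_props(file_text, {})

print(both["group.id"])       # cg_name_1
print(file_only["group.id"])  # cg_name_2
```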
➠
Fetching Data using Consumer Group: By default, a new consumer group does not fetch older messages that were published to the Kafka topic (queue) before the group was created.
→ A new consumer group with --from-beginning reads all messages, starting from the earliest offset.
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_test_topic --consumer-property group.id=cg_name_3 --from-beginning
Output:
Kafka Message 1
Kafka Message 2
→ A new consumer group without --from-beginning reads only new messages published after the group is created.
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_test_topic --consumer-property group.id=cg_name_4
Output:
Processed a total of 0 messages
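Both cases above follow from one rule, sketched here as a simplified model. The function name `starting_offset` is hypothetical. The model assumes that a group with a stored offset always resumes from it, and that --from-beginning only matters when the group has no stored offset yet.

```python
def starting_offset(committed, earliest, latest, from_beginning):
    """Simplified rule for where a console consumer starts reading.

    committed: the group's stored offset, or None for a brand-new group.
    """
    if committed is not None:
        return committed  # existing group: resume where it left off
    return earliest if from_beginning else latest


# Topic holding "Kafka Message 1" and "Kafka Message 2": offsets 0 and 1,
# so the earliest offset is 0 and the log-end (latest) offset is 2.
earliest, latest = 0, 2

with_flag = starting_offset(None, earliest, latest, from_beginning=True)
without_flag = starting_offset(None, earliest, latest, from_beginning=False)

print(latest - with_flag)     # 2 messages to read
print(latest - without_flag)  # 0 messages to read
```

The second result corresponds to the "Processed a total of 0 messages" output: nothing is read until new messages arrive.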
➠
List all the consumer groups: The command below lists all defined consumer groups.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Output:
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
cg_name_4
cg_name_3
cg_name_1
cg_name_2
➠
Consumer groups Metadata: Kafka can describe a consumer group and show the metadata it maintains internally to store the offsets of messages that the group has already consumed. Some of the information that can be retrieved is listed below.
- Check all the topics associated with the consumer group.
- Describe consumer group to check current offset.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --describe
Output:
Consumer group 'cg_name_1' has no active members.
TOPIC                           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
kafka_test_topic                0          7               7               0    -            -     -
kafka_test_topic_withpartition  0          2               2               0    -            -     -
kafka_test_topic_withpartition  2          1               1               0    -            -     -
kafka_test_topic_withpartition  1          2               2               0    -            -     -
- Describe consumer group to check current lag (LOG-END-OFFSET minus CURRENT-OFFSET). First publish new messages, then describe the group again.
./kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_test_topic
Kafka Message 8
Kafka Message 9
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --describe
Output:
Consumer group 'cg_name_1' has no active members.
TOPIC                           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
kafka_test_topic                0          7               9               2    -            -     -
kafka_test_topic_withpartition  0          2               2               0    -            -     -
kafka_test_topic_withpartition  2          1               2               1    -            -     -
kafka_test_topic_withpartition  1          2               3               1    -            -     -
- CONSUMER-ID, HOST and CLIENT-ID are shown only while a consumer is actively fetching data with the given consumer group.
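The LAG column is the difference between LOG-END-OFFSET and CURRENT-OFFSET. As a rough sketch, the second --describe output above can be parsed to total the lag per topic. The function name `lag_by_topic` is hypothetical, and the parser assumes whitespace-separated columns in the order shown above.

```python
describe_output = """\
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
kafka_test_topic 0 7 9 2 - - -
kafka_test_topic_withpartition 0 2 2 0 - - -
kafka_test_topic_withpartition 2 1 2 1 - - -
kafka_test_topic_withpartition 1 2 3 1 - - -
"""


def lag_by_topic(text):
    """Sum (LOG-END-OFFSET - CURRENT-OFFSET) over the partitions of each topic."""
    totals = {}
    for line in text.splitlines()[1:]:  # skip the header row
        fields = line.split()
        if len(fields) < 4:
            continue
        topic, _partition, current, log_end = fields[:4]
        if not (current.isdigit() and log_end.isdigit()):
            continue  # e.g. "-" when no offset is stored for the partition
        totals[topic] = totals.get(topic, 0) + int(log_end) - int(current)
    return totals


print(lag_by_topic(describe_output))
# {'kafka_test_topic': 2, 'kafka_test_topic_withpartition': 2}
```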
➠
Reset Current Offset: Kafka provides a way to reset the current offset of a consumer group in order to re-read or skip messages from the Kafka topic. The group must have no active members while its offsets are reset.

→ Without --execute, the command below only shows what would be done (a dry run); it does not actually reset the consumer group's offsets. This default may change in future versions, so specify --dry-run explicitly when a dry run is intended.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --reset-offsets --to-earliest --topic kafka_test_topic
Output:
TOPIC PARTITION NEW-OFFSET
kafka_test_topic 0 0
=> Example with --dry-run
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --reset-offsets --to-earliest --topic kafka_test_topic --dry-run
Output:
TOPIC PARTITION NEW-OFFSET
kafka_test_topic 0 0
→ Consumer group can be reset to read from beginning.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --reset-offsets --to-earliest --topic kafka_test_topic --execute
Output:
TOPIC PARTITION NEW-OFFSET
kafka_test_topic 0 0
=> Checking current offset
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --describe
Output:
TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
kafka_test_topic  0          0               9               9    -            -     -
→ Consumer group can be reset to read from the latest offset.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --reset-offsets --to-latest --topic kafka_test_topic --execute
Output:
TOPIC PARTITION NEW-OFFSET
kafka_test_topic 0 9
=> Checking current offset
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --describe
Output:
TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
kafka_test_topic  0          9               9               0    -            -     -
→ The --all-topics option resets offsets for all topics associated with the consumer group (here, to the beginning).
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cg_name_1 --reset-offsets --to-earliest --all-topics --execute
Output:
TOPIC                           PARTITION  NEW-OFFSET
kafka_test_topic_withpartition  0          0
kafka_test_topic                0          0
kafka_test_topic_withpartition  2          0
kafka_test_topic_withpartition  1          0
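The difference between a dry run and --execute can be sketched with a small model: both compute the same plan (the NEW-OFFSET column), but only --execute changes the group's stored offsets. The function `reset_offsets` and the dictionaries below are hypothetical stand-ins, not part of any Kafka API.

```python
def reset_offsets(committed, targets, execute=False):
    """Return the planned new offsets; change `committed` only when
    execute is True (mirrors dry run vs. --execute)."""
    plan = dict(targets)
    if execute:
        committed.update(plan)
    return plan


# Keys are (topic, partition); values are offsets.
committed = {("kafka_test_topic", 0): 9}
earliest = {("kafka_test_topic", 0): 0}

dry_run_plan = reset_offsets(committed, earliest)  # dry run: plan only
after_dry_run = dict(committed)                    # still at offset 9

reset_offsets(committed, earliest, execute=True)   # actually applies the plan
after_execute = dict(committed)                    # now at offset 0

print(dry_run_plan)   # {('kafka_test_topic', 0): 0}
print(after_dry_run)  # {('kafka_test_topic', 0): 9}
print(after_execute)  # {('kafka_test_topic', 0): 0}
```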
➠
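Delete Consumer groups: Kafka provides a way to delete existing consumer groups. Deleting a group discards its stored offsets, so a group can also be deleted to skip all currently unread messages: a consumer that later reuses the group name without --from-beginning starts from the latest offset.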

→ Example to delete a single consumer group (cg_name_1).
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group cg_name_1
Output:
Deletion of requested consumer groups ('cg_name_1') was successful.
→ Example to delete multiple consumer groups (cg_name_1, cg_name_3) in one command.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group cg_name_1 --group cg_name_3
Output:
Deletion of requested consumer groups ('cg_name_3', 'cg_name_1') was successful.
➠
Consumer group Members/Clients: The --members option shows which clients are actively fetching data from the Kafka topic.

→ Example when no client is currently active for the consumer group.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group-test --members
Output:
Consumer group 'my-group-test' has no active members.
→ Example when a client is currently active for the consumer group. The output gives the Consumer-Id, Hostname, Client-Id and number of partitions assigned to this client.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group-test --members
Output:
CONSUMER-ID                                              HOST              CLIENT-ID           #PARTITIONS
kafka-python-2.0.1-fff2260c-3b19-490f-b623-dfd4472147d0  /0:0:0:0:0:0:0:1  kafka-python-2.0.1  2