Resetting Consumer Group Offsets

This process allows consumers to reprocess messages, skip past certain messages, or start processing from a different point in the topic.

  1. Reprocessing Data: If there's a need to reprocess old data, such as after fixing a bug in the consumer logic.
  2. Skipping Faulty Data: If certain messages caused errors and you need to skip them.
  3. Starting Fresh: If the consumer group is repurposed to handle a new set of messages.
  4. Recovery from Issues: After recovering from an outage or data corruption.

Kafka Consumer Group Command

AdminClient API

The AdminClient API allows you to manage and alter consumer group offsets programmatically. This is a more flexible and powerful way to reset offsets, especially for applications that need to automate this process.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.OffsetAndMetadata;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class OffsetResetExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            String consumerGroupId = "my-group";
            TopicPartition topicPartition = new TopicPartition("my-topic", 0);

            // Example: Reset to earliest
            ListOffsetsResult listOffsetsResult = adminClient.listOffsets(
                Collections.singletonMap(topicPartition, OffsetSpec.earliest()));
            long earliestOffset = listOffsetsResult.partitionResult(topicPartition).get().offset();

            // Create the offset map to update
            Map<TopicPartition, OffsetAndMetadata> offsetsToReset = Collections.singletonMap(
                topicPartition, new OffsetAndMetadata(earliestOffset));

            // Reset offsets
            AlterConsumerGroupOffsetsResult resetOffsetsResult =
                adminClient.alterConsumerGroupOffsets(consumerGroupId, offsetsToReset);
            resetOffsetsResult.all().get ();
            System.out.println("Offsets reset successfully.");
        }
    }
}

References

Flashcards

Using the Kafka Consumer Group CLI Command, how would you reset the consumer group offsets to the earliest?:: kafka-consumer-groups.sh --bootstrap-server <broker> --group <group_id> --topic <topic> --reset-offsets --to-earliest --execute

Using AdminClient API, how would you reset the consumer group offsets to the earliest?:: adminClient.alterConsumerGroupOffsets(consumerGroupId, offsetsToReset);