Browse Source

KAFKA-7873; Always seek to beginning in KafkaBasedLog (#6203)

Explicitly seek KafkaBasedLog’s consumer to the beginning of the topic partitions, rather than potentially use committed offsets (which would be unexpected) if group.id is set or rely upon `auto.offset.reset=earliest` if the group.id is null.

This should not change existing behavior but should remove some potential issues introduced with KIP-287 if `group.id` is not set in the consumer configurations. Note that even if `group.id` is set, we still always want to consume from the beginning.

Reviewers: Jason Gustafson <jason@confluent.io>
pull/6225/head
Randall Hauch 6 years ago committed by Jason Gustafson
parent
commit
176ea0d0f9
  1. 4
      connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java

4
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java

@ -146,6 +146,10 @@ public class KafkaBasedLog<K, V> { @@ -146,6 +146,10 @@ public class KafkaBasedLog<K, V> {
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
consumer.assign(partitions);
// Always consume from the beginning of all partitions. Necessary to ensure that we don't use committed offsets
// when a 'group.id' is specified (if offsets happen to have been committed unexpectedly).
consumer.seekToBeginning(partitions);
readToLogEnd();
thread = new WorkThread();

Loading…
Cancel
Save