From 176ea0d0f9b4386cdc0e3b9bab560a11e3d3d516 Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Fri, 1 Feb 2019 14:17:52 -0800 Subject: [PATCH] KAFKA-7873; Always seek to beginning in KafkaBasedLog (#6203) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Explicitly seek KafkaBasedLog’s consumer to the beginning of the topic partitions, rather than potentially using committed offsets (which would be unexpected) if `group.id` is set, or relying upon `auto.offset.reset=earliest` if `group.id` is null. This should not change existing behavior but should remove some potential issues introduced with KIP-287 if `group.id` is not set in the consumer configurations. Note that even if `group.id` is set, we still always want to consume from the beginning. Reviewers: Jason Gustafson --- .../java/org/apache/kafka/connect/util/KafkaBasedLog.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index ea9b4c621f9..9d77d21767a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -146,6 +146,10 @@ public class KafkaBasedLog { partitions.add(new TopicPartition(partition.topic(), partition.partition())); consumer.assign(partitions); + // Always consume from the beginning of all partitions. Necessary to ensure that we don't use committed offsets + // when a 'group.id' is specified (if offsets happen to have been committed unexpectedly). + consumer.seekToBeginning(partitions); + readToLogEnd(); thread = new WorkThread();