diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index ea9b4c621f9..9d77d21767a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -146,6 +146,10 @@ public class KafkaBasedLog { partitions.add(new TopicPartition(partition.topic(), partition.partition())); consumer.assign(partitions); + // Always consume from the beginning of all partitions. Necessary to ensure that we don't use committed offsets + // when a 'group.id' is specified (if offsets happen to have been committed unexpectedly). + consumer.seekToBeginning(partitions); + readToLogEnd(); thread = new WorkThread();