diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 8bd6d1aec1d..c700cadabe5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -22,7 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -544,38 +544,55 @@ public class StreamThread extends Thread {
 
         try {
             records = consumer.poll(pollTimeMs);
-        } catch (NoOffsetForPartitionException ex) {
-            TopicPartition partition = ex.partition();
+        } catch (final InvalidOffsetException e) {
+            resetInvalidOffsets(e);
+        }
+
+        return records;
+    }
+
+    private void resetInvalidOffsets(final InvalidOffsetException e) {
+        final Set<TopicPartition> partitions = e.partitions();
+        final Set<String> loggedTopics = new HashSet<>();
+        final Set<TopicPartition> seekToBeginning = new HashSet<>();
+        final Set<TopicPartition> seekToEnd = new HashSet<>();
+
+        for (final TopicPartition partition : partitions) {
             if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
-                log.info(String.format("stream-thread [%s] setting topic to consume from earliest offset %s", this.getName(), partition.topic()));
-                consumer.seekToBeginning(ex.partitions());
+                addToResetList(partition, seekToBeginning, "stream-thread [%s] setting topic %s to consume from %s offset", "earliest", loggedTopics);
             } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
-                consumer.seekToEnd(ex.partitions());
-                log.info(String.format("stream-thread [%s] setting topic to consume from latest offset %s", this.getName(), partition.topic()));
+                addToResetList(partition, seekToEnd, "stream-thread [%s] setting topic %s to consume from %s offset", "latest", loggedTopics);
             } else {
                 if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
                     setState(State.PENDING_SHUTDOWN);
-                    String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
+                    final String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
                         " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
                         "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)";
-                    throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), ex);
+                    throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), e);
                 }
 
                 if (originalReset.equals("earliest")) {
-                    consumer.seekToBeginning(ex.partitions());
+                    addToResetList(partition, seekToBeginning, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "earliest", loggedTopics);
                 } else if (originalReset.equals("latest")) {
-                    consumer.seekToEnd(ex.partitions());
+                    addToResetList(partition, seekToEnd, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "latest", loggedTopics);
                 }
-                log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset));
             }
         }
 
-        if (rebalanceException != null)
-            throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException);
+        if (!seekToBeginning.isEmpty()) {
+            consumer.seekToBeginning(seekToBeginning);
+        }
+        if (!seekToEnd.isEmpty()) {
+            consumer.seekToEnd(seekToEnd);
+        }
+    }
 
-        return records;
+    private void addToResetList(final TopicPartition partition, final Set<TopicPartition> partitions, final String logMessage, final String resetPolicy, final Set<String> loggedTopics) {
+        final String topic = partition.topic();
+        if (loggedTopics.add(topic)) {
+            log.info(String.format(logMessage, getName(), topic, resetPolicy));
+        }
+        partitions.add(partition);
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index 3028b6b2d72..35942257d5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -19,12 +19,16 @@ package org.apache.kafka.streams.integration;
 
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -43,7 +47,9 @@ import org.junit.experimental.categories.Category;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
@@ -56,35 +62,71 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
 
     private static final int NUM_BROKERS = 1;
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_0 = "outputTopic_0";
+    private static final String OUTPUT_TOPIC_1 = "outputTopic_1";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic_2";
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private final MockTime mockTime = CLUSTER.time;
 
-    private static final String TOPIC_1 = "topic-1";
-    private static final String TOPIC_2 = "topic-2";
-    private static final String TOPIC_A = "topic-A";
-    private static final String TOPIC_C = "topic-C";
-    private static final String TOPIC_Y = "topic-Y";
-    private static final String TOPIC_Z = "topic-Z";
+    private static final String TOPIC_1_0 = "topic-1_0";
+    private static final String TOPIC_2_0 = "topic-2_0";
+    private static final String TOPIC_A_0 = "topic-A_0";
+    private static final String TOPIC_C_0 = "topic-C_0";
+    private static final String TOPIC_Y_0 = "topic-Y_0";
+    private static final String TOPIC_Z_0 = "topic-Z_0";
+    private static final String TOPIC_1_1 = "topic-1_1";
+    private static final String TOPIC_2_1 = "topic-2_1";
+    private static final String TOPIC_A_1 = "topic-A_1";
+    private static final String TOPIC_C_1 = "topic-C_1";
+    private static final String TOPIC_Y_1 = "topic-Y_1";
+    private static final String TOPIC_Z_1 = "topic-Z_1";
+    private static final String TOPIC_1_2 = "topic-1_2";
+    private static final String TOPIC_2_2 = "topic-2_2";
+    private static final String TOPIC_A_2 = "topic-A_2";
+    private static final String TOPIC_C_2 = "topic-C_2";
+    private static final String TOPIC_Y_2 = "topic-Y_2";
+    private static final String TOPIC_Z_2 = "topic-Z_2";
     private static final String NOOP = "noop";
     private final Serde<String> stringSerde = Serdes.String();
 
     private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
     private Properties streamsConfiguration;
 
+    private final String topic1TestMessage = "topic-1 test";
+    private final String topic2TestMessage = "topic-2 test";
+    private final String topicATestMessage = "topic-A test";
+    private final String topicCTestMessage = "topic-C test";
+    private final String topicYTestMessage = "topic-Y test";
+    private final String topicZTestMessage = "topic-Z test";
+
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(TOPIC_1);
-        CLUSTER.createTopic(TOPIC_2);
-        CLUSTER.createTopic(TOPIC_A);
-        CLUSTER.createTopic(TOPIC_C);
-        CLUSTER.createTopic(TOPIC_Y);
-        CLUSTER.createTopic(TOPIC_Z);
+        CLUSTER.createTopic(TOPIC_1_0);
+        CLUSTER.createTopic(TOPIC_2_0);
+        CLUSTER.createTopic(TOPIC_A_0);
+        CLUSTER.createTopic(TOPIC_C_0);
+        CLUSTER.createTopic(TOPIC_Y_0);
+        CLUSTER.createTopic(TOPIC_Z_0);
+        CLUSTER.createTopic(TOPIC_1_1);
+        CLUSTER.createTopic(TOPIC_2_1);
+        CLUSTER.createTopic(TOPIC_A_1);
+        CLUSTER.createTopic(TOPIC_C_1);
+        CLUSTER.createTopic(TOPIC_Y_1);
+        CLUSTER.createTopic(TOPIC_Z_1);
+        CLUSTER.createTopic(TOPIC_1_2);
+        CLUSTER.createTopic(TOPIC_2_2);
+        CLUSTER.createTopic(TOPIC_A_2);
+        CLUSTER.createTopic(TOPIC_C_2);
+        CLUSTER.createTopic(TOPIC_Y_2);
+        CLUSTER.createTopic(TOPIC_Z_2);
         CLUSTER.createTopic(NOOP);
         CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
-
+        CLUSTER.createTopic(OUTPUT_TOPIC_0);
+        CLUSTER.createTopic(OUTPUT_TOPIC_1);
+        CLUSTER.createTopic(OUTPUT_TOPIC_2);
     }
 
     @Before
@@ -105,41 +147,64 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
     }
 
     @Test
-    public void shouldOnlyReadRecordsWhereEarliestSpecified() throws Exception {
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest() throws Exception {
+        streamsConfiguration.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
+
+        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage);
+        shouldOnlyReadForEarliest("_0", TOPIC_1_0, TOPIC_2_0, TOPIC_A_0, TOPIC_C_0, TOPIC_Y_0, TOPIC_Z_0, OUTPUT_TOPIC_0, expectedReceivedValues);
+    }
+
+    @Test
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest() throws Exception {
+        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
+        shouldOnlyReadForEarliest("_1", TOPIC_1_1, TOPIC_2_1, TOPIC_A_1, TOPIC_C_1, TOPIC_Y_1, TOPIC_Z_1, OUTPUT_TOPIC_1, expectedReceivedValues);
+    }
+
+    @Test
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets() throws Exception {
+        commitInvalidOffsets();
+
+        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
+        shouldOnlyReadForEarliest("_2", TOPIC_1_2, TOPIC_2_2, TOPIC_A_2, TOPIC_C_2, TOPIC_Y_2, TOPIC_Z_2, OUTPUT_TOPIC_2, expectedReceivedValues);
+    }
+
+    private void shouldOnlyReadForEarliest(
+        final String topicSuffix,
+        final String topic1,
+        final String topic2,
+        final String topicA,
+        final String topicC,
+        final String topicY,
+        final String topicZ,
+        final String outputTopic,
+        final List<String> expectedReceivedValues) throws Exception {
+
         final KStreamBuilder builder = new KStreamBuilder();
 
-        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d"));
-        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
+        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d" + topicSuffix));
+        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]" + topicSuffix));
+        final KStream<String, String> namedTopicsStream = builder.stream(topicY, topicZ);
 
-        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
-        pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
-        namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+        pattern1Stream.to(stringSerde, stringSerde, outputTopic);
+        pattern2Stream.to(stringSerde, stringSerde, outputTopic);
+        namedTopicsStream.to(stringSerde, stringSerde, outputTopic);
 
         final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
 
-        final String topic1TestMessage = "topic-1 test";
-        final String topic2TestMessage = "topic-2 test";
-        final String topicATestMessage = "topic-A test";
-        final String topicCTestMessage = "topic-C test";
-        final String topicYTestMessage = "topic-Y test";
-        final String topicZTestMessage = "topic-Z test";
-
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Collections.singletonList(topic1TestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Collections.singletonList(topic2TestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Collections.singletonList(topicATestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Collections.singletonList(topicCTestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Collections.singletonList(topicYTestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Collections.singletonList(topicZTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topic1, Collections.singletonList(topic1TestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topic2, Collections.singletonList(topic2TestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicA, Collections.singletonList(topicATestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicC, Collections.singletonList(topicCTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicY, Collections.singletonList(topicYTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicZ, Collections.singletonList(topicZTestMessage), producerConfig, mockTime);
 
         final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
 
         final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
-        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
-        final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 4);
-        final List<String> actualValues = new ArrayList<>(4);
+        final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedReceivedValues.size());
+        final List<String> actualValues = new ArrayList<>(expectedReceivedValues.size());
 
         for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
             actualValues.add(receivedKeyValue.value);
@@ -149,35 +214,50 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
 
         Collections.sort(actualValues);
         Collections.sort(expectedReceivedValues);
         assertThat(actualValues, equalTo(expectedReceivedValues));
-
     }
 
+    private void commitInvalidOffsets() {
+        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(TestUtils.consumerConfig(
+            CLUSTER.bootstrapServers(),
+            streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
+            StringDeserializer.class,
+            StringDeserializer.class));
+
+        final Map<TopicPartition, OffsetAndMetadata> invalidOffsets = new HashMap<>();
+        invalidOffsets.put(new TopicPartition(TOPIC_1_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_2_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_A_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_C_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_Y_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_Z_2, 0), new OffsetAndMetadata(5, null));
+
+        consumer.commitSync(invalidOffsets);
+
+        consumer.close();
+    }
+
     @Test(expected = TopologyBuilderException.class)
     public void shouldThrowExceptionOverlappingPattern() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         //NOTE this would realistically get caught when building topology, the test is for completeness
-        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
+        builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1"));
+        builder.stream(TOPIC_Y_1, TOPIC_Z_1);
 
         builder.earliestResetTopicsPattern();
-
     }
 
     @Test(expected = TopologyBuilderException.class)
     public void shouldThrowExceptionOverlappingTopic() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         //NOTE this would realistically get caught when building topology, the test is for completeness
-        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d]"));
-        final KStream<String, String> namedTopicsStream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A, TOPIC_Z);
+        builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d_1"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
 
         builder.latestResetTopicsPattern();
-
     }
 
-
     @Test
     public void shouldThrowStreamsExceptionNoResetSpecified() throws Exception {
         Properties props = new Properties();