
KAFKA-5003; StreamThread should catch InvalidTopicException

We should catch `InvalidTopicException` and not just
`NoOffsetForPartitionException`. Also, we need to step through
all partitions that might be affected and reset those.

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bbejeck@gmail.com>, Eno Thereska <eno@confluent.io>, Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2747 from mjsax/minor-fix-reset
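
A minimal stand-alone sketch of the pattern described above, for readers outside the Streams codebase: `InvalidOffsetException` (the common parent of `NoOffsetForPartitionException` and `OffsetOutOfRangeException`) reports every affected partition via `partitions()`, so a handler can bucket partitions by reset policy and seek each bucket once. The class below and its `earliestTopics` rule are hypothetical; the consumer calls are the real `org.apache.kafka.clients.consumer` API.

import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, not part of this commit: polls once and, on an invalid
// offset, resets every affected partition according to a per-topic rule.
public final class InvalidOffsetResetSketch {

    public static ConsumerRecords<byte[], byte[]> pollWithReset(final Consumer<byte[], byte[]> consumer,
                                                                final Set<String> earliestTopics,
                                                                final long pollTimeMs) {
        try {
            return consumer.poll(pollTimeMs);
        } catch (final InvalidOffsetException e) {
            // e.partitions() lists all partitions without a valid position;
            // step through each of them and reset it, as the message above says.
            final Set<TopicPartition> seekToBeginning = new HashSet<>();
            final Set<TopicPartition> seekToEnd = new HashSet<>();
            for (final TopicPartition partition : e.partitions()) {
                if (earliestTopics.contains(partition.topic())) {
                    seekToBeginning.add(partition);
                } else {
                    seekToEnd.add(partition);
                }
            }
            if (!seekToBeginning.isEmpty()) {
                consumer.seekToBeginning(seekToBeginning);
            }
            if (!seekToEnd.isEmpty()) {
                consumer.seekToEnd(seekToEnd);
            }
            return ConsumerRecords.empty();
        }
    }
}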
Matthias J. Sax authored 8 years ago; committed by Ismael Juma
commit afeadbef50
Showing 2 changed files:
  1. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (51 lines changed)
  2. streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java (172 lines changed)

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (51 lines changed)

@@ -22,7 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -544,38 +544,55 @@ public class StreamThread extends Thread {
         try {
             records = consumer.poll(pollTimeMs);
-        } catch (NoOffsetForPartitionException ex) {
-            TopicPartition partition = ex.partition();
-            if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
-                log.info(String.format("stream-thread [%s] setting topic to consume from earliest offset %s", this.getName(), partition.topic()));
-                consumer.seekToBeginning(ex.partitions());
-            } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
-                consumer.seekToEnd(ex.partitions());
-                log.info(String.format("stream-thread [%s] setting topic to consume from latest offset %s", this.getName(), partition.topic()));
-            } else {
-                if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
-                    setState(State.PENDING_SHUTDOWN);
-                    String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
-                        " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
-                        "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)";
-                    throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), ex);
-                }
-
-                if (originalReset.equals("earliest")) {
-                    consumer.seekToBeginning(ex.partitions());
-                } else if (originalReset.equals("latest")) {
-                    consumer.seekToEnd(ex.partitions());
-                }
-                log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset));
-            }
+        } catch (final InvalidOffsetException e) {
+            resetInvalidOffsets(e);
         }
 
         if (rebalanceException != null)
             throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException);
 
         return records;
     }
 
+    private void resetInvalidOffsets(final InvalidOffsetException e) {
+        final Set<TopicPartition> partitions = e.partitions();
+        final Set<String> loggedTopics = new HashSet<>();
+        final Set<TopicPartition> seekToBeginning = new HashSet<>();
+        final Set<TopicPartition> seekToEnd = new HashSet<>();
+
+        for (final TopicPartition partition : partitions) {
+            if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
+                addToResetList(partition, seekToBeginning, "stream-thread [%s] setting topic %s to consume from %s offset", "earliest", loggedTopics);
+            } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
+                addToResetList(partition, seekToEnd, "stream-thread [%s] setting topic %s to consume from %s offset", "latest", loggedTopics);
+            } else {
+                if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
+                    setState(State.PENDING_SHUTDOWN);
+                    final String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
+                        " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
+                        "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)";
+                    throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), e);
+                }
+
+                if (originalReset.equals("earliest")) {
+                    addToResetList(partition, seekToBeginning, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "earliest", loggedTopics);
+                } else if (originalReset.equals("latest")) {
+                    addToResetList(partition, seekToEnd, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "latest", loggedTopics);
+                }
+            }
+        }
+
+        if (!seekToBeginning.isEmpty()) {
+            consumer.seekToBeginning(seekToBeginning);
+        }
+        if (!seekToEnd.isEmpty()) {
+            consumer.seekToEnd(seekToEnd);
+        }
+    }
+
+    private void addToResetList(final TopicPartition partition, final Set<TopicPartition> partitions, final String logMessage, final String resetPolicy, final Set<String> loggedTopics) {
+        final String topic = partition.topic();
+        if (loggedTopics.add(topic)) {
+            log.info(String.format(logMessage, getName(), topic, resetPolicy));
+        }
+        partitions.add(partition);
+    }
+
     /**
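
The behavioral fix in this file: the old handler derived the reset policy from the single partition in `ex.partition()` but then called `seekToBeginning(ex.partitions())` or `seekToEnd(ex.partitions())` on the whole set, and it only fired for `NoOffsetForPartitionException`. The new `resetInvalidOffsets()` steps through each partition individually and also covers committed offsets that are out of range. A hedged sketch of the hierarchy the broader catch relies on (the demo class is hypothetical; the three exception types are the real consumer API):

import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;

// Hypothetical demo: both concrete exceptions extend InvalidOffsetException,
// so a single catch clause in pollRequests() handles both cases.
public final class InvalidOffsetKinds {

    public static String describe(final InvalidOffsetException e) {
        if (e instanceof NoOffsetForPartitionException) {
            // no committed offset and no applicable reset policy ("none")
            return "no committed offset for " + e.partitions();
        }
        if (e instanceof OffsetOutOfRangeException) {
            // committed offset exists but lies outside the broker's log range
            return "offset out of range for " + e.partitions();
        }
        return "invalid position for " + e.partitions();
    }
}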

streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java (172 lines changed)

@@ -19,12 +19,16 @@ package org.apache.kafka.streams.integration;
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -43,7 +47,9 @@ import org.junit.experimental.categories.Category;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
@@ -56,35 +62,71 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
     private static final int NUM_BROKERS = 1;
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_0 = "outputTopic_0";
+    private static final String OUTPUT_TOPIC_1 = "outputTopic_1";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic_2";
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private final MockTime mockTime = CLUSTER.time;
 
     private static final String TOPIC_1 = "topic-1";
     private static final String TOPIC_2 = "topic-2";
     private static final String TOPIC_A = "topic-A";
     private static final String TOPIC_C = "topic-C";
     private static final String TOPIC_Y = "topic-Y";
     private static final String TOPIC_Z = "topic-Z";
+    private static final String TOPIC_1_0 = "topic-1_0";
+    private static final String TOPIC_2_0 = "topic-2_0";
+    private static final String TOPIC_A_0 = "topic-A_0";
+    private static final String TOPIC_C_0 = "topic-C_0";
+    private static final String TOPIC_Y_0 = "topic-Y_0";
+    private static final String TOPIC_Z_0 = "topic-Z_0";
+    private static final String TOPIC_1_1 = "topic-1_1";
+    private static final String TOPIC_2_1 = "topic-2_1";
+    private static final String TOPIC_A_1 = "topic-A_1";
+    private static final String TOPIC_C_1 = "topic-C_1";
+    private static final String TOPIC_Y_1 = "topic-Y_1";
+    private static final String TOPIC_Z_1 = "topic-Z_1";
+    private static final String TOPIC_1_2 = "topic-1_2";
+    private static final String TOPIC_2_2 = "topic-2_2";
+    private static final String TOPIC_A_2 = "topic-A_2";
+    private static final String TOPIC_C_2 = "topic-C_2";
+    private static final String TOPIC_Y_2 = "topic-Y_2";
+    private static final String TOPIC_Z_2 = "topic-Z_2";
     private static final String NOOP = "noop";
 
     private final Serde<String> stringSerde = Serdes.String();
     private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
     private Properties streamsConfiguration;
 
+    private final String topic1TestMessage = "topic-1 test";
+    private final String topic2TestMessage = "topic-2 test";
+    private final String topicATestMessage = "topic-A test";
+    private final String topicCTestMessage = "topic-C test";
+    private final String topicYTestMessage = "topic-Y test";
+    private final String topicZTestMessage = "topic-Z test";
+
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
         CLUSTER.createTopic(TOPIC_1);
         CLUSTER.createTopic(TOPIC_2);
         CLUSTER.createTopic(TOPIC_A);
         CLUSTER.createTopic(TOPIC_C);
         CLUSTER.createTopic(TOPIC_Y);
         CLUSTER.createTopic(TOPIC_Z);
+        CLUSTER.createTopic(TOPIC_1_0);
+        CLUSTER.createTopic(TOPIC_2_0);
+        CLUSTER.createTopic(TOPIC_A_0);
+        CLUSTER.createTopic(TOPIC_C_0);
+        CLUSTER.createTopic(TOPIC_Y_0);
+        CLUSTER.createTopic(TOPIC_Z_0);
+        CLUSTER.createTopic(TOPIC_1_1);
+        CLUSTER.createTopic(TOPIC_2_1);
+        CLUSTER.createTopic(TOPIC_A_1);
+        CLUSTER.createTopic(TOPIC_C_1);
+        CLUSTER.createTopic(TOPIC_Y_1);
+        CLUSTER.createTopic(TOPIC_Z_1);
+        CLUSTER.createTopic(TOPIC_1_2);
+        CLUSTER.createTopic(TOPIC_2_2);
+        CLUSTER.createTopic(TOPIC_A_2);
+        CLUSTER.createTopic(TOPIC_C_2);
+        CLUSTER.createTopic(TOPIC_Y_2);
+        CLUSTER.createTopic(TOPIC_Z_2);
         CLUSTER.createTopic(NOOP);
         CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
+        CLUSTER.createTopic(OUTPUT_TOPIC_0);
+        CLUSTER.createTopic(OUTPUT_TOPIC_1);
+        CLUSTER.createTopic(OUTPUT_TOPIC_2);
     }
 
     @Before
@@ -105,41 +147,64 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
     }
 
     @Test
-    public void shouldOnlyReadRecordsWhereEarliestSpecified() throws Exception {
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest() throws Exception {
+        streamsConfiguration.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
+
+        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage);
+        shouldOnlyReadForEarliest("_0", TOPIC_1_0, TOPIC_2_0, TOPIC_A_0, TOPIC_C_0, TOPIC_Y_0, TOPIC_Z_0, OUTPUT_TOPIC_0, expectedReceivedValues);
+    }
+
+    @Test
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest() throws Exception {
+        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
+        shouldOnlyReadForEarliest("_1", TOPIC_1_1, TOPIC_2_1, TOPIC_A_1, TOPIC_C_1, TOPIC_Y_1, TOPIC_Z_1, OUTPUT_TOPIC_1, expectedReceivedValues);
+    }
+
+    @Test
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets() throws Exception {
+        commitInvalidOffsets();
+
+        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
+        shouldOnlyReadForEarliest("_2", TOPIC_1_2, TOPIC_2_2, TOPIC_A_2, TOPIC_C_2, TOPIC_Y_2, TOPIC_Z_2, OUTPUT_TOPIC_2, expectedReceivedValues);
+    }
+
+    private void shouldOnlyReadForEarliest(
+        final String topicSuffix,
+        final String topic1,
+        final String topic2,
+        final String topicA,
+        final String topicC,
+        final String topicY,
+        final String topicZ,
+        final String outputTopic,
+        final List<String> expectedReceivedValues) throws Exception {
+
         final KStreamBuilder builder = new KStreamBuilder();
 
-        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d"));
-        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
+        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d" + topicSuffix));
+        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]" + topicSuffix));
+        final KStream<String, String> namedTopicsStream = builder.stream(topicY, topicZ);
 
-        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
-        pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
-        namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+        pattern1Stream.to(stringSerde, stringSerde, outputTopic);
+        pattern2Stream.to(stringSerde, stringSerde, outputTopic);
+        namedTopicsStream.to(stringSerde, stringSerde, outputTopic);
 
         final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
 
-        final String topic1TestMessage = "topic-1 test";
-        final String topic2TestMessage = "topic-2 test";
-        final String topicATestMessage = "topic-A test";
-        final String topicCTestMessage = "topic-C test";
-        final String topicYTestMessage = "topic-Y test";
-        final String topicZTestMessage = "topic-Z test";
-
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Collections.singletonList(topic1TestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Collections.singletonList(topic2TestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Collections.singletonList(topicATestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Collections.singletonList(topicCTestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Collections.singletonList(topicYTestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Collections.singletonList(topicZTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topic1, Collections.singletonList(topic1TestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topic2, Collections.singletonList(topic2TestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicA, Collections.singletonList(topicATestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicC, Collections.singletonList(topicCTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicY, Collections.singletonList(topicYTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicZ, Collections.singletonList(topicZTestMessage), producerConfig, mockTime);
 
         final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
 
         final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
-        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
-        final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 4);
-        final List<String> actualValues = new ArrayList<>(4);
+        final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedReceivedValues.size());
+        final List<String> actualValues = new ArrayList<>(expectedReceivedValues.size());
 
         for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
             actualValues.add(receivedKeyValue.value);
@@ -149,35 +214,50 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
         Collections.sort(actualValues);
         Collections.sort(expectedReceivedValues);
         assertThat(actualValues, equalTo(expectedReceivedValues));
     }
 
+    private void commitInvalidOffsets() {
+        final KafkaConsumer consumer = new KafkaConsumer(TestUtils.consumerConfig(
+            CLUSTER.bootstrapServers(),
+            streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
+            StringDeserializer.class,
+            StringDeserializer.class));
+
+        final Map<TopicPartition, OffsetAndMetadata> invalidOffsets = new HashMap<>();
+        invalidOffsets.put(new TopicPartition(TOPIC_1_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_2_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_A_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_C_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_Y_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_Z_2, 0), new OffsetAndMetadata(5, null));
+
+        consumer.commitSync(invalidOffsets);
+        consumer.close();
+    }
+
     @Test(expected = TopologyBuilderException.class)
     public void shouldThrowExceptionOverlappingPattern() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         //NOTE this would realistically get caught when building topology, the test is for completeness
-        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
+        builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1"));
+        builder.stream(TOPIC_Y_1, TOPIC_Z_1);
 
         builder.earliestResetTopicsPattern();
     }
 
     @Test(expected = TopologyBuilderException.class)
     public void shouldThrowExceptionOverlappingTopic() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         //NOTE this would realistically get caught when building topology, the test is for completeness
-        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d]"));
-        final KStream<String, String> namedTopicsStream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A, TOPIC_Z);
+        builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d_1"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
 
         builder.latestResetTopicsPattern();
     }
 
     @Test
     public void shouldThrowStreamsExceptionNoResetSpecified() throws Exception {
         Properties props = new Properties();
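
For context on `commitInvalidOffsets()` above: each suffixed input topic receives exactly one record in these tests, so a committed offset of 5 lies beyond the log end. On its next poll, the Streams consumer (which applies per-topic reset policies itself rather than letting the consumer auto-reset) hits `OffsetOutOfRangeException`, exercising the new reset path. A rough stand-alone reproduction, assuming a broker on localhost and a one-record topic (broker address, group id, topic, and timing are placeholders):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical reproduction: with reset policy "none", a position past the
// log end makes poll() throw OffsetOutOfRangeException instead of resetting.
public final class OutOfRangeRepro {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "out-of-range-demo");         // placeholder
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            final TopicPartition tp = new TopicPartition("topic-1_2", 0); // one-record topic
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 5);   // mirrors the OffsetAndMetadata(5, null) commit
            consumer.poll(1000);    // throws: offset 5 is beyond the log end
        } catch (final OffsetOutOfRangeException e) {
            System.out.println("invalid position for " + e.partitions());
        } finally {
            consumer.close();
        }
    }
}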
