From 76fcabc7b4bf432b54101d394bcc2ad5affff716 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Tue, 15 Oct 2019 11:34:48 -0700 Subject: [PATCH] KAFKA-4422 / KAFKA-8700 / KAFKA-5566: Wait for state to transit to RUNNING upon start (#7519) I looked into the logs of the above tickets, and I think for a couple of them it is due to the fact that the threads take time to restore, or just stabilize the rebalance since there are multiple threads. Adding the hook to wait for state to transit to RUNNING upon starting. Reviewers: Chris Pettitt, Matthias J. Sax --- .../QueryableStateIntegrationTest.java | 33 ++++++++++++++----- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index c5dbabe4b05..de2ae276d89 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -54,6 +54,7 @@ import org.apache.kafka.streams.state.StreamsMetadata; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockMapper; +import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -92,6 +93,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState; @Category({IntegrationTest.class}) public class QueryableStateIntegrationTest { @@ -261,6 +263,16 @@ public class QueryableStateIntegrationTest { @Override public void run() { myStream.start(); + + try { + 
TestUtils.waitForCondition( + () -> stateListener.mapStates.containsKey(KafkaStreams.State.RUNNING), + "Did not start successfully after " + TestUtils.DEFAULT_MAX_WAIT_MS + " ms" + ); + } catch (final InterruptedException e) { + if (!stateListener.mapStates.containsKey(KafkaStreams.State.RUNNING)) + fail("Did not start successfully"); + } } public void close() { @@ -446,7 +458,8 @@ public class QueryableStateIntegrationTest { windowStoreName, streamsConfiguration); - kafkaStreams.start(); + startKafkaStreamsAndWaitForRunningState(kafkaStreams); + producerThread.start(); try { @@ -515,7 +528,7 @@ public class QueryableStateIntegrationTest { t2.toStream().to(outputTopic); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); - kafkaStreams.start(); + startKafkaStreamsAndWaitForRunningState(kafkaStreams); waitUntilAtLeastNumRecordProcessed(outputTopic, 1); @@ -581,7 +594,7 @@ public class QueryableStateIntegrationTest { .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); - kafkaStreams.start(); + startKafkaStreamsAndWaitForRunningState(kafkaStreams); waitUntilAtLeastNumRecordProcessed(outputTopic, 5); @@ -629,7 +642,7 @@ public class QueryableStateIntegrationTest { t3.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); - kafkaStreams.start(); + startKafkaStreamsAndWaitForRunningState(kafkaStreams); waitUntilAtLeastNumRecordProcessed(outputTopic, 1); @@ -690,7 +703,7 @@ public class QueryableStateIntegrationTest { .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE))) .count(Materialized.as(windowStoreName)); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); - kafkaStreams.start(); + startKafkaStreamsAndWaitForRunningState(kafkaStreams); waitUntilAtLeastNumRecordProcessed(outputTopic, 1); @@ -716,7 +729,7 @@ public class QueryableStateIntegrationTest { 
final String storeName = "count-by-key"; stream.groupByKey().count(Materialized.as(storeName)); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); - kafkaStreams.start(); + startKafkaStreamsAndWaitForRunningState(kafkaStreams); final KeyValue hello = KeyValue.pair("hello", "hello"); IntegrationTestUtils.produceKeyValuesSynchronously( @@ -747,9 +760,9 @@ public class QueryableStateIntegrationTest { // close stream kafkaStreams.close(); - // start again + // start again, and since it may take time to restore we wait for it to transit to RUNNING a bit longer kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); - kafkaStreams.start(); + startKafkaStreamsAndWaitForRunningState(kafkaStreams, maxWaitMs); // make sure we never get any value other than 8 for hello TestUtils.waitForCondition( @@ -810,7 +823,9 @@ public class QueryableStateIntegrationTest { streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); kafkaStreams.setUncaughtExceptionHandler((t, e) -> failed.set(true)); - kafkaStreams.start(); + + // since we start with two threads, wait for a bit longer for both of them to transit to running + startKafkaStreamsAndWaitForRunningState(kafkaStreams, 30000); IntegrationTestUtils.produceKeyValuesSynchronously( streamOne,