
KAFKA-4422 / KAFKA-8700 / KAFKA-5566: Wait for state to transit to RUNNING upon start (#7519)

I looked into the logs of the above tickets, and I think for a couple of them the failures occur because the threads take time to restore, or simply to stabilize the rebalance since there are multiple threads. This adds a hook to wait for the state to transition to RUNNING upon start.

Reviewers: Chris Pettitt <cpettitt@confluent.io>, Matthias J. Sax <matthias@confluent.io>
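
The helper used throughout the diff below, startKafkaStreamsAndWaitForRunningState, is pulled in from StreamsTestUtils; its implementation is not part of the hunks shown here. As a rough illustration of the idea described above, a minimal sketch of such a helper, assuming it simply polls KafkaStreams#state() via TestUtils.waitForCondition, could look like this (class name and exact signature are assumptions, not the committed code):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.test.TestUtils;

// Sketch only: the committed helper lives in org.apache.kafka.test.StreamsTestUtils
// and may be implemented differently (e.g. via a state listener).
public final class StartAndWaitSketch {

    // Start the instance and block until KafkaStreams#state() reports RUNNING,
    // failing the test if that does not happen within timeoutMs.
    public static void startKafkaStreamsAndWaitForRunningState(final KafkaStreams kafkaStreams,
                                                                final long timeoutMs) throws InterruptedException {
        kafkaStreams.start();
        TestUtils.waitForCondition(
            () -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
            timeoutMs,
            "KafkaStreams did not reach RUNNING within " + timeoutMs + " ms");
    }

    // Overload falling back to the test utilities' default maximum wait time.
    public static void startKafkaStreamsAndWaitForRunningState(final KafkaStreams kafkaStreams) throws InterruptedException {
        startKafkaStreamsAndWaitForRunningState(kafkaStreams, TestUtils.DEFAULT_MAX_WAIT_MS);
    }
}

Call sites that expect state restoration or multi-thread rebalancing to take longer pass an explicit timeout, as the hunks below do with maxWaitMs and 30000.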
Authored by Guozhang Wang 5 years ago; committed by GitHub.
commit 76fcabc7b4
1 changed file with 33 changed lines:
    streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java

@@ -54,6 +54,7 @@ import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -92,6 +93,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
@Category({IntegrationTest.class})
public class QueryableStateIntegrationTest {
@@ -261,6 +263,16 @@ public class QueryableStateIntegrationTest {
@Override
public void run() {
    myStream.start();
    try {
        TestUtils.waitForCondition(
            () -> stateListener.mapStates.containsKey(KafkaStreams.State.RUNNING),
            "Did not start successfully after " + TestUtils.DEFAULT_MAX_WAIT_MS + " ms"
        );
    } catch (final InterruptedException e) {
        if (!stateListener.mapStates.containsKey(KafkaStreams.State.RUNNING))
            fail("Did not start successfully");
    }
}
public void close() {
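
The run() hunk above waits on stateListener.mapStates, a field of a state listener defined elsewhere in this test and not shown in the diff. A minimal listener compatible with that usage might look like the following sketch (class name hypothetical; only the mapStates lookup is taken from the hunk above):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.streams.KafkaStreams;

// Records every state the Streams client has entered, so a test thread can
// later check mapStates.containsKey(KafkaStreams.State.RUNNING).
public class TrackingStateListener implements KafkaStreams.StateListener {
    final Map<KafkaStreams.State, Long> mapStates = new ConcurrentHashMap<>();

    @Override
    public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
        // Count how many times each state has been entered.
        mapStates.merge(newState, 1L, Long::sum);
    }
}

Such a listener would be registered with myStream.setStateListener(stateListener) before myStream.start(), so that the transition to RUNNING is observed.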
@@ -446,7 +458,8 @@ public class QueryableStateIntegrationTest {
windowStoreName,
streamsConfiguration);
kafkaStreams.start();
startKafkaStreamsAndWaitForRunningState(kafkaStreams);
producerThread.start();
try {
@@ -515,7 +528,7 @@ public class QueryableStateIntegrationTest {
t2.toStream().to(outputTopic);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
startKafkaStreamsAndWaitForRunningState(kafkaStreams);
waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
@@ -581,7 +594,7 @@ public class QueryableStateIntegrationTest {
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
startKafkaStreamsAndWaitForRunningState(kafkaStreams);
waitUntilAtLeastNumRecordProcessed(outputTopic, 5);
@@ -629,7 +642,7 @@ public class QueryableStateIntegrationTest {
t3.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
startKafkaStreamsAndWaitForRunningState(kafkaStreams);
waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
@@ -690,7 +703,7 @@ public class QueryableStateIntegrationTest {
.windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)))
.count(Materialized.as(windowStoreName));
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
startKafkaStreamsAndWaitForRunningState(kafkaStreams);
waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
@@ -716,7 +729,7 @@ public class QueryableStateIntegrationTest {
final String storeName = "count-by-key";
stream.groupByKey().count(Materialized.as(storeName));
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
startKafkaStreamsAndWaitForRunningState(kafkaStreams);
final KeyValue<String, String> hello = KeyValue.pair("hello", "hello");
IntegrationTestUtils.produceKeyValuesSynchronously(
@@ -747,9 +760,9 @@ public class QueryableStateIntegrationTest {
// close stream
kafkaStreams.close();
// start again
// start again, and since it may take time to restore we wait for it to transit to RUNNING a bit longer
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
startKafkaStreamsAndWaitForRunningState(kafkaStreams, maxWaitMs);
// make sure we never get any value other than 8 for hello
TestUtils.waitForCondition(
@@ -810,7 +823,9 @@ public class QueryableStateIntegrationTest {
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.setUncaughtExceptionHandler((t, e) -> failed.set(true));
kafkaStreams.start();
// since we start with two threads, wait for a bit longer for both of them to transit to running
startKafkaStreamsAndWaitForRunningState(kafkaStreams, 30000);
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOne,
