From da332f2241f1781aca64d1007c0c209b50ec5ff0 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Thu, 13 Dec 2018 23:30:04 -0500 Subject: [PATCH] MINOR:Start processor inside verify message (#6029) This PR fixes a flaky system test. I ran six runs of branch builder, and each run was parameterized to repeat the test 25 times for a total of 150 runs. All test runs passed. https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2122/ https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2123/ https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2124/ https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2128/ https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2129/ https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2130/ Reviewers: Matthias J. Sax , Guozhang Wang , John Roesler --- .../kafka/streams/tests/StreamsNamedRepartitionTest.java | 1 + .../tests/streams/streams_named_repartition_topic_test.py | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java index 660de397a0f..f836baa704f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java @@ -63,6 +63,7 @@ public class StreamsNamedRepartitionTest { final StreamsBuilder builder = new StreamsBuilder(); final KStream sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())); + sourceStream.peek((k, v) -> System.out.println(String.format("input data key=%s, value=%s", k, v))); final KStream mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v)); diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py index 5baf612d6c6..b9894eef7a6 100644 --- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py +++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py @@ -41,7 +41,7 @@ class StreamsNamedRepartitionTopicTest(Test): } self.zookeeper = ZookeeperService(self.test_context, num_nodes=1) - self.kafka = KafkaService(self.test_context, num_nodes=1, + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zookeeper, topics=self.topics) self.producer = VerifiableProducer(self.test_context, @@ -66,7 +66,6 @@ class StreamsNamedRepartitionTopicTest(Test): for processor in processors: processor.CLEAN_NODE_ENABLED = False self.set_topics(processor) - processor.start() self.verify_running(processor, 'REBALANCING -> RUNNING') self.verify_processing(processors) @@ -76,7 +75,6 @@ class StreamsNamedRepartitionTopicTest(Test): self.verify_stopped(processor) # will tell app to add operations before repartition topic processor.ADD_ADDITIONAL_OPS = 'true' - processor.start() self.verify_running(processor, 'UPDATED Topology') self.verify_processing(processors) @@ -89,7 +87,9 @@ class StreamsNamedRepartitionTopicTest(Test): @staticmethod def verify_running(processor, message): - with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: + node = processor.node + with node.account.monitor_log(processor.STDOUT_FILE) as monitor: + processor.start() monitor.wait_until(message, timeout_sec=60, err_msg="Never saw '%s' message " % message + str(processor.node.account))