Browse Source

MINOR:Start processor inside verify message (#6029)

This PR fixes a flaky system test.

I ran six runs of branch builder, and each run was parameterized to repeat the test 25 times for a total of 150 runs. All test runs passed.

https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2122/
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2123/
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2124/
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2128/
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2129/
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2130/

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>, John Roesler <vvcephei@users.noreply.github.com>
pull/6035/head
Bill Bejeck 6 years ago committed by Guozhang Wang
parent
commit
da332f2241
  1. 1
      streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
  2. 8
      tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py

1
streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java

@ -63,6 +63,7 @@ public class StreamsNamedRepartitionTest {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())); final KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
sourceStream.peek((k, v) -> System.out.println(String.format("input data key=%s, value=%s", k, v)));
final KStream<String, String> mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v)); final KStream<String, String> mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v));

8
tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py

@ -41,7 +41,7 @@ class StreamsNamedRepartitionTopicTest(Test):
} }
self.zookeeper = ZookeeperService(self.test_context, num_nodes=1) self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
self.kafka = KafkaService(self.test_context, num_nodes=1, self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=self.zookeeper, topics=self.topics) zk=self.zookeeper, topics=self.topics)
self.producer = VerifiableProducer(self.test_context, self.producer = VerifiableProducer(self.test_context,
@ -66,7 +66,6 @@ class StreamsNamedRepartitionTopicTest(Test):
for processor in processors: for processor in processors:
processor.CLEAN_NODE_ENABLED = False processor.CLEAN_NODE_ENABLED = False
self.set_topics(processor) self.set_topics(processor)
processor.start()
self.verify_running(processor, 'REBALANCING -> RUNNING') self.verify_running(processor, 'REBALANCING -> RUNNING')
self.verify_processing(processors) self.verify_processing(processors)
@ -76,7 +75,6 @@ class StreamsNamedRepartitionTopicTest(Test):
self.verify_stopped(processor) self.verify_stopped(processor)
# will tell app to add operations before repartition topic # will tell app to add operations before repartition topic
processor.ADD_ADDITIONAL_OPS = 'true' processor.ADD_ADDITIONAL_OPS = 'true'
processor.start()
self.verify_running(processor, 'UPDATED Topology') self.verify_running(processor, 'UPDATED Topology')
self.verify_processing(processors) self.verify_processing(processors)
@ -89,7 +87,9 @@ class StreamsNamedRepartitionTopicTest(Test):
@staticmethod @staticmethod
def verify_running(processor, message): def verify_running(processor, message):
with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: node = processor.node
with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
processor.start()
monitor.wait_until(message, monitor.wait_until(message,
timeout_sec=60, timeout_sec=60,
err_msg="Never saw '%s' message " % message + str(processor.node.account)) err_msg="Never saw '%s' message " % message + str(processor.node.account))

Loading…
Cancel
Save