diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java index 660de397a0f..f836baa704f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java @@ -63,6 +63,7 @@ public class StreamsNamedRepartitionTest { final StreamsBuilder builder = new StreamsBuilder(); final KStream sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())); + sourceStream.peek((k, v) -> System.out.println(String.format("input data key=%s, value=%s", k, v))); final KStream mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v)); diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py index 5baf612d6c6..b9894eef7a6 100644 --- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py +++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py @@ -41,7 +41,7 @@ class StreamsNamedRepartitionTopicTest(Test): } self.zookeeper = ZookeeperService(self.test_context, num_nodes=1) - self.kafka = KafkaService(self.test_context, num_nodes=1, + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zookeeper, topics=self.topics) self.producer = VerifiableProducer(self.test_context, @@ -66,7 +66,6 @@ class StreamsNamedRepartitionTopicTest(Test): for processor in processors: processor.CLEAN_NODE_ENABLED = False self.set_topics(processor) - processor.start() self.verify_running(processor, 'REBALANCING -> RUNNING') self.verify_processing(processors) @@ -76,7 +75,6 @@ class StreamsNamedRepartitionTopicTest(Test): self.verify_stopped(processor) # will tell app to add operations before repartition topic processor.ADD_ADDITIONAL_OPS = 'true' - processor.start() self.verify_running(processor, 'UPDATED Topology') self.verify_processing(processors) @@ -89,7 +87,9 @@ class StreamsNamedRepartitionTopicTest(Test): @staticmethod def verify_running(processor, message): - with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: + node = processor.node + with node.account.monitor_log(processor.STDOUT_FILE) as monitor: + processor.start() monitor.wait_until(message, timeout_sec=60, err_msg="Never saw '%s' message " % message + str(processor.node.account))