diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index f4acba042f5..e2d394476c2 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -168,7 +168,6 @@ public class StreamsResetter { consumerConfig.putAll(properties); exitCode = maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun); maybeDeleteInternalTopics(adminClient, dryRun); - } catch (final Throwable e) { exitCode = EXIT_CODE_ERROR; System.err.println("ERROR: " + e); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java index afec99d03d7..f110e577733 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java @@ -130,6 +130,7 @@ public class StreamsOptimizedTest { } }); + streams.cleanUp(); streams.start(); Exit.addShutdownHook("streams-shutdown-hook", () -> { diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index f0a24e77c57..66be72df107 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -223,6 +223,10 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service): def node(self): return self.nodes[0] + @property + def expectedMessage(self): + return 'StreamsTest instance started' + def pids(self, node): try: pids = [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=str)] @@ -308,7 +312,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service): self.logger.info("Starting StreamsTest process on " + str(node.account)) with node.account.monitor_log(self.STDOUT_FILE) as monitor: node.account.ssh(self.start_cmd(node)) - monitor.wait_until('StreamsTest instance started', timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account)) + monitor.wait_until(self.expectedMessage, timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account)) if len(self.pids(node)) == 0: raise RuntimeError("No process ids recorded") @@ -497,6 +501,45 @@ class StreamsStandbyTaskService(StreamsTestBaseService): "org.apache.kafka.streams.tests.StreamsStandByReplicaTest", configs) +class StreamsResetter(StreamsTestBaseService): + def __init__(self, test_context, kafka, topic, applicationId): + super(StreamsResetter, self).__init__(test_context, + kafka, + "kafka.tools.StreamsResetter", + "") + self.topic = topic + self.applicationId = applicationId + + @property + def expectedMessage(self): + return 'Done.' + + def start_cmd(self, node): + args = self.args.copy() + args['bootstrap.servers'] = self.kafka.bootstrap_servers() + args['stdout'] = self.STDOUT_FILE + args['stderr'] = self.STDERR_FILE + args['pidfile'] = self.PID_FILE + args['log4j'] = self.LOG4J_CONFIG_FILE + args['application.id'] = self.applicationId + args['input.topics'] = self.topic + args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node) + + cmd = "(export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \ + "%(kafka_run_class)s %(streams_class_name)s " \ + "--bootstrap-servers %(bootstrap.servers)s " \ + "--force " \ + "--application-id %(application.id)s " \ + "--input-topics %(input.topics)s " \ + "& echo $! >&3 ) " \ + "1>> %(stdout)s " \ + "2>> %(stderr)s " \ + "3> %(pidfile)s "% args + + self.logger.info("Executing: " + cmd) + + return cmd + class StreamsOptimizedUpgradeTestService(StreamsTestBaseService): def __init__(self, test_context, kafka): diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py b/tests/kafkatest/tests/streams/streams_optimized_test.py index ecd84c2b2e0..adea2ea5ce8 100644 --- a/tests/kafkatest/tests/streams/streams_optimized_test.py +++ b/tests/kafkatest/tests/streams/streams_optimized_test.py @@ -17,6 +17,7 @@ import time from ducktape.tests.test import Test from kafkatest.services.kafka import KafkaService from kafkatest.services.streams import StreamsOptimizedUpgradeTestService +from kafkatest.services.streams import StreamsResetter from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.streams.utils import stop_processors @@ -77,6 +78,8 @@ class StreamsOptimizedTest(Test): stop_processors(processors, self.stopped_message) + self.reset_application() + # start again with topology optimized for processor in processors: processor.OPTIMIZED_CONFIG = 'all' @@ -90,6 +93,12 @@ class StreamsOptimizedTest(Test): self.kafka.stop() self.zookeeper.stop() + def reset_application(self): + resetter = StreamsResetter(self.test_context, self.kafka, topic = self.input_topic, applicationId = 'StreamsOptimizedTest') + resetter.start() + # resetter is not long-term running but it would be better to check the pid by stopping it + resetter.stop() + @staticmethod def verify_running_repartition_topic_count(processor, repartition_topic_count): node = processor.node