Browse Source

KAFKA-10191 fix flaky StreamsOptimizedTest (#8913)

Call KafkaStreams#cleanUp to reset local state before starting application up the second run.

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Boyang Chen <boyang@confluent.io>, John Roesler <john@confluent.io>
pull/8274/head
Chia-Ping Tsai 4 years ago committed by GitHub
parent
commit
6953161125
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      core/src/main/scala/kafka/tools/StreamsResetter.java
  2. 1
      streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
  3. 45
      tests/kafkatest/services/streams.py
  4. 9
      tests/kafkatest/tests/streams/streams_optimized_test.py

1
core/src/main/scala/kafka/tools/StreamsResetter.java

@ -168,7 +168,6 @@ public class StreamsResetter { @@ -168,7 +168,6 @@ public class StreamsResetter {
consumerConfig.putAll(properties);
exitCode = maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
maybeDeleteInternalTopics(adminClient, dryRun);
} catch (final Throwable e) {
exitCode = EXIT_CODE_ERROR;
System.err.println("ERROR: " + e);

1
streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java

@ -130,6 +130,7 @@ public class StreamsOptimizedTest { @@ -130,6 +130,7 @@ public class StreamsOptimizedTest {
}
});
streams.cleanUp();
streams.start();
Exit.addShutdownHook("streams-shutdown-hook", () -> {

45
tests/kafkatest/services/streams.py

@ -223,6 +223,10 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service): @@ -223,6 +223,10 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
def node(self):
return self.nodes[0]
@property
def expectedMessage(self):
return 'StreamsTest instance started'
def pids(self, node):
try:
pids = [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=str)]
@ -308,7 +312,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service): @@ -308,7 +312,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
self.logger.info("Starting StreamsTest process on " + str(node.account))
with node.account.monitor_log(self.STDOUT_FILE) as monitor:
node.account.ssh(self.start_cmd(node))
monitor.wait_until('StreamsTest instance started', timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
monitor.wait_until(self.expectedMessage, timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")
@ -497,6 +501,45 @@ class StreamsStandbyTaskService(StreamsTestBaseService): @@ -497,6 +501,45 @@ class StreamsStandbyTaskService(StreamsTestBaseService):
"org.apache.kafka.streams.tests.StreamsStandByReplicaTest",
configs)
class StreamsResetter(StreamsTestBaseService):
def __init__(self, test_context, kafka, topic, applicationId):
super(StreamsResetter, self).__init__(test_context,
kafka,
"kafka.tools.StreamsResetter",
"")
self.topic = topic
self.applicationId = applicationId
@property
def expectedMessage(self):
return 'Done.'
def start_cmd(self, node):
args = self.args.copy()
args['bootstrap.servers'] = self.kafka.bootstrap_servers()
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
args['log4j'] = self.LOG4J_CONFIG_FILE
args['application.id'] = self.applicationId
args['input.topics'] = self.topic
args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
cmd = "(export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"%(kafka_run_class)s %(streams_class_name)s " \
"--bootstrap-servers %(bootstrap.servers)s " \
"--force " \
"--application-id %(application.id)s " \
"--input-topics %(input.topics)s " \
"& echo $! >&3 ) " \
"1>> %(stdout)s " \
"2>> %(stderr)s " \
"3> %(pidfile)s "% args
self.logger.info("Executing: " + cmd)
return cmd
class StreamsOptimizedUpgradeTestService(StreamsTestBaseService):
def __init__(self, test_context, kafka):

9
tests/kafkatest/tests/streams/streams_optimized_test.py

@ -17,6 +17,7 @@ import time @@ -17,6 +17,7 @@ import time
from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService
from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
from kafkatest.services.streams import StreamsResetter
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.streams.utils import stop_processors
@ -77,6 +78,8 @@ class StreamsOptimizedTest(Test): @@ -77,6 +78,8 @@ class StreamsOptimizedTest(Test):
stop_processors(processors, self.stopped_message)
self.reset_application()
# start again with topology optimized
for processor in processors:
processor.OPTIMIZED_CONFIG = 'all'
@ -90,6 +93,12 @@ class StreamsOptimizedTest(Test): @@ -90,6 +93,12 @@ class StreamsOptimizedTest(Test):
self.kafka.stop()
self.zookeeper.stop()
def reset_application(self):
resetter = StreamsResetter(self.test_context, self.kafka, topic = self.input_topic, applicationId = 'StreamsOptimizedTest')
resetter.start()
# resetter is not long-term running but it would be better to check the pid by stopping it
resetter.stop()
@staticmethod
def verify_running_repartition_topic_count(processor, repartition_topic_count):
node = processor.node

Loading…
Cancel
Save