Browse Source

KAFKA-5362; Follow up to Streams EOS system test

- improve tests to get rid of calls to `sleep` in Python
- fix some flaky test conditions
- improve debugging

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3542 from mjsax/failing-eos-system-tests
pull/3542/merge
Matthias J. Sax 7 years ago committed by Guozhang Wang
parent
commit
51063441d3
  1. 19
      streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
  2. 77
      streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
  3. 1
      streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
  4. 3
      streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
  5. 2
      tests/kafkatest/services/streams.py
  6. 133
      tests/kafkatest/tests/streams/streams_eos_test.py

19
streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java

@ -53,10 +53,12 @@ public class EosTestClient extends SmokeTestUtil { @@ -53,10 +53,12 @@ public class EosTestClient extends SmokeTestUtil {
/**
 * Shutdown-hook body: clears the {@code isRunning} flag, closes the Streams instance and,
 * unless an uncaught exception was recorded, prints the marker the health scripts look for.
 */
@Override
public void run() {
    isRunning = false;
    // NOTE(review): close is invoked twice with different timeouts; confirm both calls are intended.
    streams.close(5, TimeUnit.SECONDS);
    // The timeout value must be expressed in the unit passed next to it. The previous
    // TimeUnit.SECONDS.toMillis(300) evaluated to 300000, which combined with
    // TimeUnit.SECONDS requested a 300000-second wait instead of 300 seconds.
    streams.close(300, TimeUnit.SECONDS);
    // do not remove these printouts since they are needed for health scripts
    if (!uncaughtException) {
        System.out.println(System.currentTimeMillis());
        System.out.println("EOS-TEST-CLIENT-CLOSED");
        System.out.flush();
    }
}
}));
@ -69,15 +71,26 @@ public class EosTestClient extends SmokeTestUtil { @@ -69,15 +71,26 @@ public class EosTestClient extends SmokeTestUtil {
// Install a handler for exceptions that escape a Streams thread. It prints a timestamp and the
// EOS-TEST-CLIENT-EXCEPTION marker to stdout, dumps the stack trace, and raises the
// uncaughtException flag that the surrounding code checks to decide whether to close the instance.
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
// timestamp first, so the marker below can be correlated with other log output
System.out.println(System.currentTimeMillis());
System.out.println("EOS-TEST-CLIENT-EXCEPTION");
// no-arg printStackTrace() writes to the standard error stream, not to stdout
e.printStackTrace();
System.out.flush();
uncaughtException = true;
}
});
// Report every KafkaStreams state transition on stdout as a timestamp line followed by
// "StateChange: <old> -> <new>". The Python system test waits for these exact lines
// (e.g. "StateChange: REBALANCING -> RUNNING"), so the format must stay stable.
streams.setStateListener(new KafkaStreams.StateListener() {
@Override
public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
// don't remove this -- it's required test output
System.out.println(System.currentTimeMillis());
System.out.println("StateChange: " + oldState + " -> " + newState);
// flush so the line is visible to the log monitor right away
System.out.flush();
}
});
streams.start();
}
// After an uncaught exception, close the current Streams instance and drop the reference.
if (uncaughtException) {
    // NOTE(review): close is invoked twice with different timeouts; confirm both calls are intended.
    streams.close(5, TimeUnit.SECONDS);
    // The timeout value must be expressed in the unit passed next to it. The previous
    // TimeUnit.SECONDS.toMillis(60) evaluated to 60000, which combined with
    // TimeUnit.SECONDS requested a 60000-second wait instead of 60 seconds.
    streams.close(60, TimeUnit.SECONDS);
    streams = null;
}
sleep(1000);
@ -90,7 +103,7 @@ public class EosTestClient extends SmokeTestUtil { @@ -90,7 +103,7 @@ public class EosTestClient extends SmokeTestUtil {
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

77
streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java

@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; @@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@ -54,15 +55,23 @@ import java.util.Set; @@ -54,15 +55,23 @@ import java.util.Set;
public class EosTestDriver extends SmokeTestUtil {
private static final int MAX_NUMBER_OF_KEYS = 100;
private static final long MAX_IDLE_TIME_MS = 300000L;
private static final long MAX_IDLE_TIME_MS = 600000L;
private static boolean isRunning = true;
static int numRecordsProduced = 0;
// Adjusts the shared numRecordsProduced counter by delta (may be negative).
// Synchronized because it is invoked both from the producing loop (+1 per send) and from
// the producer callback (minus the number of expired records).
static synchronized void updateNumRecordsProduces(final int delta) {
numRecordsProduced += delta;
}
static void generate(final String kafka) {
// Register a JVM shutdown hook that announces termination on stdout and clears isRunning,
// which is the condition of the producing loop below.
// NOTE(review): isRunning is declared as a plain (non-volatile) static boolean and is written
// here from the hook thread while another thread reads it -- confirm visibility is not an issue.
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("Terminating");
System.out.flush();
isRunning = false;
}
});
@ -78,7 +87,6 @@ public class EosTestDriver extends SmokeTestUtil { @@ -78,7 +87,6 @@ public class EosTestDriver extends SmokeTestUtil {
final Random rand = new Random(System.currentTimeMillis());
int numRecordsProduced = 0;
while (isRunning) {
final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS);
final int value = rand.nextInt(10000);
@ -89,20 +97,47 @@ public class EosTestDriver extends SmokeTestUtil { @@ -89,20 +97,47 @@ public class EosTestDriver extends SmokeTestUtil {
/**
 * Producer callback: on a failed send, dumps the stack trace and, for a record-expiry
 * timeout, subtracts the number of expired records from the produced-records counter.
 *
 * @param metadata  metadata of the sent record (unused here)
 * @param exception the send failure, or {@code null} when the send succeeded
 */
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
    if (exception != null) {
        exception.printStackTrace();
        // NOTE(review): if Exit.exit(1) terminates the process, the handling below is never
        // reached -- confirm which of the two paths is intended.
        Exit.exit(1);
        exception.printStackTrace(System.err);
        System.err.flush();
        if (exception instanceof TimeoutException) {
            // Expected text: "Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time".
            // getMessage() returns only the detail message, while toString() prefixes it with
            // "org.apache.kafka.common.errors.TimeoutException: ". A fixed token index is
            // therefore fragile; take the token that follows "Expiring" so both forms work.
            final String message = exception.getMessage();
            final String[] tokens = message == null ? new String[0] : message.split(" ");
            for (int i = 0; i + 1 < tokens.length; i++) {
                if ("Expiring".equals(tokens[i])) {
                    try {
                        updateNumRecordsProduces(-Integer.parseInt(tokens[i + 1]));
                    } catch (final NumberFormatException ignored) {
                        // best effort: an unexpected message format leaves the counter untouched
                    }
                    break;
                }
            }
        }
    }
}
});
numRecordsProduced++;
updateNumRecordsProduces(1);
if (numRecordsProduced % 1000 == 0) {
System.out.println(numRecordsProduced + " records produced");
System.out.flush();
}
Utils.sleep(rand.nextInt(50));
Utils.sleep(rand.nextInt(10));
}
producer.close();
System.out.println(numRecordsProduced + " records produced");
System.out.println("Producer closed: " + numRecordsProduced + " records produced");
final Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
final List<TopicPartition> partitions = getAllPartitions(consumer, "data");
System.out.println("Partitions: " + partitions);
consumer.assign(partitions);
consumer.seekToEnd(partitions);
for (final TopicPartition tp : partitions) {
System.out.println("End-offset for " + tp + " is " + consumer.position(tp));
}
}
System.out.flush();
}
public static void verify(final String kafka, final boolean withRepartitioning) {
@ -180,6 +215,7 @@ public class EosTestDriver extends SmokeTestUtil { @@ -180,6 +215,7 @@ public class EosTestDriver extends SmokeTestUtil {
// do not modify: required test output
System.out.println("ALL-RECORDS-DELIVERED");
System.out.flush();
}
private static void ensureStreamsApplicationDown(final String kafka) {
@ -190,7 +226,7 @@ public class EosTestDriver extends SmokeTestUtil { @@ -190,7 +226,7 @@ public class EosTestDriver extends SmokeTestUtil {
final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
while (!adminClient.describeConsumerGroup(EosTestClient.APP_ID, 10000).consumers().get().isEmpty()) {
if (System.currentTimeMillis() > maxWaitTime) {
throw new RuntimeException("Streams application not down after 30 seconds.");
throw new RuntimeException("Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds.");
}
sleep(1000);
}
@ -240,16 +276,20 @@ public class EosTestDriver extends SmokeTestUtil { @@ -240,16 +276,20 @@ public class EosTestDriver extends SmokeTestUtil {
final Map<TopicPartition, Long> readEndOffsets,
final boolean withRepartitioning,
final boolean isInputTopic) {
System.err.println("read end offset: " + readEndOffsets);
final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();
final Map<TopicPartition, Long> maxReceivedOffsetPerPartition = new HashMap<>();
final Map<TopicPartition, Long> maxConsumerPositionPerPartition = new HashMap<>();
long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
boolean allRecordsReceived = false;
while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(500);
final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(100);
for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
maxReceivedOffsetPerPartition.put(tp, record.offset());
final long readEndOffset = readEndOffsets.get(tp);
if (record.offset() < readEndOffset) {
addRecord(record, recordPerTopicPerPartition, withRepartitioning);
@ -257,7 +297,11 @@ public class EosTestDriver extends SmokeTestUtil { @@ -257,7 +297,11 @@ public class EosTestDriver extends SmokeTestUtil {
throw new RuntimeException("FAIL: did receive more records than expected for " + tp
+ " (expected EOL offset: " + readEndOffset + "; current offset: " + record.offset());
}
if (consumer.position(tp) >= readEndOffset) {
}
for (final TopicPartition tp : readEndOffsets.keySet()) {
maxConsumerPositionPerPartition.put(tp, consumer.position(tp));
if (consumer.position(tp) >= readEndOffsets.get(tp)) {
consumer.pause(Collections.singletonList(tp));
}
}
@ -266,7 +310,10 @@ public class EosTestDriver extends SmokeTestUtil { @@ -266,7 +310,10 @@ public class EosTestDriver extends SmokeTestUtil {
}
if (!allRecordsReceived) {
throw new RuntimeException("FAIL: did not receive all records after 30 sec idle time.");
System.err.println("Pause partitions (ie, received all data): " + consumer.paused());
System.err.println("Max received offset per partition: " + maxReceivedOffsetPerPartition);
System.err.println("Max consumer position per partition: " + maxConsumerPositionPerPartition);
throw new RuntimeException("FAIL: did not receive all records after " + (MAX_IDLE_TIME_MS / 1000) + " sec idle time.");
}
return recordPerTopicPerPartition;
@ -530,7 +577,8 @@ public class EosTestDriver extends SmokeTestUtil { @@ -530,7 +577,8 @@ public class EosTestDriver extends SmokeTestUtil {
// Producer callback: any send failure is fatal. The stack trace is written to
// standard error and the process exits with status 1.
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception != null) {
// NOTE(review): no-arg printStackTrace() also writes to System.err, so together with the
// next line the trace is emitted twice -- confirm only one of the two calls is meant to stay.
exception.printStackTrace();
exception.printStackTrace(System.err);
// flush before exiting so the trace is not lost
System.err.flush();
Exit.exit(1);
}
}
@ -540,10 +588,11 @@ public class EosTestDriver extends SmokeTestUtil { @@ -540,10 +588,11 @@ public class EosTestDriver extends SmokeTestUtil {
final StringDeserializer stringDeserializer = new StringDeserializer();
final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
for (final ConsumerRecord<byte[], byte[]> record : records) {
maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
final String topic = record.topic();
final TopicPartition tp = new TopicPartition(topic, record.partition());
@ -564,7 +613,7 @@ public class EosTestDriver extends SmokeTestUtil { @@ -564,7 +613,7 @@ public class EosTestDriver extends SmokeTestUtil {
}
}
if (!partitions.isEmpty()) {
throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last 30 sec.");
throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000) + " sec.");
}
}

1
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java

@ -60,6 +60,7 @@ public class SmokeTestUtil { @@ -60,6 +60,7 @@ public class SmokeTestUtil {
}
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println(System.currentTimeMillis());
System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
}
}

3
streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java

@ -33,6 +33,7 @@ public class StreamsEosTest { @@ -33,6 +33,7 @@ public class StreamsEosTest {
System.out.println("kafka=" + kafka);
System.out.println("stateDir=" + stateDir);
System.out.println("command=" + command);
System.out.flush();
if (command == null || stateDir == null) {
System.exit(-1);
@ -56,6 +57,8 @@ public class StreamsEosTest { @@ -56,6 +57,8 @@ public class StreamsEosTest {
break;
default:
System.out.println("unknown command: " + command);
System.out.flush();
System.exit(-1);
}
}

2
tests/kafkatest/services/streams.py

@ -135,7 +135,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service): @@ -135,7 +135,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
self.logger.info("Starting StreamsTest process on " + str(node.account))
with node.account.monitor_log(self.STDOUT_FILE) as monitor:
node.account.ssh(self.start_cmd(node))
monitor.wait_until('StreamsTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
monitor.wait_until('StreamsTest instance started', timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")

133
tests/kafkatest/tests/streams/streams_eos_test.py

@ -19,7 +19,6 @@ from ducktape.mark import ignore @@ -19,7 +19,6 @@ from ducktape.mark import ignore
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
import time
class StreamsEosTest(KafkaTest):
@ -29,106 +28,138 @@ class StreamsEosTest(KafkaTest): @@ -29,106 +28,138 @@ class StreamsEosTest(KafkaTest):
def __init__(self, test_context):
super(StreamsEosTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
'data' : { 'partitions': 5, 'replication-factor': 2 },
'echo' : { 'partitions': 5, 'replication-factor': 2 },
'min' : { 'partitions': 5, 'replication-factor': 2 },
'sum' : { 'partitions': 5, 'replication-factor': 2 },
'repartition' : { 'partitions': 5, 'replication-factor': 2 },
'max' : { 'partitions': 5, 'replication-factor': 2 },
'cnt' : { 'partitions': 5, 'replication-factor': 2 }
'data': {'partitions': 5, 'replication-factor': 2},
'echo': {'partitions': 5, 'replication-factor': 2},
'min': {'partitions': 5, 'replication-factor': 2},
'sum': {'partitions': 5, 'replication-factor': 2},
'repartition': {'partitions': 5, 'replication-factor': 2},
'max': {'partitions': 5, 'replication-factor': 2},
'cnt': {'partitions': 5, 'replication-factor': 2}
})
self.driver = StreamsEosTestDriverService(test_context, self.kafka)
self.test_context = test_context
@ignore
@cluster(num_nodes=8)
@cluster(num_nodes=9)
def test_rebalance_simple(self):
self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
@ignore
@cluster(num_nodes=8)
@cluster(num_nodes=9)
def test_rebalance_complex(self):
self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
def run_rebalance(self, processor1, processor2, verifier):
def run_rebalance(self, processor1, processor2, processor3, verifier):
"""
Starts and stops three test clients a few times.
Ensure that all records are delivered exactly-once.
"""
self.driver.start()
processor1.start()
time.sleep(120)
processor2.start()
time.sleep(120)
processor1.stop()
time.sleep(120)
processor1.start()
time.sleep(120)
processor2.stop()
time.sleep(120)
self.add_streams(processor1)
self.add_streams2(processor1, processor2)
self.add_streams3(processor1, processor2, processor3)
self.stop_streams3(processor2, processor3, processor1)
self.add_streams3(processor2, processor3, processor1)
self.stop_streams3(processor1, processor3, processor2)
self.stop_streams2(processor1, processor3)
self.stop_streams(processor1)
self.driver.stop()
processor1.stop()
processor2.stop()
verifier.start()
verifier.wait()
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
@ignore
@cluster(num_nodes=8)
@cluster(num_nodes=9)
def test_failure_and_recovery(self):
self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
@ignore
@cluster(num_nodes=8)
@cluster(num_nodes=9)
def test_failure_and_recovery_complex(self):
self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
def run_failure_and_recovery(self, processor1, processor2, verifier):
def run_failure_and_recovery(self, processor1, processor2, processor3, verifier):
"""
Starts three test clients, then aborts (kill -9) and restarts them a few times.
Ensure that all records are delivered exactly-once.
"""
self.driver.start()
processor1.start()
processor2.start()
time.sleep(120)
processor1.abortThenRestart()
time.sleep(120)
processor1.abortThenRestart()
time.sleep(120)
processor2.abortThenRestart()
time.sleep(120)
self.add_streams(processor1)
self.add_streams2(processor1, processor2)
self.add_streams3(processor1, processor2, processor3)
self.abort_streams(processor2, processor3, processor1)
self.add_streams3(processor2, processor3, processor1)
self.abort_streams(processor2, processor3, processor1)
self.add_streams3(processor2, processor3, processor1)
self.abort_streams(processor1, processor3, processor2)
self.stop_streams2(processor1, processor3)
self.stop_streams(processor1)
self.driver.stop()
processor1.stop()
processor2.stop()
verifier.start()
verifier.wait()
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
# Start `processor` and wait until its stdout shows the startup sequence checked by
# wait_for_startup (rebalance completed and records being processed).
# NOTE(review): the monitor is opened only after start() has returned; presumably
# monitor_log observes only output written after it is opened -- confirm that the
# awaited state-change lines cannot be emitted before this point.
def add_streams(self, processor):
processor.start()
with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
self.wait_for_startup(monitor, processor)
# Add a second instance while one is already running.
# The monitor on the running instance is opened BEFORE the new instance is started so the
# rebalance it triggers is observed; add_streams waits for the new instance, then this
# method waits for the already-running one to pass the startup sequence again.
def add_streams2(self, running_processor, processor_to_be_started):
with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as monitor:
self.add_streams(processor_to_be_started)
self.wait_for_startup(monitor, running_processor)
# Add a third instance while two are already running.
# Opens a monitor on the first running instance, delegates to add_streams2 for the second
# running instance and the new one, then waits for the first running instance to pass the
# startup sequence as well.
def add_streams3(self, running_processor1, running_processor2, processor_to_be_started):
with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as monitor:
self.add_streams2(running_processor2, processor_to_be_started)
self.wait_for_startup(monitor, running_processor1)
# Stop a single instance and wait until its stdout reports the final state transition
# "StateChange: PENDING_SHUTDOWN -> NOT_RUNNING".
# The monitor is opened before stop() is called so the shutdown output is not missed.
def stop_streams(self, processor_to_be_stopped):
with processor_to_be_stopped.node.account.monitor_log(processor_to_be_stopped.STDOUT_FILE) as monitor2:
processor_to_be_stopped.stop()
self.wait_for(monitor2, processor_to_be_stopped, "StateChange: PENDING_SHUTDOWN -> NOT_RUNNING")
# Stop one of two running instances.
# Watches the instance that stays alive, stops the other one via stop_streams, then waits
# for the surviving instance to rebalance and resume processing (wait_for_startup).
def stop_streams2(self, keep_alive_processor, processor_to_be_stopped):
with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as monitor:
self.stop_streams(processor_to_be_stopped)
self.wait_for_startup(monitor, keep_alive_processor)
# Stop one of three running instances.
# Watches the first surviving instance, delegates to stop_streams2 for the second survivor
# and the instance being stopped, then waits for the first survivor to rebalance and resume
# processing.
def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_stopped):
with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor:
self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
self.wait_for_startup(monitor, keep_alive_processor1)
# Abort one of three running instances and wait for both survivors to recover.
# Both monitors are opened before the abort so neither survivor's rebalance output is missed.
# NOTE(review): stop_nodes(False) presumably requests a non-clean shutdown (the test's
# docstring mentions kill -9) -- confirm against the service implementation.
def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_aborted):
with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor1:
with keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE) as monitor2:
processor_to_be_aborted.stop_nodes(False)
# survivors are awaited one after the other: second one first, then the first one
self.wait_for_startup(monitor2, keep_alive_processor2)
self.wait_for_startup(monitor1, keep_alive_processor1)
def wait_for_startup(self, monitor, processor):
    """
    Wait until the log watched by `monitor` shows a full rebalance of `processor`
    followed by processing progress.

    The markers are awaited one after the other, in the listed order, each through wait_for.
    """
    expected_markers = (
        "StateChange: RUNNING -> REBALANCING",
        "StateChange: REBALANCING -> RUNNING",
        "processed 500 records from topic=data",
    )
    for marker in expected_markers:
        self.wait_for(monitor, processor, marker)
def wait_for(self, monitor, processor, output):
    """
    Wait up to 300 seconds for `output` to appear in the log watched by `monitor`.

    The error message passed to the monitor names the awaited output and the account of
    the node `processor` runs on.
    """
    prefix = "Never saw output '%s' on " % output
    failure_text = prefix + str(processor.node.account)
    monitor.wait_until(output, timeout_sec=300, err_msg=failure_text)

Loading…
Cancel
Save