Browse Source

KAFKA-3935; Fix test_restart_failed_task system test for SinkTasks

Fix the test by using a more liberal timeout and forcing more frequent SinkTask.put() calls. Also add some logging to aid future debugging.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1663 from ewencp/kafka-3935-fix-restart-system-test
pull/1665/head
Ewen Cheslack-Postava 8 years ago committed by Ismael Juma
parent
commit
d1546960de
  1. 7
      connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
  2. 19
      connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
  3. 9
      connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
  4. 2
      tests/kafkatest/tests/connect/connect_distributed_test.py

7
connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java

@@ -20,6 +20,8 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
@@ -49,6 +51,8 @@ public class MockConnector extends Connector {
public static final long DEFAULT_FAILURE_DELAY_MS = 15000;
private static final Logger log = LoggerFactory.getLogger(MockConnector.class);
private Map<String, String> config;
private ScheduledExecutorService executor;
@@ -69,10 +73,12 @@ public class MockConnector extends Connector {
if (delayMsString != null)
delayMs = Long.parseLong(delayMsString);
log.debug("Started MockConnector with failure delay of {} ms", delayMs);
executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(new Runnable() {
@Override
public void run() {
log.debug("Triggering connector failure");
context.raiseError(new RuntimeException());
}
}, delayMs, TimeUnit.MILLISECONDS);
@@ -86,6 +92,7 @@ public class MockConnector extends Connector {
/**
 * Produces the configuration for the tasks of this mock connector.
 *
 * <p>Exactly one task configuration is returned regardless of {@code maxTasks}; it is the
 * connector's own {@code config} map, passed through unchanged.
 *
 * @param maxTasks upper bound on the number of tasks; not consulted by this implementation
 * @return an immutable single-element list containing the connector's configuration map
 */
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    log.debug("Creating single task for MockConnector");
    final List<Map<String, String>> singleTaskConfig = Collections.singletonList(config);
    return singleTaskConfig;
}

19
connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java

@@ -21,11 +21,14 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Map;
public class MockSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(MockSinkTask.class);
private String mockMode;
private long startTimeMs;
@@ -47,6 +50,9 @@ public class MockSinkTask extends SinkTask {
this.failureDelayMs = MockConnector.DEFAULT_FAILURE_DELAY_MS;
if (delayMsString != null)
failureDelayMs = Long.parseLong(delayMsString);
log.debug("Started MockSinkTask at {} with failure scheduled in {} ms", startTimeMs, failureDelayMs);
setTimeout();
}
}
@@ -54,8 +60,11 @@ public class MockSinkTask extends SinkTask {
// Accepts a batch of records from the framework. The records are never inspected: this mock
// only uses the call as an opportunity to check whether it is time to fail.
//
// When the mock mode is MockConnector.TASK_FAILURE and more than failureDelayMs milliseconds
// have elapsed since startTimeMs, a RuntimeException is thrown to simulate a task failure.
// Otherwise (still in TASK_FAILURE mode) another wakeup is requested via setTimeout() so the
// check is repeated soon. In any other mode the call does nothing.
public void put(Collection<SinkRecord> records) {
    if (MockConnector.TASK_FAILURE.equals(mockMode)) {
        long now = System.currentTimeMillis();
        // Compare the elapsed time against the delay instead of computing
        // startTimeMs + failureDelayMs: that sum overflows for very large delays and would
        // make the task fail immediately.
        if (now - startTimeMs > failureDelayMs) {
            log.debug("Triggering sink task failure");
            throw new RuntimeException();
        }
        // Not yet due: ask for another put() call shortly so the failure fires near its target time.
        setTimeout();
    }
}
@@ -68,4 +77,12 @@ public class MockSinkTask extends SinkTask {
// Stops the task. Intentionally empty: this method performs no cleanup.
public void stop() {
}
// Requests the next wakeup from the task context, capped at 250 ms.
//
// This mock task may never consume data from Kafka, in which case the only put() calls it
// sees are those triggered by wakeups for offset commits. To avoid being tied to the offset
// commit interval, a wakeup is forced after 250 ms or after the failure delay, whichever is
// smaller. That is not overly aggressive, yet keeps any scheduled action of this connector
// reasonably close to its target time.
private void setTimeout() {
    final long maxWakeupIntervalMs = 250;
    final long wakeupMs = Math.min(failureDelayMs, maxWakeupIntervalMs);
    context.timeout(wakeupMs);
}
}

9
connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java

@@ -19,12 +19,15 @@ package org.apache.kafka.connect.tools;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class MockSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(MockSourceTask.class);
private String mockMode;
private long startTimeMs;
@@ -46,6 +49,8 @@ public class MockSourceTask extends SourceTask {
this.failureDelayMs = MockConnector.DEFAULT_FAILURE_DELAY_MS;
if (delayMsString != null)
failureDelayMs = Long.parseLong(delayMsString);
log.debug("Started MockSourceTask at {} with failure scheduled in {} ms", startTimeMs, failureDelayMs);
}
}
@@ -53,8 +58,10 @@ public class MockSourceTask extends SourceTask {
// Polls for new source records. This mock never produces data: it always returns an empty
// list unless it is time to simulate a failure.
//
// When the mock mode is MockConnector.TASK_FAILURE and more than failureDelayMs milliseconds
// have elapsed since startTimeMs, a RuntimeException is thrown instead of returning.
public List<SourceRecord> poll() throws InterruptedException {
    if (MockConnector.TASK_FAILURE.equals(mockMode)) {
        long now = System.currentTimeMillis();
        // Compare the elapsed time against the delay instead of computing
        // startTimeMs + failureDelayMs: that sum overflows for very large delays and would
        // make the task fail immediately.
        if (now - startTimeMs > failureDelayMs) {
            log.debug("Triggering source task failure");
            throw new RuntimeException();
        }
    }
    return Collections.emptyList();
}

2
tests/kafkatest/tests/connect/connect_distributed_test.py

@@ -171,7 +171,7 @@ class ConnectDistributedTest(Test):
connector.start()
task_id = 0
wait_until(lambda: self.task_is_failed(connector, task_id), timeout_sec=15,
wait_until(lambda: self.task_is_failed(connector, task_id), timeout_sec=20,
err_msg="Failed to see task transition to the FAILED state")
self.cc.restart_task(connector.name, task_id)

Loading…
Cancel
Save