From d3316bc6a7834db9eef6dde21728891ba6b07b37 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" <6454655+adoroszlai@users.noreply.github.com> Date: Wed, 3 Apr 2019 22:00:05 +0200 Subject: [PATCH] KAFKA-8126: Flaky Test org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask (#6475) Changed the WorkerTest to use a mock Executor. Author: Attila Doroszlai Reviewer: Randall Hauch --- .../apache/kafka/connect/runtime/Worker.java | 15 +++++++++++++-- .../kafka/connect/runtime/WorkerTest.java | 17 ++++++++--------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 673bd4e0c3d..e8679837e76 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -98,16 +98,27 @@ public class Worker { private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; private WorkerConfigTransformer workerConfigTransformer; - @SuppressWarnings("deprecation") public Worker( String workerId, Time time, Plugins plugins, WorkerConfig config, OffsetBackingStore offsetBackingStore + ) { + this(workerId, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool()); + } + + @SuppressWarnings("deprecation") + Worker( + String workerId, + Time time, + Plugins plugins, + WorkerConfig config, + OffsetBackingStore offsetBackingStore, + ExecutorService executorService ) { this.metrics = new ConnectMetrics(workerId, config, time); - this.executor = Executors.newCachedThreadPool(); + this.executor = executorService; this.workerId = workerId; this.time = time; this.plugins = plugins; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 8f15c87c501..eef10f0d590 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -71,6 +71,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR; import static org.easymock.EasyMock.anyObject; @@ -118,6 +119,7 @@ public class WorkerTest extends ThreadedTest { @Mock private Converter taskKeyConverter; @Mock private Converter taskValueConverter; @Mock private HeaderConverter taskHeaderConverter; + @Mock private ExecutorService executorService; @Before public void setup() { @@ -543,8 +545,7 @@ public class WorkerTest extends ThreadedTest { expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, taskValueConverter); expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter); - workerTask.run(); - EasyMock.expectLastCall(); + EasyMock.expect(executorService.submit(workerTask)).andReturn(null); EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) @@ -568,7 +569,7 @@ public class WorkerTest extends ThreadedTest { PowerMock.replayAll(); - worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService); worker.start(); assertStatistics(worker, 0, 0); assertStartupStatistics(worker, 0, 0, 0, 0); @@ -685,8 +686,7 @@ public class WorkerTest extends ThreadedTest { expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null); expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter); - workerTask.run(); - EasyMock.expectLastCall(); + EasyMock.expect(executorService.submit(workerTask)).andReturn(null); EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) @@ -712,7 +712,7 @@ public class WorkerTest extends ThreadedTest { PowerMock.replayAll(); - worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService); worker.start(); assertStatistics(worker, 0, 0); worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); @@ -778,8 +778,7 @@ public class WorkerTest extends ThreadedTest { expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null); expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter); - workerTask.run(); - EasyMock.expectLastCall(); + EasyMock.expect(executorService.submit(workerTask)).andReturn(null); EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) @@ -803,7 +802,7 @@ public class WorkerTest extends ThreadedTest { PowerMock.replayAll(); - worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService); worker.start(); assertStatistics(worker, 0, 0); assertEquals(Collections.emptySet(), worker.taskIds());