Browse Source

KAFKA-8126: Flaky Test org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask (#6475)

Changed the WorkerTest to use a mock Executor.

Author: Attila Doroszlai <adoroszlai@apache.org>
Reviewer: Randall Hauch <rhauch@gmail.com>
pull/6537/head
Doroszlai, Attila 6 years ago committed by Randall Hauch
parent
commit
d3316bc6a7
  1. 15
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
  2. 17
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java

15
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java

@ -98,16 +98,27 @@ public class Worker {
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
private WorkerConfigTransformer workerConfigTransformer; private WorkerConfigTransformer workerConfigTransformer;
@SuppressWarnings("deprecation")
public Worker( public Worker(
String workerId, String workerId,
Time time, Time time,
Plugins plugins, Plugins plugins,
WorkerConfig config, WorkerConfig config,
OffsetBackingStore offsetBackingStore OffsetBackingStore offsetBackingStore
) {
this(workerId, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool());
}
@SuppressWarnings("deprecation")
Worker(
String workerId,
Time time,
Plugins plugins,
WorkerConfig config,
OffsetBackingStore offsetBackingStore,
ExecutorService executorService
) { ) {
this.metrics = new ConnectMetrics(workerId, config, time); this.metrics = new ConnectMetrics(workerId, config, time);
this.executor = Executors.newCachedThreadPool(); this.executor = executorService;
this.workerId = workerId; this.workerId = workerId;
this.time = time; this.time = time;
this.plugins = plugins; this.plugins = plugins;

17
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java

@ -71,6 +71,7 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR; import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR;
import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.anyObject;
@ -118,6 +119,7 @@ public class WorkerTest extends ThreadedTest {
@Mock private Converter taskKeyConverter; @Mock private Converter taskKeyConverter;
@Mock private Converter taskValueConverter; @Mock private Converter taskValueConverter;
@Mock private HeaderConverter taskHeaderConverter; @Mock private HeaderConverter taskHeaderConverter;
@Mock private ExecutorService executorService;
@Before @Before
public void setup() { public void setup() {
@ -543,8 +545,7 @@ public class WorkerTest extends ThreadedTest {
expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, taskValueConverter); expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, taskValueConverter);
expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter); expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
workerTask.run(); EasyMock.expect(executorService.submit(workerTask)).andReturn(null);
EasyMock.expectLastCall();
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
@ -568,7 +569,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll(); PowerMock.replayAll();
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
worker.start(); worker.start();
assertStatistics(worker, 0, 0); assertStatistics(worker, 0, 0);
assertStartupStatistics(worker, 0, 0, 0, 0); assertStartupStatistics(worker, 0, 0, 0, 0);
@ -685,8 +686,7 @@ public class WorkerTest extends ThreadedTest {
expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null); expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter); expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
workerTask.run(); EasyMock.expect(executorService.submit(workerTask)).andReturn(null);
EasyMock.expectLastCall();
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
@ -712,7 +712,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll(); PowerMock.replayAll();
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
worker.start(); worker.start();
assertStatistics(worker, 0, 0); assertStatistics(worker, 0, 0);
worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
@ -778,8 +778,7 @@ public class WorkerTest extends ThreadedTest {
expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null); expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter); expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
workerTask.run(); EasyMock.expect(executorService.submit(workerTask)).andReturn(null);
EasyMock.expectLastCall();
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
@ -803,7 +802,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll(); PowerMock.replayAll();
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
worker.start(); worker.start();
assertStatistics(worker, 0, 0); assertStatistics(worker, 0, 0);
assertEquals(Collections.emptySet(), worker.taskIds()); assertEquals(Collections.emptySet(), worker.taskIds());

Loading…
Cancel
Save