From de6468ae5915298279e229dc64721e01e7d14fab Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Mon, 25 May 2020 09:10:03 -0700 Subject: [PATCH] KAFKA-9472: Remove deleted Connect tasks from status store (#8118) Although the statuses for tasks are removed from the status store when their connector is deleted, their statuses are not removed when only the task is deleted, which happens in the case that the number of tasks for a connector is reduced. This commit adds logic for deleting the statuses for those tasks from the status store whenever a rebalance has completed and the leader of a distributed cluster has detected that there are recently-deleted tasks. Standalone is also updated to accomplish this. Unit tests for the `DistributedHerder` and `StandaloneHerder` classes are updated and an integration test has been added. Reviewers: Nigel Liang , Konstantine Karantasis --- .../kafka/connect/runtime/AbstractHerder.java | 7 +++- .../kafka/connect/runtime/TaskStatus.java | 7 ++++ .../kafka/connect/runtime/WorkerTask.java | 6 +++ .../distributed/DistributedHerder.java | 23 +++++++++-- .../runtime/standalone/StandaloneHerder.java | 1 + .../ConnectWorkerIntegrationTest.java | 39 ++++++++++++++++++- .../distributed/DistributedHerderTest.java | 3 +- .../standalone/StandaloneHerderTest.java | 3 ++ 8 files changed, 81 insertions(+), 8 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index a4476f871ed..5c37f2d3110 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -189,10 +189,15 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con @Override public void onDeletion(String connector) { for (TaskStatus status : statusBackingStore.getAll(connector)) - statusBackingStore.put(new TaskStatus(status.id(), TaskStatus.State.DESTROYED, workerId, generation())); + onDeletion(status.id()); statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation())); } + @Override + public void onDeletion(ConnectorTaskId id) { + statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation())); + } + @Override public void pauseConnector(String connector) { if (!configBackingStore.contains(connector)) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java index 5ee90415b96..62bb1c71711 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java @@ -61,5 +61,12 @@ public class TaskStatus extends AbstractStatus { */ void onShutdown(ConnectorTaskId id); + /** + * Invoked after the task has been deleted. Can be called if the + * connector tasks have been reduced, or if the connector itself has + * been deleted. + * @param id The id of the task + */ + void onDeletion(ConnectorTaskId id); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index c40dc90012e..b0a6a0cb1da 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -436,6 +436,12 @@ abstract class WorkerTask implements Runnable { delegateListener.onShutdown(id); } + @Override + public void onDeletion(ConnectorTaskId id) { + taskStateTimer.changeState(State.DESTROYED, time.milliseconds()); + delegateListener.onDeletion(id); + } + public void recordState(TargetState state) { switch (state) { case STARTED: diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 512c700d365..349aa71e54e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -46,6 +46,7 @@ import org.apache.kafka.connect.runtime.SessionKey; import org.apache.kafka.connect.runtime.SinkConnectorConfig; import org.apache.kafka.connect.runtime.SourceConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; +import org.apache.kafka.connect.runtime.TaskStatus; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.rest.InternalRequestSignature; import org.apache.kafka.connect.runtime.rest.RestClient; @@ -1526,6 +1527,18 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } } + private void updateDeletedTaskStatus() { + ClusterConfigState snapshot = configBackingStore.snapshot(); + for (String connector : statusBackingStore.connectors()) { + Set remainingTasks = new HashSet<>(snapshot.tasks(connector)); + + statusBackingStore.getAll(connector).stream() + .map(TaskStatus::id) + .filter(task -> !remainingTasks.contains(task)) + .forEach(this::onDeletion); + } + } + protected HerderMetrics herderMetrics() { return herderMetrics; } @@ -1588,11 +1601,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable { herderMetrics.rebalanceStarted(time.milliseconds()); } - // Delete the statuses of all connectors removed prior to the start of this rebalance. This has to - // be done after the rebalance completes to avoid race conditions as the previous generation attempts - // to change the state to UNASSIGNED after tasks have been stopped. - if (isLeader()) + // Delete the statuses of all connectors and tasks removed prior to the start of this rebalance. This + // has to be done after the rebalance completes to avoid race conditions as the previous generation + // attempts to change the state to UNASSIGNED after tasks have been stopped. + if (isLeader()) { updateDeletedConnectorStatus(); + updateDeletedTaskStatus(); + } // We *must* interrupt any poll() call since this could occur when the poll starts, and we might then // sleep in the poll() for a long time. Forcing a wakeup ensures we'll get to process this event in the diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index a3e75b5829b..6c5398f8e45 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -319,6 +319,7 @@ public class StandaloneHerder extends AbstractHerder { if (!tasks.isEmpty()) { worker.stopAndAwaitTasks(tasks); configBackingStore.removeTaskConfigs(connName); + tasks.forEach(this::onDeletion); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index b8e04970f61..b3f30202055 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -102,7 +102,7 @@ public class ConnectWorkerIntegrationTest { // create test topic connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS); - // setup up props for the sink connector + // set up props for the source connector Map props = new HashMap<>(); props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName()); props.put(TASKS_MAX_CONFIG, String.valueOf(numTasks)); @@ -193,7 +193,7 @@ public class ConnectWorkerIntegrationTest { // create test topic connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS); - // setup up props for the sink connector + // set up props for the source connector Map props = new HashMap<>(); props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName()); props.put(TASKS_MAX_CONFIG, String.valueOf(numTasks)); @@ -236,4 +236,39 @@ public class ConnectWorkerIntegrationTest { "Connector tasks did not start in time."); } + /** + * Verify that the number of tasks listed in the REST API is updated correctly after changes to + * the "tasks.max" connector configuration. + */ + @Test + public void testTaskStatuses() throws Exception { + connect = connectBuilder.build(); + // start the clusters + connect.start(); + + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, + "Initial group of workers did not start in time."); + + // base connector props + Map connectorProps = new HashMap<>(); + connectorProps.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName()); + + // start the connector with only one task + final int initialNumTasks = 1; + connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(initialNumTasks)); + connect.configureConnector(CONNECTOR_NAME, connectorProps); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, initialNumTasks, "Connector tasks did not start in time"); + + // then reconfigure it to use more tasks + final int increasedNumTasks = 5; + connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(increasedNumTasks)); + connect.configureConnector(CONNECTOR_NAME, connectorProps); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, increasedNumTasks, "Connector task statuses did not update in time."); + + // then reconfigure it to use fewer tasks + final int decreasedNumTasks = 3; + connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(decreasedNumTasks)); + connect.configureConnector(CONNECTOR_NAME, connectorProps); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, decreasedNumTasks, "Connector task statuses did not update in time."); + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 08118fea29b..879abf8b35a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -208,7 +208,7 @@ public class DistributedHerderTest { connectProtocolVersion = CONNECT_PROTOCOL_V0; herder = PowerMock.createPartialMock(DistributedHerder.class, - new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"}, + new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus"}, new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy); @@ -222,6 +222,7 @@ public class DistributedHerderTest { delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class); PowerMock.mockStatic(Plugins.class); PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes(); + PowerMock.expectPrivate(herder, "updateDeletedTaskStatus").andVoid().anyTimes(); } @After diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 89d4e45fa91..feb0f995b7f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -266,6 +266,7 @@ public class StandaloneHerderTest { EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.emptyList()); statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0)); + statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0)); expectDestroy(); @@ -434,6 +435,8 @@ public class StandaloneHerderTest { // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked expectStop(); + statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0)); + statusBackingStore.stop(); EasyMock.expectLastCall(); worker.stop();