
KAFKA-9472: Remove deleted Connect tasks from status store (#8118)

Although task statuses are removed from the status store when their connector is deleted, they are not removed when only the task is deleted, which happens when the number of tasks for a connector is reduced.

This commit adds logic for deleting the statuses of those tasks from the status store whenever a rebalance has completed and the leader of a distributed cluster has detected recently-deleted tasks. The standalone herder is updated to do the same.

Unit tests for the `DistributedHerder` and `StandaloneHerder` classes are updated and an integration test has been added.
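
In outline, the cleanup is a set difference computed by the leader after each rebalance: any task id that still has a status on record but is absent from the latest configuration snapshot has been deleted, and its status should be marked DESTROYED. A minimal, self-contained sketch of that idea (simplified stand-in types, not the actual Kafka Connect code, which appears in the DistributedHerder diff below):

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only: TaskId is a simplified stand-in for ConnectorTaskId,
// and the two lists stand in for the config snapshot and the status store.
public class StaleTaskStatusSketch {

    record TaskId(String connector, int task) { }

    // Statuses on record whose task no longer exists in the connector's config
    static Set<TaskId> staleStatuses(List<TaskId> configuredTasks, List<TaskId> statusesOnRecord) {
        Set<TaskId> remaining = new HashSet<>(configuredTasks);
        Set<TaskId> stale = new HashSet<>();
        for (TaskId id : statusesOnRecord)
            if (!remaining.contains(id))
                stale.add(id);
        return stale;
    }

    public static void main(String[] args) {
        // tasks.max reduced from 5 to 3: statuses for tasks 3 and 4 are now stale
        List<TaskId> configured = List.of(new TaskId("src", 0), new TaskId("src", 1), new TaskId("src", 2));
        List<TaskId> onRecord = List.of(new TaskId("src", 0), new TaskId("src", 1), new TaskId("src", 2),
                new TaskId("src", 3), new TaskId("src", 4));
        // Prints the statuses for tasks 3 and 4 (set order unspecified)
        System.out.println(staleStatuses(configured, onRecord));
    }
}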

Reviewers: Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
Chris Egerton authored 5 years ago, committed via GitHub
commit de6468ae59
8 changed files:

1. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java (7 changes)
2. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java (7 changes)
3. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java (6 changes)
4. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java (23 changes)
5. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java (1 change)
6. connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java (39 changes)
7. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java (3 changes)
8. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java (3 changes)

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java (7 changes)

@@ -189,10 +189,15 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     @Override
     public void onDeletion(String connector) {
         for (TaskStatus status : statusBackingStore.getAll(connector))
-            statusBackingStore.put(new TaskStatus(status.id(), TaskStatus.State.DESTROYED, workerId, generation()));
+            onDeletion(status.id());
         statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation()));
     }
 
+    @Override
+    public void onDeletion(ConnectorTaskId id) {
+        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation()));
+    }
+
     @Override
     public void pauseConnector(String connector) {
         if (!configBackingStore.contains(connector))

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java (7 changes)

@@ -61,5 +61,12 @@ public class TaskStatus extends AbstractStatus<ConnectorTaskId> {
          */
         void onShutdown(ConnectorTaskId id);
 
+        /**
+         * Invoked after the task has been deleted. Can be called if the
+         * connector tasks have been reduced, or if the connector itself has
+         * been deleted.
+         * @param id The id of the task
+         */
+        void onDeletion(ConnectorTaskId id);
     }
 }
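
The new hook is deliberately distinct from shutdown: a stopped task keeps its last status on record, while a deleted task's status should be marked DESTROYED and disappear. A toy illustration of that contract, using a hypothetical trimmed-down local interface rather than the real TaskStatus.Listener (which declares several more callbacks):

// Toy model of the listener contract described in the Javadoc above;
// TaskLifecycleListener is a hypothetical stand-in, not Kafka code.
public class ListenerContractSketch {

    interface TaskLifecycleListener {
        void onShutdown(String taskId);  // task stopped; status remains on record
        void onDeletion(String taskId);  // task removed; status marked DESTROYED
    }

    public static void main(String[] args) {
        TaskLifecycleListener listener = new TaskLifecycleListener() {
            @Override
            public void onShutdown(String taskId) {
                System.out.println(taskId + " -> UNASSIGNED (still listed)");
            }

            @Override
            public void onDeletion(String taskId) {
                System.out.println(taskId + " -> DESTROYED (dropped from listing)");
            }
        };

        listener.onShutdown("source-2"); // e.g. worker shutdown or rebalance
        listener.onDeletion("source-4"); // e.g. tasks.max reduced below 5
    }
}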

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java (6 changes)

@@ -436,6 +436,12 @@ abstract class WorkerTask implements Runnable {
             delegateListener.onShutdown(id);
         }
 
+        @Override
+        public void onDeletion(ConnectorTaskId id) {
+            taskStateTimer.changeState(State.DESTROYED, time.milliseconds());
+            delegateListener.onDeletion(id);
+        }
+
         public void recordState(TargetState state) {
             switch (state) {
                 case STARTED:

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java (23 changes)

@@ -46,6 +46,7 @@ import org.apache.kafka.connect.runtime.SessionKey;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
 import org.apache.kafka.connect.runtime.rest.RestClient;
@@ -1526,6 +1527,18 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
+    private void updateDeletedTaskStatus() {
+        ClusterConfigState snapshot = configBackingStore.snapshot();
+        for (String connector : statusBackingStore.connectors()) {
+            Set<ConnectorTaskId> remainingTasks = new HashSet<>(snapshot.tasks(connector));
+            statusBackingStore.getAll(connector).stream()
+                    .map(TaskStatus::id)
+                    .filter(task -> !remainingTasks.contains(task))
+                    .forEach(this::onDeletion);
+        }
+    }
+
     protected HerderMetrics herderMetrics() {
         return herderMetrics;
     }
@@ -1588,11 +1601,13 @@
             herderMetrics.rebalanceStarted(time.milliseconds());
         }
 
-        // Delete the statuses of all connectors removed prior to the start of this rebalance. This has to
-        // be done after the rebalance completes to avoid race conditions as the previous generation attempts
-        // to change the state to UNASSIGNED after tasks have been stopped.
-        if (isLeader())
+        // Delete the statuses of all connectors and tasks removed prior to the start of this rebalance. This
+        // has to be done after the rebalance completes to avoid race conditions as the previous generation
+        // attempts to change the state to UNASSIGNED after tasks have been stopped.
+        if (isLeader()) {
             updateDeletedConnectorStatus();
+            updateDeletedTaskStatus();
+        }
 
         // We *must* interrupt any poll() call since this could occur when the poll starts, and we might then
         // sleep in the poll() for a long time. Forcing a wakeup ensures we'll get to process this event in the
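
The ordering constraint in the comment above is easy to see with a toy last-write-wins store (illustrative only, not Kafka code; the real status store is a compacted Kafka topic, but the ordering argument is the same): whichever write lands last wins, so the leader's DESTROYED record must be written after the departing generation's final UNASSIGNED write.

import java.util.HashMap;
import java.util.Map;

// Toy last-write-wins store illustrating the race the commit avoids.
public class StatusRaceSketch {
    public static void main(String[] args) {
        Map<String, String> statusStore = new HashMap<>();
        statusStore.put("source-3", "RUNNING");

        // Wrong order: leader prunes before the old generation finishes stopping.
        statusStore.put("source-3", "DESTROYED");   // leader's cleanup
        statusStore.put("source-3", "UNASSIGNED");  // late write from the old generation
        System.out.println(statusStore); // {source-3=UNASSIGNED} -- stale status resurrected

        // Correct order (cleanup deferred until the rebalance completes).
        statusStore.put("source-3", "UNASSIGNED");  // old generation's final write
        statusStore.put("source-3", "DESTROYED");   // leader's cleanup wins
        System.out.println(statusStore); // {source-3=DESTROYED}
    }
}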

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java (1 change)

@@ -319,6 +319,7 @@ public class StandaloneHerder extends AbstractHerder {
         if (!tasks.isEmpty()) {
             worker.stopAndAwaitTasks(tasks);
             configBackingStore.removeTaskConfigs(connName);
+            tasks.forEach(this::onDeletion);
         }
     }

connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java (39 changes)

@@ -102,7 +102,7 @@ public class ConnectWorkerIntegrationTest {
         // create test topic
         connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
 
-        // setup up props for the sink connector
+        // set up props for the source connector
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
         props.put(TASKS_MAX_CONFIG, String.valueOf(numTasks));
@@ -193,7 +193,7 @@
         // create test topic
         connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
 
-        // setup up props for the sink connector
+        // set up props for the source connector
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
         props.put(TASKS_MAX_CONFIG, String.valueOf(numTasks));
@@ -236,4 +236,39 @@
                 "Connector tasks did not start in time.");
     }
 
+    /**
+     * Verify that the number of tasks listed in the REST API is updated correctly after changes to
+     * the "tasks.max" connector configuration.
+     */
+    @Test
+    public void testTaskStatuses() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
+        // base connector props
+        Map<String, String> connectorProps = new HashMap<>();
+        connectorProps.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+
+        // start the connector with only one task
+        final int initialNumTasks = 1;
+        connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(initialNumTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, initialNumTasks, "Connector tasks did not start in time");
+
+        // then reconfigure it to use more tasks
+        final int increasedNumTasks = 5;
+        connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(increasedNumTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, increasedNumTasks, "Connector task statuses did not update in time.");
+
+        // then reconfigure it to use fewer tasks
+        final int decreasedNumTasks = 3;
+        connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(decreasedNumTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, decreasedNumTasks, "Connector task statuses did not update in time.");
+    }
 }
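
For a manual check of what this test asserts, the worker's REST API can be queried directly: the status endpoint's tasks array should track tasks.max once stale statuses are pruned. A small sketch using the JDK HTTP client; the worker URL and connector name here are assumptions for illustration:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Queries GET /connectors/{name}/status on a Connect worker. Assumes a worker
// listening on localhost:8083 and a connector named "source" (both hypothetical).
public class TaskStatusCheck {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/source/status"))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // With the fix, the "tasks" array in this JSON shrinks along with
        // tasks.max instead of retaining entries for deleted tasks.
        System.out.println(response.body());
    }
}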

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java (3 changes)

@@ -208,7 +208,7 @@ public class DistributedHerderTest {
         connectProtocolVersion = CONNECT_PROTOCOL_V0;
 
         herder = PowerMock.createPartialMock(DistributedHerder.class,
-                new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
+                new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
                 statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy);
@@ -222,6 +222,7 @@
         delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
         PowerMock.mockStatic(Plugins.class);
         PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
+        PowerMock.expectPrivate(herder, "updateDeletedTaskStatus").andVoid().anyTimes();
     }
 
     @After

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java (3 changes)

@@ -266,6 +266,7 @@ public class StandaloneHerderTest {
         EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.<TaskStatus>emptyList());
         statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
+        statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0));
 
         expectDestroy();
@@ -434,6 +435,8 @@
         // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
         expectStop();
 
+        statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0));
+
         statusBackingStore.stop();
         EasyMock.expectLastCall();
         worker.stop();
