
KAFKA-14783 (KIP-875): New STOPPED state for connectors (#13424)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Yash Mayya <yash.mayya@gmail.com>, Greg Harris <gharris1727@gmail.com>
Chris Egerton committed 2 years ago via GitHub (commit e49a5a265f)
  1. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java (6 lines changed)
  2. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java (3 lines changed)
  3. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java (6 lines changed)
  4. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java (14 lines changed)
  5. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java (19 lines changed)
  6. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TargetState.java (6 lines changed)
  7. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java (70 lines changed)
  8. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java (5 lines changed)
  9. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java (26 lines changed)
  10. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java (133 lines changed)
  11. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java (13 lines changed)
  12. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java (16 lines changed)
  13. connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java (33 lines changed)
  14. connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java (175 lines changed)
  15. connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java (4 lines changed)
  16. connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java (4 lines changed)
  17. connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java (3 lines changed)
  18. connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java (2 lines changed)
  19. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java (170 lines changed)
  20. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java (203 lines changed)
  21. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java (55 lines changed)
  22. connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java (56 lines changed)
  23. connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java (65 lines changed)
  24. connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java (67 lines changed)
  25. gradle/spotbugs-exclude.xml (1 line changed)

6
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java

@@ -171,6 +171,12 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
workerId, generation()));
}
@Override
public void onStop(String connector) {
statusBackingStore.put(new ConnectorStatus(connector, AbstractStatus.State.STOPPED,
workerId, generation()));
}
@Override
public void onPause(String connector) {
statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.PAUSED,

3
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java

@@ -25,8 +25,9 @@ public abstract class AbstractStatus<T> {
RUNNING,
PAUSED,
FAILED,
DESTROYED,
DESTROYED, // Never visible to users; destroyed Connector and Task instances are not shown
RESTARTING,
STOPPED, // Only ever visible to users for Connector instances; never for Task instances
}
private final T id;

6
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java

@@ -45,6 +45,12 @@ public class ConnectorStatus extends AbstractStatus<String> {
*/
void onFailure(String connector, Throwable cause);
/**
* Invoked when the connector is stopped through the REST API
* @param connector The connector name
*/
void onStop(String connector);
/**
* Invoked when the connector is paused through the REST API
* @param connector The connector name

14
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java

@@ -246,9 +246,23 @@ public interface Herder {
*/
void restartConnectorAndTasks(RestartRequest request, Callback<ConnectorStateInfo> cb);
/**
* Stop the connector. This call will asynchronously suspend processing by the connector and
* shut down all of its tasks.
* @param connector name of the connector
* @param cb callback to invoke upon completion
*/
void stopConnector(String connector, Callback<Void> cb);
/**
* Pause the connector. This call will asynchronously suspend processing by the connector and all
* of its tasks.
* <p>
* Note that, unlike {@link #stopConnector(String, Callback)}, tasks for this connector will not
* be shut down and none of their resources will be de-allocated. Instead, they will be left in an
* "idling" state where no data is polled from them (if source tasks) or given to them (if sink tasks),
* but all internal state kept by the tasks and their resources is left intact and ready to begin
* processing records again as soon as the connector is {@link #resumeConnector(String) resumed}.
* @param connector name of the connector
*/
void pauseConnector(String connector);
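To make the contrast between these two Herder calls concrete, here is a hedged caller sketch. It is a hypothetical helper, not part of this patch; it assumes only the Herder interface shown above and the existing FutureCallback utility from org.apache.kafka.connect.util.

import java.util.concurrent.TimeUnit;

import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.util.FutureCallback;

// Hypothetical example, not part of this commit: stopConnector is asynchronous and
// callback-based, and fully shuts down the connector's tasks; pauseConnector is
// fire-and-forget and leaves the tasks allocated but idle. The two calls below are
// independent examples, not a required sequence.
class StopVersusPauseSketch {
    static void stopThenPause(Herder herder, String connName) throws Exception {
        FutureCallback<Void> stopCallback = new FutureCallback<>();
        herder.stopConnector(connName, stopCallback);
        stopCallback.get(90, TimeUnit.SECONDS); // wait for the herder to process the request

        herder.pauseConnector(connName);        // tasks keep their resources and internal state
    }
}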

19
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java

@@ -73,6 +73,7 @@ public class StateTracker {
private final long unassignedTotalTimeMs;
private final long runningTotalTimeMs;
private final long pausedTotalTimeMs;
private final long stoppedTotalTimeMs;
private final long failedTotalTimeMs;
private final long destroyedTotalTimeMs;
private final long restartingTotalTimeMs;
@@ -81,16 +82,17 @@ public class StateTracker {
* The initial StateChange instance before any state has changed.
*/
StateChange() {
this(null, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
this(null, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
}
StateChange(State state, long startTime, long unassignedTotalTimeMs, long runningTotalTimeMs,
long pausedTotalTimeMs, long failedTotalTimeMs, long destroyedTotalTimeMs, long restartingTotalTimeMs) {
StateChange(State state, long startTime, long unassignedTotalTimeMs, long runningTotalTimeMs, long pausedTotalTimeMs,
long stoppedTotalTimeMs, long failedTotalTimeMs, long destroyedTotalTimeMs, long restartingTotalTimeMs) {
this.state = state;
this.startTime = startTime;
this.unassignedTotalTimeMs = unassignedTotalTimeMs;
this.runningTotalTimeMs = runningTotalTimeMs;
this.pausedTotalTimeMs = pausedTotalTimeMs;
this.stoppedTotalTimeMs = stoppedTotalTimeMs;
this.failedTotalTimeMs = failedTotalTimeMs;
this.destroyedTotalTimeMs = destroyedTotalTimeMs;
this.restartingTotalTimeMs = restartingTotalTimeMs;
@@ -106,7 +108,7 @@ public class StateTracker {
*/
public StateChange newState(State state, long now) {
if (this.state == null) {
return new StateChange(state, now, 0L, 0L, 0L, 0L, 0L, 0L);
return new StateChange(state, now, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
}
if (state == this.state) {
return this;
@@ -114,6 +116,7 @@ public class StateTracker {
long unassignedTime = this.unassignedTotalTimeMs;
long runningTime = this.runningTotalTimeMs;
long pausedTime = this.pausedTotalTimeMs;
long stoppedTime = this.stoppedTotalTimeMs;
long failedTime = this.failedTotalTimeMs;
long destroyedTime = this.destroyedTotalTimeMs;
long restartingTime = this.restartingTotalTimeMs;
@@ -128,6 +131,9 @@ public class StateTracker {
case PAUSED:
pausedTime += duration;
break;
case STOPPED:
stoppedTime += duration;
break;
case FAILED:
failedTime += duration;
break;
@@ -138,7 +144,7 @@ public class StateTracker {
restartingTime += duration;
break;
}
return new StateChange(state, now, unassignedTime, runningTime, pausedTime, failedTime, destroyedTime, restartingTime);
return new StateChange(state, now, unassignedTime, runningTime, pausedTime, stoppedTime, failedTime, destroyedTime, restartingTime);
}
/**
@@ -164,6 +170,9 @@ public class StateTracker {
case PAUSED:
durationDesired += pausedTotalTimeMs;
break;
case STOPPED:
durationDesired += stoppedTotalTimeMs;
break;
case FAILED:
durationDesired += failedTotalTimeMs;
break;
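For context on why a new stoppedTotalTimeMs accumulator is threaded through every StateChange, here is a hedged sketch of the kind of ratio metric these per-state counters feed. Names and signature are simplified assumptions, not the real StateTracker API.

// Hypothetical illustration only: the fraction of a connector's lifetime spent in one
// state is its accumulated time in that state (plus the still-open interval if that
// state is current) divided by the total elapsed tracking time.
class StateRatioSketch {
    static double ratio(long accumulatedMsInState, boolean isCurrentState,
                        long currentStateStartMs, long trackingStartMs, long nowMs) {
        long inState = accumulatedMsInState + (isCurrentState ? nowMs - currentStateStartMs : 0L);
        long total = nowMs - trackingStartMs;
        return total <= 0 ? 0.0 : (double) inState / total;
    }
}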

6
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TargetState.java

@@ -22,7 +22,10 @@ package org.apache.kafka.connect.runtime;
* target state is "STARTED." This does not mean it has actually started, just that
* the Connect framework will attempt to start it after its tasks have been assigned.
* After the connector has been paused, the target state will change to PAUSED,
* and all the tasks will stop doing work.
* and all the tasks will stop doing work. A target state of STOPPED is similar to
* PAUSED, but is also accompanied by a full shutdown of the connector's tasks,
* including deallocation of any Kafka clients, SMTs, and other resources brought
up for or by those tasks.
* <p>
* Target states are persisted in the config topic, which is read by all of the
* workers in the group. When a worker sees a new target state for a connector which
@@ -33,4 +36,5 @@ package org.apache.kafka.connect.runtime;
public enum TargetState {
STARTED,
PAUSED,
STOPPED,
}
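As a reading aid for the enum above, the following hedged dispatch sketch shows how a worker is expected to react to each target state. The ConnectorControl interface and its method names are placeholders invented for illustration; the real logic is the WorkerConnector.doTransitionTo change later in this diff.

import org.apache.kafka.connect.runtime.TargetState;

// Hypothetical sketch only; see WorkerConnector.doTransitionTo below for the real behavior.
class TargetStateDispatchSketch {
    interface ConnectorControl {           // placeholder for the worker-side operations
        void startOrResume();              // bring up the Connector and request task configs
        void suspend(boolean paused);      // paused = idle tasks, !paused = full task shutdown
    }

    static void apply(TargetState targetState, ConnectorControl control) {
        switch (targetState) {
            case STARTED:
                control.startOrResume();
                break;
            case PAUSED:
                control.suspend(true);     // tasks keep Kafka clients, SMTs, and internal state
                break;
            case STOPPED:
                control.suspend(false);    // tasks are shut down and their resources deallocated
                break;
        }
    }
}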

70
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java

@@ -57,7 +57,8 @@ public class WorkerConnector implements Runnable {
private enum State {
INIT, // initial state before startup
STOPPED, // the connector has been stopped/paused.
PAUSED, // The connector has been paused.
STOPPED, // the connector has been stopped.
STARTED, // the connector has been started/resumed.
FAILED, // the connector has failed (no further transitions are possible after this state)
}
@@ -186,6 +187,7 @@ public class WorkerConnector implements Runnable {
return false;
case INIT:
case PAUSED:
case STOPPED:
connector.start(config);
this.state = State.STARTED;
@@ -220,29 +222,40 @@ public class WorkerConnector implements Runnable {
return state == State.STARTED;
}
@SuppressWarnings("fallthrough")
private void pause() {
private void suspend(boolean paused) {
State newState = paused ? State.PAUSED : State.STOPPED;
try {
switch (state) {
case STOPPED:
return;
if (state == newState) {
// Already in the desired state
return;
}
case STARTED:
connector.stop();
// fall through
if (state == State.STARTED) {
connector.stop();
}
case INIT:
statusListener.onPause(connName);
this.state = State.STOPPED;
break;
if (state == State.FAILED && newState != State.STOPPED) {
throw new IllegalArgumentException("Cannot transition to non-stopped state when connector has already failed");
}
default:
throw new IllegalArgumentException("Cannot pause connector in state " + state);
if (paused) {
statusListener.onPause(connName);
} else {
statusListener.onStop(connName);
}
this.state = newState;
} catch (Throwable t) {
log.error("{} Error while shutting down connector", this, t);
statusListener.onFailure(connName, t);
this.state = State.FAILED;
log.error("{} Error while {} connector", this, paused ? "pausing" : "stopping", t);
if (paused) {
statusListener.onFailure(connName, t);
this.state = State.FAILED;
} else {
// We say the connector is STOPPED even if it fails at this point
this.state = State.STOPPED;
// One more try to make sure the status is updated correctly
statusListener.onStop(connName);
}
}
}
@@ -332,7 +345,8 @@ public class WorkerConnector implements Runnable {
}
void doTransitionTo(TargetState targetState, Callback<TargetState> stateChangeCallback) {
if (state == State.FAILED) {
// Edge case: we don't do transitions most of the time if we've already failed, but for the STOPPED state, it's fine
if (state == State.FAILED && targetState != TargetState.STOPPED) {
stateChangeCallback.onCompletion(
new ConnectException(this + " Cannot transition connector to " + targetState + " since it has failed"),
null);
@@ -354,7 +368,9 @@ public class WorkerConnector implements Runnable {
private void doTransitionTo(TargetState targetState) throws Throwable {
log.debug("{} Transition connector to {}", this, targetState);
if (targetState == TargetState.PAUSED) {
pause();
suspend(true);
} else if (targetState == TargetState.STOPPED) {
suspend(false);
} else if (targetState == TargetState.STARTED) {
if (state == State.INIT)
start();
@@ -448,6 +464,16 @@ public class WorkerConnector implements Runnable {
}
}
@Override
public void onStop(String connector) {
state = AbstractStatus.State.STOPPED;
synchronized (this) {
if (!cancelled) {
delegate.onStop(connector);
}
}
}
@Override
public void onPause(String connector) {
state = AbstractStatus.State.PAUSED;
@@ -502,6 +528,10 @@ public class WorkerConnector implements Runnable {
return state == AbstractStatus.State.PAUSED;
}
boolean isStopped() {
return state == AbstractStatus.State.STOPPED;
}
boolean isFailed() {
return state == AbstractStatus.State.FAILED;
}

5
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java

@@ -128,6 +128,11 @@ class WorkerMetricsGroup {
delegateListener.onStartup(connector);
}
@Override
public void onStop(final String connector) {
delegateListener.onStop(connector);
}
@Override
public void onPause(final String connector) {
delegateListener.onPause(connector);

26
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java

@@ -169,7 +169,9 @@ abstract class WorkerTask implements Runnable {
}
protected boolean isStopping() {
return stopping;
// The target state should never be STOPPED, but if things go wrong and it somehow is,
// we handle that identically to a request to shut down the task
return stopping || targetState == TargetState.STOPPED;
}
protected boolean isCancelled() {
@@ -188,7 +190,7 @@ abstract class WorkerTask implements Runnable {
private void doRun() throws InterruptedException {
try {
synchronized (this) {
if (stopping)
if (isStopping())
return;
if (targetState == TargetState.PAUSED) {
@@ -204,7 +206,7 @@ abstract class WorkerTask implements Runnable {
failed = true;
if (cancelled) {
log.warn("{} After being scheduled for shutdown, the orphan task threw an uncaught exception. A newer instance of this task might be already running", this, t);
} else if (stopping) {
} else if (isStopping()) {
log.warn("{} After being scheduled for shutdown, task threw an uncaught exception.", this, t);
} else {
log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", this, t);
@@ -281,7 +283,7 @@ abstract class WorkerTask implements Runnable {
protected boolean awaitUnpause() throws InterruptedException {
synchronized (this) {
while (targetState == TargetState.PAUSED) {
if (stopping)
if (isStopping())
return false;
this.wait();
}
@@ -291,9 +293,21 @@ abstract class WorkerTask implements Runnable {
public void transitionTo(TargetState state) {
synchronized (this) {
// ignore the state change if we are stopping
if (stopping)
// Ignore the state change if we are stopping.
// This has the consequence that, if we ever transition to the STOPPED target state (which
should never happen since the whole point of that state is that it comes with a complete
// shutdown of all the tasks for the connector), we will never be able to transition out of it.
// Since part of transitioning to the STOPPED state is that we shut down the task and all of
// its resources (Kafka clients, SMTs, etc.), this is a reasonable way to do things; otherwise,
// we'd have to re-instantiate all of those resources to be able to resume (or even just pause)
the task.
if (isStopping()) {
log.debug("{} Ignoring request to transition stopped task {} to state {}", this, id, state);
return;
}
if (targetState == TargetState.STOPPED)
log.warn("{} Received unexpected request to transition task {} to state {}; will shut down in response", this, id, TargetState.STOPPED);
this.targetState = state;
this.notifyAll();

133
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java

@@ -1094,6 +1094,38 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
);
}
@Override
public void stopConnector(final String connName, final Callback<Void> callback) {
log.trace("Submitting request to transition connector {} to STOPPED state", connName);
addRequest(
() -> {
if (!configState.contains(connName))
throw new NotFoundException("Unknown connector " + connName);
// We only allow the leader to handle this request since it involves writing task configs to the config topic
if (!isLeader()) {
callback.onCompletion(new NotLeaderException("Only the leader can transition connectors to the STOPPED state.", leaderUrl()), null);
return null;
}
// We write the task configs first since, if we fail between then and writing the target state, the
// cluster is still kept in a healthy state. A RUNNING connector with zero tasks is acceptable (although,
// if the connector is reassigned during the ensuing rebalance, it is likely that it will immediately generate
// a non-empty set of task configs). A STOPPED connector with a non-empty set of tasks is less acceptable
// and likely to confuse users.
writeTaskConfigs(connName, Collections.emptyList());
configBackingStore.putTargetState(connName, TargetState.STOPPED);
// Force a read of the new target state for the connector
refreshConfigSnapshot(workerSyncTimeoutMs);
callback.onCompletion(null, null);
return null;
},
forwardErrorCallback(callback)
);
}
@Override
public void requestTaskReconfiguration(final String connName) {
log.trace("Submitting connector task reconfiguration request {}", connName);
@@ -1151,7 +1183,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
else if (!configState.contains(connName))
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
else {
writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, configs));
writeTaskConfigs(connName, configs);
callback.onCompletion(null, null);
}
return null;
@@ -1963,6 +1995,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
if (!worker.isRunning(connName)) {
log.info("Skipping reconfiguration of connector {} since it is not running", connName);
return;
} else if (configState.targetState(connName) != TargetState.STARTED) {
log.info("Skipping reconfiguration of connector {} since its target state is {}", connName, configState.targetState(connName));
return;
}
Map<String, String> configs = configState.connectorConfig(connName);
@@ -1975,52 +2010,66 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
final List<Map<String, String>> taskProps = worker.connectorTaskConfigs(connName, connConfig);
if (taskConfigsChanged(configState, connName, taskProps)) {
List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
if (isLeader()) {
writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, rawTaskProps));
cb.onCompletion(null, null);
} else if (restClient == null) {
throw new NotLeaderException("This worker is not able to communicate with the leader of the cluster, "
+ "which is required for dynamically-reconfiguring connectors. If running MirrorMaker 2 "
+ "in dedicated mode, consider enabling inter-worker communication via the "
+ "'dedicated.mode.enable.internal.rest' property.",
leaderUrl()
);
} else {
// We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
// addition or removal. If we blocked waiting for the response from leader, we may be kicked out of the worker group.
forwardRequestExecutor.submit(() -> {
try {
String leaderUrl = leaderUrl();
if (Utils.isBlank(leaderUrl)) {
cb.onCompletion(new ConnectException("Request to leader to " +
"reconfigure connector tasks failed " +
"because the URL of the leader's REST interface is empty!"), null);
return;
}
String reconfigUrl = namespacedUrl(leaderUrl)
.path("connectors")
.path(connName)
.path("tasks")
.build()
.toString();
log.trace("Forwarding task configurations for connector {} to leader", connName);
restClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, sessionKey, requestSignatureAlgorithm);
cb.onCompletion(null, null);
log.trace("Request to leader to reconfigure connector tasks succeeded");
} catch (ConnectException e) {
log.error("Request to leader to reconfigure connector tasks failed", e);
cb.onCompletion(e, null);
}
});
}
}
publishConnectorTaskConfigs(connName, taskProps, cb);
} catch (Throwable t) {
cb.onCompletion(t, null);
}
}
private void publishConnectorTaskConfigs(String connName, List<Map<String, String>> taskProps, Callback<Void> cb) {
if (!taskConfigsChanged(configState, connName, taskProps)) {
return;
}
List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
if (isLeader()) {
writeTaskConfigs(connName, rawTaskProps);
cb.onCompletion(null, null);
} else if (restClient == null) {
throw new NotLeaderException("This worker is not able to communicate with the leader of the cluster, "
+ "which is required for dynamically-reconfiguring connectors. If running MirrorMaker 2 "
+ "in dedicated mode, consider enabling inter-worker communication via the "
+ "'dedicated.mode.enable.internal.rest' property.",
leaderUrl()
);
} else {
// We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
// addition or removal. If we blocked waiting for the response from leader, we may be kicked out of the worker group.
forwardRequestExecutor.submit(() -> {
try {
String leaderUrl = leaderUrl();
if (Utils.isBlank(leaderUrl)) {
cb.onCompletion(new ConnectException("Request to leader to " +
"reconfigure connector tasks failed " +
"because the URL of the leader's REST interface is empty!"), null);
return;
}
String reconfigUrl = UriBuilder.fromUri(leaderUrl)
.path("connectors")
.path(connName)
.path("tasks")
.build()
.toString();
log.trace("Forwarding task configurations for connector {} to leader", connName);
restClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, sessionKey, requestSignatureAlgorithm);
cb.onCompletion(null, null);
} catch (ConnectException e) {
log.error("Request to leader to reconfigure connector tasks failed", e);
cb.onCompletion(e, null);
}
});
}
}
private void writeTaskConfigs(String connName, List<Map<String, String>> taskConfigs) {
if (!taskConfigs.isEmpty()) {
if (configState.targetState(connName) == TargetState.STOPPED)
throw new BadRequestException("Cannot submit non-empty set of task configs for stopped connector " + connName);
}
writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, taskConfigs));
}
// Invoked by exactly-once worker source tasks after they have successfully initialized their transactional
// producer to ensure that it is still safe to bring up the task
private void verifyTaskGenerationAndOwnership(ConnectorTaskId id, int initialTaskGen) {
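The ordering argument in the stopConnector comment above boils down to two writes, sketched below against the existing ConfigBackingStore interface. This is a hedged simplification for illustration, not the herder's actual request-processing code.

import java.util.Collections;

import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.storage.ConfigBackingStore;

// Hypothetical illustration of the write ordering: clearing task configs first means a
// failure between the two writes leaves a RUNNING connector with zero tasks (harmless)
// rather than a STOPPED connector that still has task configs (confusing to users).
class StopOrderingSketch {
    static void stopConnector(ConfigBackingStore store, String connName) {
        store.putTaskConfigs(connName, Collections.emptyList()); // step 1: empty task configs
        store.putTargetState(connName, TargetState.STOPPED);     // step 2: flip the target state
    }
}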

13
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java

@@ -266,6 +266,19 @@ public class ConnectorsResource implements ConnectResource {
return Response.accepted().entity(stateInfo).build();
}
@PUT
@Path("/{connector}/stop")
@Operation(summary = "Stop the specified connector",
description = "This operation is idempotent and has no effects if the connector is already stopped")
public void stopConnector(
@PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Void> cb = new FutureCallback<>();
herder.stopConnector(connector, cb);
requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/stop", "PUT", headers, null, forward);
}
@PUT
@Path("/{connector}/pause")
@Operation(summary = "Pause the specified connector",
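A hedged client-side usage example for the new endpoint follows; the worker URL and connector name are placeholder assumptions, and since the resource method returns void a successful call is expected to carry no response body.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical standalone client, not part of this commit. The endpoint is documented
// above as idempotent, so retrying a stop request is safe.
public class StopConnectorRestExample {
    public static void main(String[] args) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-connector/stop"))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP status: " + response.statusCode());
    }
}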

16
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java

@@ -238,6 +238,17 @@ public class StandaloneHerder extends AbstractHerder {
}
}
@Override
public synchronized void stopConnector(String connName, Callback<Void> callback) {
try {
removeConnectorTasks(connName);
configBackingStore.putTargetState(connName, TargetState.STOPPED);
callback.onCompletion(null, null);
} catch (Throwable t) {
callback.onCompletion(t, null);
}
}
@Override
public synchronized void requestTaskReconfiguration(String connName) {
if (!worker.connectorNames().contains(connName)) {
@@ -429,7 +440,10 @@ public class StandaloneHerder extends AbstractHerder {
private void updateConnectorTasks(String connName) {
if (!worker.isRunning(connName)) {
log.info("Skipping update of connector {} since it is not running", connName);
log.info("Skipping update of tasks for connector {} since it is not running", connName);
return;
} else if (configState.targetState(connName) != TargetState.STARTED) {
log.info("Skipping update of tasks for connector {} since its target state is {}", connName, configState.targetState(connName));
return;
}

33
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java

@@ -79,6 +79,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import static org.apache.kafka.connect.runtime.TargetState.PAUSED;
import static org.apache.kafka.connect.runtime.TargetState.STOPPED;
import static org.apache.kafka.connect.util.ConnectUtils.className;
/**
@@ -240,6 +242,10 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct()
.field("state", Schema.STRING_SCHEMA)
.build();
public static final Schema TARGET_STATE_V1 = SchemaBuilder.struct()
.field("state", Schema.STRING_SCHEMA)
.field("state.v2", Schema.OPTIONAL_STRING_SCHEMA)
.build();
public static final Schema TASK_COUNT_RECORD_V0 = SchemaBuilder.struct()
.field("task-count", Schema.INT32_SCHEMA)
.build();
@@ -633,9 +639,11 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
*/
@Override
public void putTargetState(String connector, TargetState state) {
Struct connectTargetState = new Struct(TARGET_STATE_V0);
connectTargetState.put("state", state.name());
byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V0, connectTargetState);
Struct connectTargetState = new Struct(TARGET_STATE_V1);
// Older workers don't support the STOPPED state; fall back on PAUSED
connectTargetState.put("state", state == STOPPED ? PAUSED.name() : state.name());
connectTargetState.put("state.v2", state.name());
byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V1, connectTargetState);
log.debug("Writing target state {} for connector {}", state, connector);
try {
configLog.send(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
@@ -928,11 +936,22 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
return;
}
@SuppressWarnings("unchecked")
Object targetState = ((Map<String, Object>) value.value()).get("state");
if (!(targetState instanceof String)) {
log.error("Invalid data for target state for connector '{}': 'state' field should be a String but is {}",
Map<String, Object> valueMap = (Map<String, Object>) value.value();
Object targetState = valueMap.get("state.v2");
if (targetState != null && !(targetState instanceof String)) {
log.error("Invalid data for target state for connector '{}': 'state.v2' field should be a String but is {}",
connectorName, className(targetState));
return;
// We don't return here; it's still possible that there's a value we can use in the older state field
targetState = null;
}
if (targetState == null) {
// This record may have been written by an older worker; fall back on the older state field
targetState = valueMap.get("state");
if (!(targetState instanceof String)) {
log.error("Invalid data for target state for connector '{}': 'state' field should be a String but is {}",
connectorName, className(targetState));
return;
}
}
try {
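To summarize the compatibility scheme above: newer workers write both fields, with "state" downgraded to PAUSED for the benefit of older workers and "state.v2" carrying the real target state. The helper below is a hedged sketch of how a reader resolves the effective value, not the actual KafkaConfigBackingStore code.

import java.util.Map;

// Hypothetical helper for illustration: prefer the newer "state.v2" field and fall back
// on the legacy "state" field written for pre-KIP-875 workers.
class TargetStateRecordSketch {
    static String effectiveTargetState(Map<String, Object> recordValue) {
        Object v2 = recordValue.get("state.v2");      // written by newer workers only
        if (v2 instanceof String)
            return (String) v2;
        Object legacy = recordValue.get("state");     // always present; STOPPED appears as PAUSED here
        return legacy instanceof String ? (String) legacy : null;
    }
}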

175
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java

@@ -325,6 +325,181 @@ public class ConnectWorkerIntegrationTest {
assertTrue("Connector and all tasks were not stopped in time", stopCounter.await(1, TimeUnit.MINUTES));
}
/**
* Verify that the target state (started, paused, stopped) of a connector can be updated, with
* an emphasis on ensuring that the transitions between each state are correct.
* <p>
* The transitions we need to cover are:
* <ol>
* <li>RUNNING -> PAUSED</li>
* <li>RUNNING -> STOPPED</li>
* <li>PAUSED -> RUNNING</li>
* <li>PAUSED -> STOPPED</li>
* <li>STOPPED -> RUNNING</li>
* <li>STOPPED -> PAUSED</li>
* </ol>
* With some reordering, we can perform each transition just once:
* <ul>
* <li>Start with RUNNING</li>
* <li>Transition to STOPPED (2)</li>
* <li>Transition to RUNNING (5)</li>
* <li>Transition to PAUSED (1)</li>
* <li>Transition to STOPPED (4)</li>
* <li>Transition to PAUSED (6)</li>
* <li>Transition to RUNNING (3)</li>
* </ul>
*/
@Test
public void testPauseStopResume() throws Exception {
connect = connectBuilder.build();
// start the clusters
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
// Want to make sure to use multiple tasks
final int numTasks = 4;
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks));
// Start with RUNNING
connect.configureConnector(CONNECTOR_NAME, props);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
numTasks,
"Connector tasks did not start in time"
);
// Transition to STOPPED
connect.stopConnector(CONNECTOR_NAME);
// Issue a second request to ensure that this operation is idempotent
connect.stopConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
"Connector did not stop in time"
);
// Transition to RUNNING
connect.resumeConnector(CONNECTOR_NAME);
// Issue a second request to ensure that this operation is idempotent
connect.resumeConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
numTasks,
"Connector tasks did not resume in time"
);
// Transition to PAUSED
connect.pauseConnector(CONNECTOR_NAME);
// Issue a second request to ensure that this operation is idempotent
connect.pauseConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
CONNECTOR_NAME,
numTasks,
"Connector did not pause in time"
);
// Transition to STOPPED
connect.stopConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
"Connector did not stop in time"
);
// Transition to PAUSED
connect.pauseConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
CONNECTOR_NAME,
0,
"Connector did not pause in time"
);
// Transition to RUNNING
connect.resumeConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
numTasks,
"Connector tasks did not resume in time"
);
// Delete the connector
connect.deleteConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndTasksAreNotRunning(
CONNECTOR_NAME,
"Connector tasks were not destroyed in time"
);
}
/**
* Test out the {@code STOPPED} state introduced in
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Newtargetstate:STOPPED">KIP-875</a>,
* with an emphasis on correctly handling errors thrown from the connector.
*/
@Test
public void testStoppedState() throws Exception {
connect = connectBuilder.build();
// start the clusters
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
// Fail the connector on startup
props.put("connector.start.inject.error", "true");
// Start the connector (should fail immediately and generate no tasks)
connect.configureConnector(CONNECTOR_NAME, props);
connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
CONNECTOR_NAME,
0,
"Connector should have failed and not generated any tasks"
);
// Stopping a failed connector updates its state to STOPPED in the REST API
connect.stopConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
"Connector did not stop in time"
);
// Can resume a connector after its Connector has failed before shutdown after receiving a stop request
props.remove("connector.start.inject.error");
connect.configureConnector(CONNECTOR_NAME, props);
connect.resumeConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
NUM_TASKS,
"Connector or tasks did not start running healthily in time"
);
// Fail the connector on shutdown
props.put("connector.stop.inject.error", "true");
// Stopping a connector that fails during shutdown after receiving a stop request updates its state to STOPPED in the REST API
connect.configureConnector(CONNECTOR_NAME, props);
connect.stopConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
"Connector did not stop in time"
);
// Can resume a connector after its Connector has failed during shutdown after receiving a stop request
connect.resumeConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
NUM_TASKS,
"Connector or tasks did not start running healthily in time"
);
// Can delete a stopped connector
connect.deleteConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndTasksAreNotRunning(
CONNECTOR_NAME,
"Connector and all of its tasks should no longer be running"
);
}
private Map<String, String> defaultSourceConnectorProps(String topic) {
// setup up props for the source connector
Map<String, String> props = new HashMap<>();

4
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java

@@ -150,7 +150,7 @@ public class ConnectorTopicsIntegrationTest {
// deleting a connector resets its active topics
connect.deleteConnector(BAR_CONNECTOR);
connect.assertions().assertConnectorAndTasksAreStopped(BAR_CONNECTOR,
connect.assertions().assertConnectorAndTasksAreNotRunning(BAR_CONNECTOR,
"Connector tasks did not stop in time.");
connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR, Collections.emptyList(),
@@ -205,7 +205,7 @@ public class ConnectorTopicsIntegrationTest {
// deleting a connector resets its active topics
connect.deleteConnector(FOO_CONNECTOR);
connect.assertions().assertConnectorAndTasksAreStopped(FOO_CONNECTOR,
connect.assertions().assertConnectorAndTasksAreNotRunning(FOO_CONNECTOR,
"Connector tasks did not stop in time.");
connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(),

4
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java

@@ -178,7 +178,7 @@ public class ErrorHandlingIntegrationTest {
}
connect.deleteConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME,
connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME,
"Connector tasks did not stop in time.");
}
@@ -247,7 +247,7 @@ public class ErrorHandlingIntegrationTest {
ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
connect.deleteConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME,
connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME,
"Connector tasks did not stop in time.");
}

3
connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java

@@ -104,6 +104,9 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
public void stop() {
log.info("Stopped {} connector {}", this.getClass().getSimpleName(), connectorName);
connectorHandle.recordConnectorStop();
if (Boolean.parseBoolean(commonConfigs.getOrDefault("connector.stop.inject.error", "false"))) {
throw new RuntimeException("Injecting errors during connector stop");
}
}
@Override

2
connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java

@@ -206,7 +206,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
// delete connector
connect.deleteConnector(CONNECTOR_NAME + 3);
connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME + 3,
connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME + 3,
"Connector tasks did not stop in time.");
waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,

170
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java

@@ -103,7 +103,7 @@ public class WorkerConnectorTest {
assertFailedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertStoppedMetric(workerConnector);
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(listener).onFailure(CONNECTOR, exception);
@@ -127,7 +127,7 @@ public class WorkerConnectorTest {
assertFailedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertStoppedMetric(workerConnector);
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(listener).onFailure(CONNECTOR, exception);
@@ -153,7 +153,7 @@ public class WorkerConnectorTest {
assertRunningMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertStoppedMetric(workerConnector);
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
@@ -181,7 +181,7 @@ public class WorkerConnectorTest {
assertPausedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertStoppedMetric(workerConnector);
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
@@ -195,6 +195,37 @@ public class WorkerConnectorTest {
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testStartupAndStop() {
connector = sinkConnector;
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
workerConnector.initialize();
assertInitializedSinkMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
assertStoppedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
verify(listener).onStartup(CONNECTOR);
verify(listener).onStop(CONNECTOR);
verifyCleanShutdown(true);
InOrder inOrder = inOrder(onStateChange);
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STOPPED));
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testOnResume() {
connector = sourceConnector;
@@ -213,7 +244,7 @@ public class WorkerConnectorTest {
assertRunningMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertStoppedMetric(workerConnector);
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(listener).onPause(CONNECTOR);
@@ -241,7 +272,7 @@ public class WorkerConnectorTest {
assertPausedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertStoppedMetric(workerConnector);
assertDestroyedMetric(workerConnector);
verifyInitialize();
// connector never gets started
@@ -252,6 +283,31 @@ public class WorkerConnectorTest {
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testStartupStopped() {
connector = sinkConnector;
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
workerConnector.initialize();
assertInitializedSinkMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
assertStoppedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
// connector never gets started
verify(listener).onStop(CONNECTOR);
verifyCleanShutdown(false);
verify(onStateChange).onCompletion(isNull(), eq(TargetState.STOPPED));
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testStartupFailure() {
RuntimeException exception = new RuntimeException();
@@ -269,7 +325,7 @@ public class WorkerConnectorTest {
assertFailedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertStoppedMetric(workerConnector);
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
@@ -280,6 +336,49 @@ public class WorkerConnectorTest {
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testStopFailure() {
RuntimeException exception = new RuntimeException();
connector = sourceConnector;
when(connector.version()).thenReturn(VERSION);
// Fail during the first call to stop, then succeed for the next attempt
doThrow(exception).doNothing().when(connector).stop();
Callback<TargetState> onFirstStateChange = mockCallback();
Callback<TargetState> onSecondStateChange = mockCallback();
Callback<TargetState> onThirdStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedSourceMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onFirstStateChange);
assertRunningMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STOPPED, onSecondStateChange);
assertStoppedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onThirdStateChange);
assertRunningMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector, times(2)).start(CONFIG);
verify(listener).onStartup(CONNECTOR);
verify(listener).onResume(CONNECTOR);
verify(listener).onStop(CONNECTOR);
verify(onFirstStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
verifyNoMoreInteractions(onFirstStateChange);
// We swallow failures when transitioning to the STOPPED state
verify(onSecondStateChange).onCompletion(isNull(), eq(TargetState.STOPPED));
verifyNoMoreInteractions(onSecondStateChange);
verify(onThirdStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
verifyNoMoreInteractions(onThirdStateChange);
verifyShutdown(2, true, true);
verifyNoMoreInteractions(listener);
}
@Test
public void testShutdownFailure() {
RuntimeException exception = new RuntimeException();
@@ -327,7 +426,7 @@ public class WorkerConnectorTest {
assertRunningMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertStoppedMetric(workerConnector);
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
@@ -356,7 +455,7 @@ public class WorkerConnectorTest {
assertPausedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertStoppedMetric(workerConnector);
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
@@ -370,6 +469,38 @@ public class WorkerConnectorTest {
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testTransitionStoppedToStopped() {
connector = sourceConnector;
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedSourceMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
assertStoppedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
assertStoppedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
verify(listener).onStartup(CONNECTOR);
verify(listener).onStop(CONNECTOR);
verifyCleanShutdown(true);
InOrder inOrder = inOrder(onStateChange);
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
inOrder.verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.STOPPED));
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testFailConnectorThatIsNeitherSourceNorSink() {
connector = mock(Connector.class);
@@ -389,13 +520,23 @@ public class WorkerConnectorTest {
protected void assertFailedMetric(WorkerConnector workerConnector) {
assertFalse(workerConnector.metrics().isUnassigned());
assertTrue(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isStopped());
assertFalse(workerConnector.metrics().isPaused());
assertFalse(workerConnector.metrics().isRunning());
}
protected void assertStoppedMetric(WorkerConnector workerConnector) {
assertFalse(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isPaused());
assertTrue(workerConnector.metrics().isStopped());
assertFalse(workerConnector.metrics().isRunning());
}
protected void assertPausedMetric(WorkerConnector workerConnector) {
assertFalse(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isStopped());
assertTrue(workerConnector.metrics().isPaused());
assertFalse(workerConnector.metrics().isRunning());
}
@@ -403,13 +544,15 @@ public class WorkerConnectorTest {
protected void assertRunningMetric(WorkerConnector workerConnector) {
assertFalse(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isStopped());
assertFalse(workerConnector.metrics().isPaused());
assertTrue(workerConnector.metrics().isRunning());
}
protected void assertStoppedMetric(WorkerConnector workerConnector) {
protected void assertDestroyedMetric(WorkerConnector workerConnector) {
assertTrue(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isStopped());
assertFalse(workerConnector.metrics().isPaused());
assertFalse(workerConnector.metrics().isRunning());
}
@@ -425,6 +568,7 @@ public class WorkerConnectorTest {
protected void assertInitializedMetric(WorkerConnector workerConnector, String expectedType) {
assertTrue(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isStopped());
assertFalse(workerConnector.metrics().isPaused());
assertFalse(workerConnector.metrics().isRunning());
MetricGroup metricGroup = workerConnector.metrics().metricGroup();
@@ -457,6 +601,10 @@ public class WorkerConnectorTest {
}
private void verifyShutdown(boolean clean, boolean started) {
verifyShutdown(1, clean, started);
}
private void verifyShutdown(int connectorStops, boolean clean, boolean started) {
verify(ctx).close();
if (connector instanceof SourceConnector) {
verify(offsetStorageReader).close();
@@ -466,7 +614,7 @@ public class WorkerConnectorTest {
verify(listener).onShutdown(CONNECTOR);
}
if (started) {
verify(connector).stop();
verify(connector, times(connectorStops)).stop();
}
}

203
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java

@@ -217,6 +217,17 @@ public class DistributedHerderTest {
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet());
private static final ClusterConfigState SNAPSHOT_STOPPED_CONN1 = new ClusterConfigState(
1,
null,
Collections.singletonMap(CONN1, 3),
Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STOPPED),
Collections.emptyMap(), // Stopped connectors should have an empty set of task configs
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet());
private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(
1,
null,
@@ -2161,6 +2172,65 @@ public class DistributedHerderTest {
PowerMock.verifyAll();
}
@Test
public void testConnectorStopped() throws Exception {
// ensure that target state changes are propagated to the worker
EasyMock.expect(member.memberId()).andStubReturn("member");
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
// join
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList());
expectConfigRefreshAndSnapshot(SNAPSHOT);
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
Capture<Callback<TargetState>> onStart = newCapture();
worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), capture(onStart));
PowerMock.expectLastCall().andAnswer(() -> {
onStart.getValue().onCompletion(null, TargetState.STARTED);
return true;
});
member.wakeup();
PowerMock.expectLastCall();
expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
// handle the state change
member.wakeup();
PowerMock.expectLastCall();
member.ensureActive();
PowerMock.expectLastCall();
EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_STOPPED_CONN1);
Capture<Callback<TargetState>> onStop = newCapture();
worker.setTargetState(EasyMock.eq(CONN1), EasyMock.eq(TargetState.STOPPED), capture(onStop));
PowerMock.expectLastCall().andAnswer(() -> {
onStart.getValue().onCompletion(null, TargetState.STOPPED);
return null;
});
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
// These will occur just before/during the third tick
member.ensureActive();
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
PowerMock.replayAll();
herder.tick(); // join
configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to stopped
herder.tick(); // worker should apply the state change
herder.tick();
PowerMock.verifyAll();
}
@Test
public void testUnknownConnectorPaused() throws Exception {
EasyMock.expect(member.memberId()).andStubReturn("member");
@@ -2197,6 +2267,139 @@ public class DistributedHerderTest {
PowerMock.verifyAll();
}
@Test
public void testStopConnector() throws Exception {
EasyMock.expect(member.memberId()).andStubReturn("leader");
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
// join as leader
expectRebalance(1, Collections.emptyList(), singletonList(TASK0), true);
expectConfigRefreshAndSnapshot(SNAPSHOT);
worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
// handle stop request
member.wakeup();
PowerMock.expectLastCall();
member.ensureActive();
PowerMock.expectLastCall();
expectConfigRefreshAndSnapshot(SNAPSHOT);
configBackingStore.putTaskConfigs(CONN1, Collections.emptyList());
PowerMock.expectLastCall();
configBackingStore.putTargetState(CONN1, TargetState.STOPPED);
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
PowerMock.replayAll();
FutureCallback<Void> cb = new FutureCallback<>();
herder.tick(); // join
herder.stopConnector(CONN1, cb); // external request
herder.tick(); // continue
assertTrue("Callback should already have been invoked by herder", cb.isDone());
cb.get(0, TimeUnit.MILLISECONDS);
PowerMock.verifyAll();
}
@Test
public void testStopConnectorNotLeader() throws Exception {
EasyMock.expect(member.memberId()).andStubReturn("member");
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
// join as a regular member (not the leader)
expectRebalance(1, Collections.emptyList(), singletonList(TASK0));
expectConfigRefreshAndSnapshot(SNAPSHOT);
worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
// handle stop request
member.wakeup();
PowerMock.expectLastCall();
member.ensureActive();
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
PowerMock.replayAll();
FutureCallback<Void> cb = new FutureCallback<>();
herder.tick(); // join
herder.stopConnector(CONN1, cb); // external request
herder.tick(); // continue
assertTrue("Callback should already have been invoked by herder", cb.isDone());
ExecutionException e = assertThrows(
"Should not be able to handle request to stop connector when not leader",
ExecutionException.class,
() -> cb.get(0, TimeUnit.SECONDS)
);
assertTrue(e.getCause() instanceof NotLeaderException);
PowerMock.verifyAll();
}
@Test
public void testStopConnectorFailToWriteTaskConfigs() throws Exception {
EasyMock.expect(member.memberId()).andStubReturn("leader");
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
// join as leader
expectRebalance(1, Collections.emptyList(), singletonList(TASK0), true);
expectConfigRefreshAndSnapshot(SNAPSHOT);
worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
ConnectException taskConfigsWriteException = new ConnectException("Could not write task configs to config topic");
// handle stop request
member.wakeup();
PowerMock.expectLastCall();
member.ensureActive();
PowerMock.expectLastCall();
configBackingStore.putTaskConfigs(CONN1, Collections.emptyList());
// We do not expect configBackingStore::putTargetState to be invoked, which
// is intentional since that call should only take place if we are first able to
// successfully write the empty list of task configs (see the ordering sketch after this test)
PowerMock.expectLastCall().andThrow(taskConfigsWriteException);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
PowerMock.replayAll();
FutureCallback<Void> cb = new FutureCallback<>();
herder.tick(); // join
herder.stopConnector(CONN1, cb); // external request
herder.tick(); // continue
assertTrue("Callback should already have been invoked by herder", cb.isDone());
ExecutionException e = assertThrows(
"Should fail to stop the connector if the empty list of task configs cannot be written to the config topic",
ExecutionException.class,
() -> cb.get(0, TimeUnit.SECONDS)
);
assertEquals(taskConfigsWriteException, e.getCause());
PowerMock.verifyAll();
}
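// Editor's note: the three stop-connector tests above all exercise the same ordering contract
// for a stop request handled by the leader. A minimal sketch of that contract follows; this is
// not the actual DistributedHerder code from this change, and names such as doStopConnector(),
// isLeader(), and leaderUrl() are illustrative only:
//
//     private void doStopConnector(String connName, Callback<Void> callback) {
//         if (!isLeader()) {
//             callback.onCompletion(new NotLeaderException("Only the leader can stop connectors", leaderUrl()), null);
//             return;
//         }
//         try {
//             // 1. Wipe the connector's task configs; if this write fails, abort without
//             //    touching the target state.
//             configBackingStore.putTaskConfigs(connName, Collections.emptyList());
//         } catch (ConnectException e) {
//             callback.onCompletion(e, null);
//             return;
//         }
//         // 2. Only after the empty task configs are committed, persist the STOPPED target state.
//         configBackingStore.putTargetState(connName, TargetState.STOPPED);
//         callback.onCompletion(null, null);
//     }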
@Test
public void testConnectorPausedRunningTaskOnly() throws Exception {
// even if we don't own the connector, we should still propagate target state

55
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java

@@ -86,6 +86,7 @@ import static java.util.Collections.singletonMap;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -733,6 +734,7 @@ public class StandaloneHerderTest {
expectStop();
statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0));
EasyMock.expectLastCall();
statusBackingStore.stop();
EasyMock.expectLastCall();
@@ -937,6 +939,50 @@ public class StandaloneHerderTest {
PowerMock.verifyAll();
}
@Test
public void testTargetStates() throws Exception {
connector = PowerMock.createMock(BogusSourceConnector.class);
expectAdd(SourceSink.SOURCE);
Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connectorMock, true, connectorConfig);
// We pause, then stop, the connector
expectTargetState(CONNECTOR_NAME, TargetState.PAUSED);
expectTargetState(CONNECTOR_NAME, TargetState.STOPPED);
// herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
expectStop();
statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0));
EasyMock.expectLastCall();
statusBackingStore.stop();
EasyMock.expectLastCall();
worker.stop();
EasyMock.expectLastCall();
PowerMock.replayAll();
FutureCallback<Void> stopCallback = new FutureCallback<>();
FutureCallback<List<TaskInfo>> taskConfigsCallback = new FutureCallback<>();
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback);
Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS);
assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
herder.pauseConnector(CONNECTOR_NAME);
herder.stopConnector(CONNECTOR_NAME, stopCallback);
stopCallback.get(10L, TimeUnit.SECONDS);
herder.taskConfigs(CONNECTOR_NAME, taskConfigsCallback);
assertEquals(Collections.emptyList(), taskConfigsCallback.get(1, TimeUnit.SECONDS));
herder.stop();
assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
PowerMock.verifyAll();
}
private void expectAdd(SourceSink sourceSink) {
Map<String, String> connectorProps = connectorConfig(sourceSink);
ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?
@@ -996,6 +1042,15 @@ public class StandaloneHerderTest {
PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK).anyTimes();
}
private void expectTargetState(String connector, TargetState state) {
Capture<Callback<TargetState>> stateChangeCallback = Capture.newInstance();
worker.setTargetState(eq(connector), eq(state), capture(stateChangeCallback));
EasyMock.expectLastCall().andAnswer(() -> {
stateChangeCallback.getValue().onCompletion(null, state);
return null;
});
}
private ConnectorInfo createdInfo(SourceSink sourceSink) {
return new ConnectorInfo(CONNECTOR_NAME, connectorConfig(sourceSink),
Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)),

56
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java

@@ -61,6 +61,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
@@ -145,8 +146,15 @@ public class KafkaConfigBackingStoreTest {
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6),
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
);
private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "PAUSED");
private static final Struct TARGET_STATE_STARTED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
private static final Struct TARGET_STATE_PAUSED_LEGACY = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0)
.put("state", "PAUSED");
private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
.put("state", "PAUSED")
.put("state.v2", "PAUSED");
private static final Struct TARGET_STATE_STOPPED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
.put("state", "PAUSED")
.put("state.v2", "STOPPED");
private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
= new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
@@ -460,7 +468,7 @@ public class KafkaConfigBackingStoreTest {
expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(1));
// In the meantime, write a target state (which doesn't require write privileges)
expectConvert(KafkaConfigBackingStore.TARGET_STATE_V0, TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
expectConvert(KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
storeLog.send("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
EasyMock.expectLastCall().andReturn(producerFuture);
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
@@ -811,14 +819,18 @@ public class KafkaConfigBackingStoreTest {
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()));
CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(1),
CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(3), TARGET_STATE_PAUSED);
// A worker running an older version wrote this target state; make sure we can handle it correctly
deserialized.put(CONFIGS_SERIALIZED.get(3), TARGET_STATE_PAUSED_LEGACY);
deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
logOffset = 5;
deserialized.put(CONFIGS_SERIALIZED.get(5), TARGET_STATE_STOPPED);
logOffset = 6;
expectStart(existingRecords, deserialized);
@@ -834,9 +846,10 @@ public class KafkaConfigBackingStoreTest {
// Should see a single connector with initial state paused
ClusterConfigState configState = configStorage.snapshot();
assertEquals(5, configState.offset()); // Should always be next to be read, even if uncommitted
assertEquals(6, configState.offset()); // Should always be next to be read, even if uncommitted
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0)));
assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(1)));
configStorage.stop();
@@ -857,18 +870,27 @@ public class KafkaConfigBackingStoreTest {
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
LinkedHashMap<byte[], Struct> deserializedOnStartup = new LinkedHashMap<>();
deserializedOnStartup.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserializedOnStartup.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserializedOnStartup.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserializedOnStartup.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
logOffset = 5;
expectStart(existingRecords, deserialized);
expectStart(existingRecords, deserializedOnStartup);
LinkedHashMap<String, byte[]> serializedAfterStartup = new LinkedHashMap<>();
serializedAfterStartup.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
serializedAfterStartup.put(TARGET_STATE_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), TARGET_STATE_PAUSED);
Map<String, Struct> deserializedAfterStartup = new HashMap<>();
deserializedAfterStartup.put(TARGET_STATE_KEYS.get(0), TARGET_STATE_PAUSED);
deserializedAfterStartup.put(TARGET_STATE_KEYS.get(1), TARGET_STATE_STOPPED);
expectRead(serializedAfterStartup, deserializedAfterStartup);
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(1));
EasyMock.expectLastCall();
expectPartitionCount(1);
@@ -879,11 +901,17 @@ public class KafkaConfigBackingStoreTest {
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Should see a single connector with initial state paused
// Should see a single connector with initial state started
ClusterConfigState configState = configStorage.snapshot();
assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configStorage.connectorTargetStates.keySet());
assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
// Should see two connectors now, one paused and one stopped
configStorage.refresh(0, TimeUnit.SECONDS);
configState = configStorage.snapshot();
assertEquals(new HashSet<>(CONNECTOR_IDS), configStorage.connectorTargetStates.keySet());
assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0)));
assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(1)));
configStorage.stop();

65
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java

@@ -24,6 +24,7 @@ import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@@ -391,6 +392,22 @@ public class EmbeddedConnectCluster {
}
}
/**
* Stop an existing connector.
*
* @param connName name of the connector to be stopped
* @throws ConnectRestException if the REST API returns error status
* @throws ConnectException for any other error.
*/
public void stopConnector(String connName) {
String url = endpointForResource(String.format("connectors/%s/stop", connName));
Response response = requestPut(url, "");
if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) {
throw new ConnectRestException(response.getStatus(),
"Could not execute PUT request. Error response: " + responseToString(response));
}
}
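// Editor's note: per KIP-875 this helper targets the new REST endpoint
// PUT /connectors/{connector}/stop, which takes an empty request body (hence requestPut(url, "")).
// In an integration test it would be invoked simply as, for example:
//     connect.stopConnector("my-connector");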
/**
* Pause an existing connector.
*
@@ -554,6 +571,54 @@ public class EmbeddedConnectCluster {
"Could not read connector state. Error response: " + responseToString(response));
}
/**
* Get the info of a connector running in this cluster (retrieved via the <code>GET /connectors/{connector}</code> endpoint).
* @param connectorName name of the connector
* @return an instance of {@link ConnectorInfo} describing the connector's configuration, type, and task IDs.
*/
public ConnectorInfo connectorInfo(String connectorName) {
ObjectMapper mapper = new ObjectMapper();
String url = endpointForResource(String.format("connectors/%s", connectorName));
Response response = requestGet(url);
try {
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
return mapper.readValue(responseToString(response), ConnectorInfo.class);
}
} catch (IOException e) {
log.error("Could not read connector info from response: {}",
responseToString(response), e);
throw new ConnectException("Could not not parse connector info", e);
}
throw new ConnectRestException(response.getStatus(),
"Could not read connector info. Error response: " + responseToString(response));
}
/**
* Get the task configs of a connector running in this cluster.
* @param connectorName name of the connector
* @return a map from task ID (connector name + "-" + task number) to task config
*/
public Map<String, Map<String, String>> taskConfigs(String connectorName) {
ObjectMapper mapper = new ObjectMapper();
String url = endpointForResource(String.format("connectors/%s/tasks-config", connectorName));
Response response = requestGet(url);
try {
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
// We use String instead of ConnectorTaskId as the key here since the latter can't be automatically
// deserialized by Jackson when used as a JSON object key (i.e., when it's serialized as a JSON string)
return mapper.readValue(responseToString(response), new TypeReference<Map<String, Map<String, String>>>() { });
}
} catch (IOException e) {
log.error("Could not read task configs from response: {}",
responseToString(response), e);
throw new ConnectException("Could not not parse task configs", e);
}
throw new ConnectRestException(response.getStatus(),
"Could not read task configs. Error response: " + responseToString(response));
}
/**
* Reset the set of active topics of a connector running in this cluster.
*

67
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java

@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
@@ -37,6 +38,7 @@ import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
/**
* A set of common assertions that can be applied to a Connect cluster during integration testing
@@ -301,7 +303,8 @@ public class EmbeddedConnectClusterAssertions {
}
/**
* Assert that a connector is running with at least the given number of tasks all in running state
* Assert that a connector is running, that it has a specific number of tasks, and that all of
* its tasks are in the RUNNING state.
*
* @param connectorName the connector name
* @param numTasks the number of tasks
@@ -326,6 +329,33 @@ public class EmbeddedConnectClusterAssertions {
}
}
/**
* Assert that a connector is paused, that it has a specific number of tasks, and that all of
* its tasks are in the PAUSED state.
*
* @param connectorName the connector name
* @param numTasks the number of tasks
* @param detailMessage the assertion message
* @throws InterruptedException
*/
public void assertConnectorAndExactlyNumTasksArePaused(String connectorName, int numTasks, String detailMessage)
throws InterruptedException {
try {
waitForCondition(
() -> checkConnectorState(
connectorName,
AbstractStatus.State.PAUSED,
numTasks,
AbstractStatus.State.PAUSED,
Integer::equals
).orElse(false),
CONNECTOR_SHUTDOWN_DURATION_MS,
"The connector or exactly " + numTasks + " tasks are not paused.");
} catch (AssertionError e) {
throw new AssertionError(detailMessage, e);
}
}
/**
* Assert that a connector is running, that it has a specific number of tasks, and that all of
* its tasks are in the FAILED state.
@@ -415,11 +445,11 @@ public class EmbeddedConnectClusterAssertions {
* @param detailMessage the assertion message
* @throws InterruptedException
*/
public void assertConnectorAndTasksAreStopped(String connectorName, String detailMessage)
public void assertConnectorAndTasksAreNotRunning(String connectorName, String detailMessage)
throws InterruptedException {
try {
waitForCondition(
() -> checkConnectorAndTasksAreStopped(connectorName),
() -> checkConnectorAndTasksAreNotRunning(connectorName),
CONNECTOR_SETUP_DURATION_MS,
"At least the connector or one of its tasks is still running");
} catch (AssertionError e) {
@@ -433,7 +463,7 @@ public class EmbeddedConnectClusterAssertions {
* @param connectorName the connector
* @return true if the connector and all the tasks are not in RUNNING state; false otherwise
*/
protected boolean checkConnectorAndTasksAreStopped(String connectorName) {
protected boolean checkConnectorAndTasksAreNotRunning(String connectorName) {
ConnectorStateInfo info;
try {
info = connect.connectorStatus(connectorName);
@@ -450,6 +480,35 @@ public class EmbeddedConnectClusterAssertions {
&& info.tasks().stream().noneMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
}
/**
* Assert that a connector is in the stopped state and has no tasks.
*
* @param connectorName the connector name
* @param detailMessage the assertion message
* @throws InterruptedException
*/
public void assertConnectorIsStopped(String connectorName, String detailMessage)
throws InterruptedException {
try {
waitForCondition(
() -> checkConnectorState(
connectorName,
AbstractStatus.State.STOPPED,
0,
null,
Integer::equals
).orElse(false),
CONNECTOR_SHUTDOWN_DURATION_MS,
"At least the connector or one of its tasks is still running");
// If the connector is truly stopped, we should also see an empty set of tasks and task configs
assertEquals(Collections.emptyList(), connect.connectorInfo(connectorName).tasks());
assertEquals(Collections.emptyMap(), connect.taskConfigs(connectorName));
} catch (AssertionError e) {
throw new AssertionError(detailMessage, e);
}
}
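// Editor's note: a minimal sketch of how an integration test might combine the new helpers from
// this change (EmbeddedConnectCluster#stopConnector and this assertion). The configureConnector()
// call and the connectorProps map are assumed to exist outside this diff and are illustrative only:
//
//     connect.configureConnector("my-connector", connectorProps);
//     connect.stopConnector("my-connector");
//     connect.assertions().assertConnectorIsStopped(
//             "my-connector", "Connector did not reach the STOPPED state in time");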
/**
* Check whether the given connector state matches the current state of the connector and
* whether it has at least the given number of tasks, with all the tasks matching the given

1
gradle/spotbugs-exclude.xml

@@ -303,7 +303,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Class name="org.apache.kafka.connect.runtime.WorkerConnector"/>
<Or>
<Method name="doStart"/>
<Method name="pause"/>
</Or>
<Bug pattern="SF_SWITCH_FALLTHROUGH"/>
</Match>
