
KAFKA-8179: do not suspend standby tasks during rebalance (#7321)

Some work needs to be done in Streams before we can incorporate cooperative rebalancing. 

This PR lays the groundwork for it by doing some refactoring, including one behavioral change that affects eager ("normal") rebalancing as well: standby tasks are no longer suspended in onPartitionsRevoked; instead, any that were reassigned elsewhere are closed in onPartitionsAssigned.
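
As a rough sketch of the new callback flow (the TaskManager-level method names here are illustrative stand-ins, not the literal API; see the StreamsRebalanceListener and TaskManager diffs below for the real wiring):

    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        // active tasks are still suspended here; standby tasks are now left untouched
        taskManager.suspendActiveTasksAndState(partitions);  // hypothetical name
    }

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
        // standby tasks that were reassigned to another instance are closed here instead
        taskManager.closeRevokedStandbyTasks();              // hypothetical wiring
        taskManager.createTasks(partitions);
    }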

Reviewers: Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>, John Roesler <vvcephei@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>
A. Sophie Blee-Goldman (committed by Guozhang Wang)
Parent commit: 74f8ae1303
  1. clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java (16)
  2. clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java (74)
  3. streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java (15)
  4. streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java (43)
  5. streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java (42)
  6. streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java (293)
  7. streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java (137)
  8. streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java (15)
  9. streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java (24)
  10. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (214)
  11. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java (39)
  12. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java (169)
  13. streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (297)
  14. streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java (24)
  15. streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java (4)
  16. streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java (36)
  17. streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java (10)
  18. streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java (66)
  19. streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java (2)
  20. streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java (114)
  21. streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java (7)

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java (16)

@@ -67,7 +67,9 @@ import org.apache.kafka.common.TopicPartition;
* During a rebalance event, the {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} function will always be triggered exactly once when
* the rebalance completes. That is, even if there is no newly assigned partitions for a consumer member, its {@link #onPartitionsAssigned(Collection) onPartitionsAssigned}
* will still be triggered with an empty collection of partitions. As a result this function can be used also to notify when a rebalance event has happened.
* On the other hand, {@link #onPartitionsRevoked(Collection)} and {@link #onPartitionsLost(Collection)}
* With eager rebalancing, {@link #onPartitionsRevoked(Collection)} will always be called at the start of a rebalance. On the other hand, {@link #onPartitionsLost(Collection)}
* will only be called when there were non-empty partitions that were lost.
* With cooperative rebalancing, {@link #onPartitionsRevoked(Collection)} and {@link #onPartitionsLost(Collection)}
* will only be triggered when there are non-empty partitions revoked or lost from this consumer member during a rebalance event.
* <p>
* It is possible
@@ -117,16 +119,16 @@ import org.apache.kafka.common.TopicPartition;
public interface ConsumerRebalanceListener {
/**
* A callback method the user can implement to provide handling of offset commits to a customized store on the start
* of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
* stops fetching data. It can also be called when consumer is being closed ({@link KafkaConsumer#close(Duration)})
* A callback method the user can implement to provide handling of offset commits to a customized store.
* This method will be called during a rebalance operation when the consumer has to give up some partitions.
* It can also be called when consumer is being closed ({@link KafkaConsumer#close(Duration)})
* or is unsubscribing ({@link KafkaConsumer#unsubscribe()}).
* It is recommended that offsets should be committed in this callback to either Kafka or a
* custom offset store to prevent duplicate data.
* <p>
* For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
* <p>
* <b>NOTE:</b> This method is only called before rebalances. It is not called prior to {@link KafkaConsumer#close()}.
* In eager rebalancing, it will always be called at the start of a rebalance and after the consumer stops fetching data.
* In cooperative rebalancing, it will be called at the end of a rebalance on the set of partitions being revoked iff the set is non-empty.
* For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}.
* <p>
* It is common for the revocation callback to use the consumer instance in order to commit offsets. It is possible
* for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
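
A hedged usage sketch of the semantics documented above: under the eager protocol onPartitionsRevoked fires at the start of every rebalance with the full assignment, while under the cooperative protocol it fires at the end, only for the (possibly empty) set of partitions actually migrating away, so committing inside the callback covers both cases.

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    final class CommitOnRevokeListener implements ConsumerRebalanceListener {
        private final Consumer<?, ?> consumer;

        CommitOnRevokeListener(final Consumer<?, ?> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
            // with cooperative rebalancing this set may be empty; skip the redundant commit
            if (!partitions.isEmpty()) {
                consumer.commitSync(); // persist consumed offsets before giving up ownership
            }
        }

        @Override
        public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
            // invoked exactly once per completed rebalance, even with an empty collection,
            // so it can double as a "rebalance finished" signal
        }
    }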

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java (74)

@@ -351,62 +351,50 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
return;
}
// The leader may have assigned partitions which match our subscription pattern, but which
// were not explicitly requested, so we update the joined subscription here.
maybeUpdateJoinedSubscription(assignedPartitions);
// give the assignor a chance to update internal state based on the received assignment
ConsumerGroupMetadata metadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);
assignor.onAssignment(assignment, metadata);
// reschedule the auto commit starting from now
if (autoCommitEnabled)
this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);
// execute the user's callback after rebalance
final AtomicReference<Exception> firstException = new AtomicReference<>(null);
Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions);
addedPartitions.removeAll(ownedPartitions);
switch (protocol) {
case EAGER:
// assign partitions that are not yet owned
subscriptions.assignFromSubscribed(assignedPartitions);
firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));
break;
// Invoke user's revocation callback before changing assignment or updating state
if (protocol == RebalanceProtocol.COOPERATIVE) {
Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
revokedPartitions.removeAll(assignedPartitions);
case COOPERATIVE:
Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
revokedPartitions.removeAll(assignedPartitions);
log.info("Updating with newly assigned partitions: {}, compare with already owned partitions: {}, " +
"newly added partitions: {}, revoking partitions: {}",
Utils.join(assignedPartitions, ", "),
Utils.join(ownedPartitions, ", "),
Utils.join(addedPartitions, ", "),
Utils.join(revokedPartitions, ", "));
log.info("Updating with newly assigned partitions: {}, compare with already owned partitions: {}, " +
"newly added partitions: {}, revoking partitions: {}",
Utils.join(assignedPartitions, ", "),
Utils.join(ownedPartitions, ", "),
Utils.join(addedPartitions, ", "),
Utils.join(revokedPartitions, ", "));
if (!revokedPartitions.isEmpty()) {
// revoke partitions that was previously owned but no longer assigned;
// note that we should only change the assignment AFTER we've triggered
// the revoke callback
if (!revokedPartitions.isEmpty()) {
firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));
}
firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));
subscriptions.assignFromSubscribed(assignedPartitions);
// if revoked any partitions, need to re-join the group afterwards
requestRejoin();
}
}
// add partitions that were not previously owned but are now assigned
firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));
// The leader may have assigned partitions which match our subscription pattern, but which
// were not explicitly requested, so we update the joined subscription here.
maybeUpdateJoinedSubscription(assignedPartitions);
// if revoked any partitions, need to re-join the group afterwards
if (!revokedPartitions.isEmpty()) {
requestRejoin();
}
// give the assignor a chance to update internal state based on the received assignment
ConsumerGroupMetadata metadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);
assignor.onAssignment(assignment, metadata);
break;
}
// reschedule the auto commit starting from now
if (autoCommitEnabled)
this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);
subscriptions.assignFromSubscribed(assignedPartitions);
// add partitions that were not previously owned but are now assigned
firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));
if (firstException.get() != null)
throw new KafkaException("User rebalance callback throws an error", firstException.get());
@@ -566,7 +554,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// when these topics gets updated from metadata refresh.
//
// TODO: this is a hack and not something we want to support long-term unless we push regex into the protocol
// we may need to modify the PartitionAssignor API to better support this case.
// we may need to modify the ConsumerPartitionAssignor API to better support this case.
Set<String> assignedTopics = new HashSet<>();
for (Assignment assigned : assignments.values()) {
for (TopicPartition tp : assigned.partitions())
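
Because the hunk above interleaves removed and added lines, here is a condensed restatement (not the literal code) of the ordering the new cooperative branch establishes, using the same names as the diff:

    // 1. user revocation callback runs BEFORE any assignment state is changed
    if (protocol == RebalanceProtocol.COOPERATIVE) {
        final Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
        revokedPartitions.removeAll(assignedPartitions);
        if (!revokedPartitions.isEmpty()) {
            firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));
            requestRejoin(); // revoking partitions forces a follow-up rebalance
        }
    }
    // 2. internal bookkeeping: pattern subscription, assignor state, auto-commit timer
    maybeUpdateJoinedSubscription(assignedPartitions);
    assignor.onAssignment(assignment, metadata);
    if (autoCommitEnabled)
        nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);
    // 3. only now is the assignment updated and the user notified of added partitions
    subscriptions.assignFromSubscribed(assignedPartitions);
    firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));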

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java (15)

@@ -51,6 +51,7 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamThread.State;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.state.HostInfo;
@@ -188,17 +189,19 @@ public class KafkaStreams implements AutoCloseable {
*
* </pre>
* Note the following:
* - RUNNING state will transit to REBALANCING if any of its threads is in PARTITION_REVOKED state
* - RUNNING state will transit to REBALANCING if any of its threads is in PARTITION_REVOKED or PARTITIONS_ASSIGNED state
* - REBALANCING state will transit to RUNNING if all of its threads are in RUNNING state
* - Any state except NOT_RUNNING can go to PENDING_SHUTDOWN (whenever close is called)
* - Of special importance: If the global stream thread dies, or all stream threads die (or both) then
* the instance will be in the ERROR state. The user will need to close it.
*/
// TODO: the current transitions from other states directly to RUNNING is due to
// the fact that onPartitionsRevoked may not be triggered. we need to refactor the
// state diagram more thoroughly after we refactor StreamsPartitionAssignor to support COOPERATIVE
public enum State {
CREATED(1, 2, 3), REBALANCING(2, 3, 5), RUNNING(1, 2, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3);
CREATED(1, 3), // 0
REBALANCING(2, 3, 5), // 1
RUNNING(1, 2, 3, 5), // 2
PENDING_SHUTDOWN(4), // 3
NOT_RUNNING, // 4
ERROR(3); // 5
private final Set<Integer> validTransitions = new HashSet<>();
@@ -462,7 +465,7 @@ public class KafkaStreams implements AutoCloseable {
final StreamThread.State newState = (StreamThread.State) abstractNewState;
threadState.put(thread.getId(), newState);
if (newState == StreamThread.State.PARTITIONS_REVOKED) {
if (newState == StreamThread.State.PARTITIONS_REVOKED || newState == StreamThread.State.PARTITIONS_ASSIGNED) {
setState(State.REBALANCING);
} else if (newState == StreamThread.State.RUNNING) {
maybeSetRunning();
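
A hedged usage sketch of the observable effect: a state listener now sees REBALANCING as soon as any thread enters PARTITIONS_ASSIGNED, not only PARTITIONS_REVOKED. The topology and props variables are assumed to be defined elsewhere.

    final KafkaStreams streams = new KafkaStreams(topology, props);
    streams.setStateListener((newState, oldState) -> {
        if (newState == KafkaStreams.State.REBALANCING) {
            // entered when any stream thread is in PARTITIONS_REVOKED or PARTITIONS_ASSIGNED
            System.out.println("rebalance in progress");
        }
    });
    streams.start();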

streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java (43)

@@ -251,6 +251,30 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_11 = "1.1";
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.0.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_20 = "2.0";
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.1.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_21 = "2.1";
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.2.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_22 = "2.2";
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.3.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_23 = "2.3";
/**
* Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
*/
@@ -474,9 +498,10 @@ public class StreamsConfig extends AbstractConfig {
/** {@code upgrade.from} */
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
private static final String UPGRADE_FROM_DOC = "Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a backward compatible way. " +
"When upgrading from 1.2 to a newer version it is not required to specify this config." +
"Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" + UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + UPGRADE_FROM_11 + "\" (for upgrading from the corresponding old version).";
private static final String UPGRADE_FROM_DOC = "Allows upgrading in a backward compatible way. " +
"This is needed when upgrading from [0.10.0, 1.1] to 2.0+, or when upgrading from [2.0, 2.3] to 2.4+. " +
"When upgrading from 2.4 to a newer version it is not required to specify this config. " +
"Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" + UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + UPGRADE_FROM_11 + "\", \"" + UPGRADE_FROM_20 + "\", \"" + UPGRADE_FROM_21 + "\", \"" + UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\" (for upgrading from the corresponding old version).";
/** {@code windowstore.changelog.additional.retention.ms} */
@SuppressWarnings("WeakerAccess")
@@ -709,7 +734,17 @@ public class StreamsConfig extends AbstractConfig {
.define(UPGRADE_FROM_CONFIG,
ConfigDef.Type.STRING,
null,
in(null, UPGRADE_FROM_0100, UPGRADE_FROM_0101, UPGRADE_FROM_0102, UPGRADE_FROM_0110, UPGRADE_FROM_10, UPGRADE_FROM_11),
in(null,
UPGRADE_FROM_0100,
UPGRADE_FROM_0101,
UPGRADE_FROM_0102,
UPGRADE_FROM_0110,
UPGRADE_FROM_10,
UPGRADE_FROM_11,
UPGRADE_FROM_20,
UPGRADE_FROM_21,
UPGRADE_FROM_22,
UPGRADE_FROM_23),
Importance.LOW,
UPGRADE_FROM_DOC)
.define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
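
A hedged usage sketch of the new config values: when upgrading from 2.3.x, the first rolling bounce pins upgrade.from so mixed-version instances keep a common rebalance protocol; once all instances run the new version, the config is removed and the instances are bounced a second time. The application id and bootstrap servers below are illustrative.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // first rolling bounce: stay compatible with still-running 2.3.x instances
    props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23);
    // second rolling bounce: remove upgrade.from so the group can switch protocols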

streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java (42)

@@ -16,7 +16,13 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.TaskId;
class AssignedStandbyTasks extends AssignedTasks<StandbyTask> {
@@ -34,4 +40,40 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask> {
running.forEach((id, task) -> task.allowUpdateOfOffsetLimit());
return committed;
}
/**
* Closes standby tasks that were reassigned elsewhere after a rebalance.
*
* @param revokedTasks the tasks which are no longer owned
* @return the changelogs of all standby tasks that were reassigned
*/
List<TopicPartition> closeRevokedStandbyTasks(final Map<TaskId, Set<TopicPartition>> revokedTasks) {
log.debug("Closing revoked standby tasks {}", revokedTasks);
final List<TopicPartition> revokedChangelogs = new ArrayList<>();
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : revokedTasks.entrySet()) {
final TaskId taskId = entry.getKey();
final Task task;
if (running.containsKey(taskId)) {
task = running.get(taskId);
} else if (created.containsKey(taskId)) {
task = created.get(taskId);
} else {
log.error("Could not find the standby task {} while closing it", taskId);
continue;
}
try {
task.close(true, false);
} catch (final RuntimeException e) {
log.error("Closing the {} {} failed due to the following error:", taskTypeName, task.id(), e);
} finally {
running.remove(taskId);
revokedChangelogs.addAll(task.changelogPartitions());
}
}
return revokedChangelogs;
}
}
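
A hedged sketch of the caller side (the wiring shown is illustrative; the real call site is in the TaskManager diff of this commit): the changelogs returned by closeRevokedStandbyTasks are pruned from the restore consumer so restoration stops for standbys that moved away.

    // revokedStandbyTasks: Map<TaskId, Set<TopicPartition>> of standbys lost in the rebalance
    final List<TopicPartition> revokedChangelogs =
        standby.closeRevokedStandbyTasks(revokedStandbyTasks);
    changelogReader.remove(revokedChangelogs);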

streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java (293)

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
@@ -32,9 +33,11 @@ import java.util.Map;
import java.util.Set;
class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks {
private final Map<TaskId, StreamTask> suspended = new HashMap<>();
private final Map<TaskId, StreamTask> restoring = new HashMap<>();
private final Set<TopicPartition> restoredPartitions = new HashSet<>();
private final Map<TopicPartition, StreamTask> restoringByPartition = new HashMap<>();
private final Set<TaskId> prevActiveTasks = new HashSet<>();
AssignedStreamsTasks(final LogContext logContext) {
super(logContext, "stream task");
@@ -49,6 +52,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
List<StreamTask> allTasks() {
final List<StreamTask> tasks = super.allTasks();
tasks.addAll(restoring.values());
tasks.addAll(suspended.values());
return tasks;
}
@@ -56,39 +60,285 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
Set<TaskId> allAssignedTaskIds() {
final Set<TaskId> taskIds = super.allAssignedTaskIds();
taskIds.addAll(restoring.keySet());
taskIds.addAll(suspended.keySet());
return taskIds;
}
@Override
boolean allTasksRunning() {
return super.allTasksRunning() && restoring.isEmpty();
// If we have some tasks that are suspended but others are running, count this as all tasks are running
// since they will be closed soon anyway (eg if partitions are revoked at beginning of cooperative rebalance)
return super.allTasksRunning() && restoring.isEmpty() && (suspended.isEmpty() || !running.isEmpty());
}
RuntimeException closeAllRestoringTasks() {
RuntimeException exception = null;
@Override
void closeTask(final StreamTask task, final boolean clean) {
if (suspended.containsKey(task.id())) {
task.closeSuspended(clean, false, null);
} else {
task.close(clean, false);
}
}
Set<TaskId> suspendedTaskIds() {
return suspended.keySet();
}
Set<TaskId> previousRunningTaskIds() {
return prevActiveTasks;
}
RuntimeException suspendOrCloseTasks(final Set<TaskId> revokedTasks,
final List<TopicPartition> revokedTaskChangelogs) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
final Set<TaskId> revokedRunningTasks = new HashSet<>();
final Set<TaskId> revokedNonRunningTasks = new HashSet<>();
final Set<TaskId> revokedRestoringTasks = new HashSet<>();
// This set is used only for eager rebalancing, so we can just clear it and add any/all tasks that were running
prevActiveTasks.clear();
prevActiveTasks.addAll(runningTaskIds());
for (final TaskId task : revokedTasks) {
if (running.containsKey(task)) {
revokedRunningTasks.add(task);
} else if (created.containsKey(task)) {
revokedNonRunningTasks.add(task);
} else if (restoring.containsKey(task)) {
revokedRestoringTasks.add(task);
} else if (!suspended.containsKey(task)) {
log.warn("Task {} was revoked but cannot be found in the assignment", task);
}
}
firstException.compareAndSet(null, suspendRunningTasks(revokedRunningTasks, revokedTaskChangelogs));
firstException.compareAndSet(null, closeNonRunningTasks(revokedNonRunningTasks, revokedTaskChangelogs));
firstException.compareAndSet(null, closeRestoringTasks(revokedRestoringTasks, revokedTaskChangelogs));
return firstException.get();
}
private RuntimeException suspendRunningTasks(final Set<TaskId> runningTasksToSuspend,
final List<TopicPartition> taskChangelogs) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
log.debug("Suspending running {} {}", taskTypeName, running.keySet());
for (final TaskId id : runningTasksToSuspend) {
final StreamTask task = running.get(id);
log.trace("Closing all restoring stream tasks {}", restoring.keySet());
final Iterator<StreamTask> restoringTaskIterator = restoring.values().iterator();
while (restoringTaskIterator.hasNext()) {
final StreamTask task = restoringTaskIterator.next();
log.debug("Closing restoring task {}", task.id());
try {
task.closeStateManager(true);
task.suspend();
suspended.put(id, task);
} catch (final TaskMigratedException closeAsZombieAndSwallow) {
// as we suspend a task, we are either shutting down or rebalancing, thus, we swallow and move on
log.info("Failed to suspend {} {} since it got migrated to another thread already. " +
"Closing it as zombie and move on.", taskTypeName, id);
firstException.compareAndSet(null, closeZombieTask(task));
prevActiveTasks.remove(id);
} catch (final RuntimeException e) {
log.error("Failed to remove restoring task {} due to the following error:", task.id(), e);
if (exception == null) {
exception = e;
log.error("Suspending {} {} failed due to the following error:", taskTypeName, id, e);
firstException.compareAndSet(null, e);
try {
prevActiveTasks.remove(id);
task.close(false, false);
} catch (final RuntimeException f) {
log.error(
"After suspending failed, closing the same {} {} failed again due to the following error:",
taskTypeName, id, f);
}
} finally {
restoringTaskIterator.remove();
running.remove(id);
runningByPartition.keySet().removeAll(task.partitions());
runningByPartition.keySet().removeAll(task.changelogPartitions());
taskChangelogs.addAll(task.changelogPartitions());
}
}
restoring.clear();
restoredPartitions.clear();
restoringByPartition.clear();
log.trace("Successfully suspended the running {} {}", taskTypeName, suspended.keySet());
return exception;
return firstException.get();
}
private RuntimeException closeNonRunningTasks(final Set<TaskId> nonRunningTasksToClose,
final List<TopicPartition> closedTaskChangelogs) {
log.debug("Closing the created but not initialized {} {}", taskTypeName, nonRunningTasksToClose);
final AtomicReference<RuntimeException> firstException = new AtomicReference<>();
for (final TaskId id : nonRunningTasksToClose) {
final StreamTask task = created.get(id);
firstException.compareAndSet(null, closeNonRunning(false, task, closedTaskChangelogs));
}
return firstException.get();
}
RuntimeException closeRestoringTasks(final Set<TaskId> restoringTasksToClose,
final List<TopicPartition> closedTaskChangelogs) {
log.debug("Closing restoring stream tasks {}", restoringTasksToClose);
final AtomicReference<RuntimeException> firstException = new AtomicReference<>();
for (final TaskId id : restoringTasksToClose) {
final StreamTask task = restoring.get(id);
firstException.compareAndSet(null, closeRestoring(false, task, closedTaskChangelogs));
}
return firstException.get();
}
private RuntimeException closeRunning(final boolean isZombie,
final StreamTask task,
final List<TopicPartition> closedTaskChangelogs) {
running.remove(task.id());
runningByPartition.keySet().removeAll(task.partitions());
runningByPartition.keySet().removeAll(task.changelogPartitions());
closedTaskChangelogs.addAll(task.changelogPartitions());
try {
final boolean clean = !isZombie;
task.close(clean, isZombie);
} catch (final RuntimeException e) {
log.error("Failed to close {}, {}", taskTypeName, task.id(), e);
return e;
}
return null;
}
private RuntimeException closeNonRunning(final boolean isZombie,
final StreamTask task,
final List<TopicPartition> closedTaskChangelogs) {
created.remove(task.id());
closedTaskChangelogs.addAll(task.changelogPartitions());
try {
task.close(false, isZombie);
} catch (final RuntimeException e) {
log.error("Failed to close {}, {}", taskTypeName, task.id(), e);
return e;
}
return null;
}
private RuntimeException closeRestoring(final boolean isZombie,
final StreamTask task,
final List<TopicPartition> closedTaskChangelogs) {
restoring.remove(task.id());
closedTaskChangelogs.addAll(task.changelogPartitions());
for (final TopicPartition tp : task.partitions()) {
restoredPartitions.remove(tp);
restoringByPartition.remove(tp);
}
try {
final boolean clean = !isZombie;
task.closeStateManager(clean);
} catch (final RuntimeException e) {
log.error("Failed to close restoring task {} due to the following error:", task.id(), e);
return e;
}
return null;
}
private RuntimeException closeSuspended(final boolean isZombie,
final StreamTask task) {
suspended.remove(task.id());
try {
final boolean clean = !isZombie;
task.closeSuspended(clean, isZombie, null);
} catch (final RuntimeException e) {
log.error("Failed to close suspended {} {} due to the following error:", taskTypeName, task.id(), e);
return e;
}
return null;
}
RuntimeException closeNotAssignedSuspendedTasks(final Set<TaskId> revokedTasks) {
log.debug("Closing the revoked active tasks {} {}", taskTypeName, revokedTasks);
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (final TaskId revokedTask : revokedTasks) {
final StreamTask suspendedTask = suspended.get(revokedTask);
// task may not be in the suspended tasks if it was closed due to some error
if (suspendedTask != null) {
firstException.compareAndSet(null, closeSuspended(false, suspendedTask));
} else {
log.debug("Revoked task {} could not be found in suspended, may have already been closed", revokedTask);
}
}
return firstException.get();
}
RuntimeException closeZombieTasks(final Set<TaskId> lostTasks, final List<TopicPartition> lostTaskChangelogs) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (final TaskId id : lostTasks) {
if (suspended.containsKey(id)) {
firstException.compareAndSet(null, closeSuspended(true, suspended.get(id)));
} else if (created.containsKey(id)) {
firstException.compareAndSet(null, closeNonRunning(true, created.get(id), lostTaskChangelogs));
} else if (restoring.containsKey(id)) {
firstException.compareAndSet(null, closeRestoring(true, created.get(id), lostTaskChangelogs));
} else if (running.containsKey(id)) {
firstException.compareAndSet(null, closeRunning(true, running.get(id), lostTaskChangelogs));
} else {
// task may have already been closed as a zombie and removed from all task maps
}
}
// We always clear the prevActiveTasks and replace with current set of running tasks to encode in subscription
// We should exclude any tasks that were lost however, they will be counted as standbys for assignment purposes
prevActiveTasks.clear();
prevActiveTasks.addAll(running.keySet());
// With the current rebalance protocol, there should not be any running tasks left as they were all lost
if (!prevActiveTasks.isEmpty()) {
log.error("Found still running {} after closing all tasks lost as zombies", taskTypeName);
firstException.compareAndSet(null, new IllegalStateException("Not all lost tasks were closed as zombies"));
}
return firstException.get();
}
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
boolean maybeResumeSuspendedTask(final TaskId taskId,
final Set<TopicPartition> partitions) {
if (suspended.containsKey(taskId)) {
final StreamTask task = suspended.get(taskId);
log.trace("Found suspended {} {}", taskTypeName, taskId);
suspended.remove(taskId);
if (task.partitions().equals(partitions)) {
task.resume();
try {
transitionToRunning(task);
} catch (final TaskMigratedException e) {
// we need to catch migration exception internally since this function
// is triggered in the rebalance callback
log.info("Failed to resume {} {} since it got migrated to another thread already. " +
"Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id());
final RuntimeException fatalException = closeZombieTask(task);
running.remove(taskId);
if (fatalException != null) {
throw fatalException;
}
throw e;
}
log.trace("Resuming suspended {} {}", taskTypeName, task.id());
return true;
} else {
log.warn("Couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions());
task.closeSuspended(true, false, null);
}
}
return false;
}
void updateRestored(final Collection<TopicPartition> restored) {
@@ -254,19 +504,24 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
restoring.clear();
restoringByPartition.clear();
restoredPartitions.clear();
suspended.clear();
}
public String toString(final String indent) {
final StringBuilder builder = new StringBuilder();
builder.append(super.toString(indent));
describe(builder, restoring.values(), indent, "Restoring:");
describe(builder, suspended.values(), indent, "Suspended:");
return builder.toString();
}
// for testing only
// the following are for testing only
Collection<StreamTask> restoringTasks() {
return Collections.unmodifiableCollection(restoring.values());
}
Set<TaskId> restoringTaskIds() {
return new HashSet<>(restoring.keySet());
}
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java (137)

@@ -38,14 +38,12 @@ import java.util.concurrent.atomic.AtomicReference;
abstract class AssignedTasks<T extends Task> {
final Logger log;
private final String taskTypeName;
private final Map<TaskId, T> created = new HashMap<>();
private final Map<TaskId, T> suspended = new HashMap<>();
private final Set<TaskId> previousActiveTasks = new HashSet<>();
final String taskTypeName;
final Map<TaskId, T> created = new HashMap<>();
// IQ may access this map.
final Map<TaskId, T> running = new ConcurrentHashMap<>();
private final Map<TopicPartition, T> runningByPartition = new HashMap<>();
final Map<TopicPartition, T> runningByPartition = new HashMap<>();
AssignedTasks(final LogContext logContext,
final String taskTypeName) {
@@ -59,7 +57,7 @@ abstract class AssignedTasks<T extends Task> {
/**
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
* @throws StreamsException if the store's changelog does not contain the partition
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
void initializeNewTasks() {
@@ -85,68 +83,13 @@ abstract class AssignedTasks<T extends Task> {
}
boolean allTasksRunning() {
return created.isEmpty() && suspended.isEmpty();
return created.isEmpty();
}
Collection<T> running() {
return running.values();
}
RuntimeException suspend() {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
log.trace("Suspending running {} {}", taskTypeName, runningTaskIds());
firstException.compareAndSet(null, suspendTasks(running.values()));
log.trace("Close created {} {}", taskTypeName, created.keySet());
firstException.compareAndSet(null, closeNonRunningTasks(created.values()));
previousActiveTasks.clear();
previousActiveTasks.addAll(running.keySet());
running.clear();
created.clear();
runningByPartition.clear();
return firstException.get();
}
private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
RuntimeException exception = null;
for (final T task : tasks) {
try {
task.close(false, false);
} catch (final RuntimeException e) {
log.error("Failed to close {}, {}", taskTypeName, task.id(), e);
if (exception == null) {
exception = e;
}
}
}
return exception;
}
private RuntimeException suspendTasks(final Collection<T> tasks) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (final Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
final T task = it.next();
try {
task.suspend();
suspended.put(task.id(), task);
} catch (final TaskMigratedException closeAsZombieAndSwallow) {
// as we suspend a task, we are either shutting down or rebalancing, thus, we swallow and move on
log.info("Failed to suspend {} {} since it got migrated to another thread already. " +
"Closing it as zombie and move on.", taskTypeName, task.id());
firstException.compareAndSet(null, closeZombieTask(task));
it.remove();
} catch (final RuntimeException e) {
log.error("Suspending {} {} failed due to the following error:", taskTypeName, task.id(), e);
firstException.compareAndSet(null, e);
try {
task.close(false, false);
} catch (final RuntimeException f) {
log.error("After suspending failed, closing the same {} {} failed again due to the following error:", taskTypeName, task.id(), f);
}
}
}
return firstException.get();
}
RuntimeException closeZombieTask(final T task) {
try {
task.close(false, true);
@@ -161,39 +104,6 @@ abstract class AssignedTasks<T extends Task> {
return !running.isEmpty();
}
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
if (suspended.containsKey(taskId)) {
final T task = suspended.get(taskId);
log.trace("Found suspended {} {}", taskTypeName, taskId);
if (task.partitions().equals(partitions)) {
suspended.remove(taskId);
task.resume();
try {
transitionToRunning(task);
} catch (final TaskMigratedException e) {
// we need to catch migration exception internally since this function
// is triggered in the rebalance callback
log.info("Failed to resume {} {} since it got migrated to another thread already. " +
"Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id());
final RuntimeException fatalException = closeZombieTask(task);
running.remove(task.id());
if (fatalException != null) {
throw fatalException;
}
throw e;
}
log.trace("Resuming suspended {} {}", taskTypeName, task.id());
return true;
} else {
log.warn("Couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions());
}
}
return false;
}
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
@@ -230,7 +140,6 @@ abstract class AssignedTasks<T extends Task> {
public String toString(final String indent) {
final StringBuilder builder = new StringBuilder();
describe(builder, running.values(), indent, "Running:");
describe(builder, suspended.values(), indent, "Suspended:");
describe(builder, created.values(), indent, "New:");
return builder.toString();
}
@@ -249,7 +158,6 @@ abstract class AssignedTasks<T extends Task> {
List<T> allTasks() {
final List<T> tasks = new ArrayList<>();
tasks.addAll(running.values());
tasks.addAll(suspended.values());
tasks.addAll(created.values());
return tasks;
}
@@ -257,7 +165,6 @@ abstract class AssignedTasks<T extends Task> {
Set<TaskId> allAssignedTaskIds() {
final Set<TaskId> taskIds = new HashSet<>();
taskIds.addAll(running.keySet());
taskIds.addAll(suspended.keySet());
taskIds.addAll(created.keySet());
return taskIds;
}
@@ -266,11 +173,6 @@ abstract class AssignedTasks<T extends Task> {
runningByPartition.clear();
running.clear();
created.clear();
suspended.clear();
}
Set<TaskId> previousTaskIds() {
return previousActiveTasks;
}
/**
@@ -280,6 +182,7 @@ abstract class AssignedTasks<T extends Task> {
int commit() {
int committed = 0;
RuntimeException firstException = null;
for (final Iterator<T> it = running().iterator(); it.hasNext(); ) {
final T task = it.next();
try {
@@ -314,33 +217,12 @@ abstract class AssignedTasks<T extends Task> {
return committed;
}
void closeNonAssignedSuspendedTasks(final Map<TaskId, Set<TopicPartition>> newAssignment) {
final Iterator<T> standByTaskIterator = suspended.values().iterator();
while (standByTaskIterator.hasNext()) {
final T suspendedTask = standByTaskIterator.next();
if (!newAssignment.containsKey(suspendedTask.id()) || !suspendedTask.partitions().equals(newAssignment.get(suspendedTask.id()))) {
log.debug("Closing suspended and not re-assigned {} {}", taskTypeName, suspendedTask.id());
try {
suspendedTask.closeSuspended(true, false, null);
} catch (final Exception e) {
log.error("Failed to remove suspended {} {} due to the following error:", taskTypeName, suspendedTask.id(), e);
} finally {
standByTaskIterator.remove();
}
}
}
}
void close(final boolean clean) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (final T task: allTasks()) {
try {
if (suspended.containsKey(task.id())) {
task.closeSuspended(clean, false, null);
} else {
task.close(clean, false);
}
closeTask(task, clean);
} catch (final TaskMigratedException e) {
log.info("Failed to close {} {} since it got migrated to another thread already. " +
"Closing it as zombie and move on.", taskTypeName, task.id());
@@ -365,6 +247,10 @@ abstract class AssignedTasks<T extends Task> {
}
}
void closeTask(final T task, final boolean clean) {
task.close(clean, false);
}
private boolean closeUnclean(final T task) {
log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
try {
@@ -379,4 +265,5 @@ abstract class AssignedTasks<T extends Task> {
return true;
}
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java (15)

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.util.List;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
@@ -24,7 +25,7 @@ import java.util.Map;
/**
* Performs bulk read operations from a set of partitions. Used to
* restore {@link org.apache.kafka.streams.processor.StateStore}s from their
* change logs
* changelogs
*/
public interface ChangelogReader {
/**
@@ -44,5 +45,15 @@ public interface ChangelogReader {
*/
Map<TopicPartition, Long> restoredOffsets();
void reset();
/**
* Removes the passed in partitions from the set of changelogs
* @param revokedPartitions the set of partitions to remove
*/
void remove(List<TopicPartition> revokedPartitions);
/**
* Clear all partitions
*/
void clear();
}
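
A hedged sketch of how a minimal implementation might satisfy the two new methods, in the spirit of a test double (the registered field is illustrative):

    private final Set<TopicPartition> registered = new HashSet<>();

    @Override
    public void remove(final List<TopicPartition> revokedPartitions) {
        // forget only the revoked partitions; everything else keeps restoring
        registered.removeAll(revokedPartitions);
    }

    @Override
    public void clear() {
        // full reset of all tracked changelog partitions
        registered.clear();
    }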

streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java (24)

@@ -79,9 +79,9 @@ public class StoreChangelogReader implements ChangelogReader {
initialize(active);
}
if (needsRestoring.isEmpty()) {
if (needsRestoring.isEmpty() || restoreConsumer.assignment().isEmpty()) {
restoreConsumer.unsubscribe();
return completed();
return completedRestorers;
}
try {
@@ -120,7 +120,7 @@ public class StoreChangelogReader implements ChangelogReader {
restoreConsumer.unsubscribe();
}
return completed();
return completedRestorers;
}
private void initialize(final RestoringTasks active) {
@@ -255,10 +255,6 @@ public class StoreChangelogReader implements ChangelogReader {
endOffset);
}
private Collection<TopicPartition> completed() {
return completedRestorers;
}
private void refreshChangelogInfo() {
try {
partitionInfo.putAll(restoreConsumer.listTopics());
@@ -280,7 +276,19 @@
}
@Override
public void reset() {
public void remove(final List<TopicPartition> revokedPartitions) {
for (final TopicPartition partition : revokedPartitions) {
partitionInfo.remove(partition.topic());
stateRestorers.remove(partition);
needsRestoring.remove(partition);
restoreToOffsets.remove(partition);
needsInitializing.remove(partition);
completedRestorers.remove(partition);
}
}
@Override
public void clear() {
partitionInfo.clear();
stateRestorers.clear();
needsRestoring.clear();

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (214)

@@ -73,43 +73,46 @@ public class StreamThread extends Thread {
* The expected state transitions with the following defined states is:
*
* <pre>
* +-------------+
* +<--- | Created (0) |
* | +-----+-------+
* | |
* | v
* | +-----+-------+
* +<--- | Starting (1)|
* | +-----+-------+
* | |
* | |
* | v
* | +-----+-------+
* +<--- | Partitions |
* | | Revoked (2) | <----+
* | +-----+-------+ |
* | | |
* | v |
* | +-----+-------+ |
* | | Partitions | |
* +<--- | Assigned (3)| ---->+
* | +-----+-------+ |
* | | |
* | v |
* | +-----+-------+ |
* | | Running (4) | ---->+
* | +-----+-------+
* | |
* | v
* | +-----+-------+
* +---> | Pending |
* | Shutdown (5)|
* +-----+-------+
* |
* v
* +-----+-------+
* | Dead (6) |
* +-------------+
* +-------------+
* +<---- | Created (0) |
* | +-----+-------+
* | |
* | v
* | +-----+-------+
* +<---- | Starting (1)|----->+
* | +-----+-------+ |
* | | |
* | | |
* | v |
* | +-----+-------+ |
* +<---- | Partitions | |
* | | Revoked (2) | <----+
* | +-----+-------+ |
* | | ^ |
* | | | |
* | v | |
* | +-----+-------+ |
* +<---- | Partitions | |
* | | Assigned (3)| <----+
* | +-----+-------+ |
* | | |
* | | |
* | v |
* | +-----+-------+ |
* | | Running (4) | ---->+
* | +-----+-------+
* | |
* | |
* | v
* | +-----+-------+
* +----> | Pending |
* | Shutdown (5)|
* +-----+-------+
* |
* v
* +-----+-------+
* | Dead (6) |
* +-------------+
* </pre>
*
* Note the following:
@@ -124,15 +127,20 @@ public class StreamThread extends Thread {
* State PARTITIONS_REVOKED may want transit to itself indefinitely, in the corner case when
* the coordinator repeatedly fails in-between revoking partitions and assigning new partitions.
* Also during streams instance start up PARTITIONS_REVOKED may want to transit to itself as well.
* In this case we will forbid the transition but will not treat as an error.
* In this case we will allow the transition but it will be a no-op as the set of revoked partitions
* should be empty.
* </li>
* </ul>
*/
public enum State implements ThreadStateTransitionValidator {
// TODO: the current transitions from other states directly to PARTITIONS_REVOKED is due to
// the fact that onPartitionsRevoked may not be triggered. we need to refactor the
// state diagram more thoroughly after we refactor StreamsPartitionAssignor to support COOPERATIVE
CREATED(1, 5), STARTING(2, 3, 5), PARTITIONS_REVOKED(3, 5), PARTITIONS_ASSIGNED(2, 3, 4, 5), RUNNING(2, 3, 5), PENDING_SHUTDOWN(6), DEAD;
CREATED(1, 5), // 0
STARTING(2, 3, 5), // 1
PARTITIONS_REVOKED(3, 5), // 2
PARTITIONS_ASSIGNED(2, 3, 4, 5), // 3
RUNNING(2, 3, 5), // 4
PENDING_SHUTDOWN(6), // 5
DEAD; // 6
private final Set<Integer> validTransitions = new HashSet<>();
@@ -206,12 +214,6 @@ public class StreamThread extends Thread {
// when the state is already in NOT_RUNNING, all its transitions
// will be refused but we do not throw exception here
return null;
} else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) {
log.debug("Ignoring request to transit from PARTITIONS_REVOKED to PARTITIONS_REVOKED: " +
"self transition is not allowed");
// when the state is already in PARTITIONS_REVOKED, its transition to itself will be
// refused but we do not throw exception here
return null;
} else if (!state.isValidTransition(newState)) {
log.error("Unexpected state transition from {} to {}", oldState, newState);
throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
@@ -245,104 +247,12 @@ public class StreamThread extends Thread {
}
}
static class RebalanceListener implements ConsumerRebalanceListener {
private final Time time;
private final TaskManager taskManager;
private final StreamThread streamThread;
private final Logger log;
RebalanceListener(final Time time,
final TaskManager taskManager,
final StreamThread streamThread,
final Logger log) {
this.time = time;
this.taskManager = taskManager;
this.streamThread = streamThread;
this.log = log;
}
@Override
public void onPartitionsAssigned(final Collection<TopicPartition> assignment) {
log.debug("Current state {}: assigned partitions {} at the end of consumer rebalance.\n" +
"\tcurrent suspended active tasks: {}\n" +
"\tcurrent suspended standby tasks: {}\n",
streamThread.state,
assignment,
taskManager.suspendedActiveTaskIds(),
taskManager.suspendedStandbyTaskIds());
if (streamThread.assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
log.error("Received error code {} - shutdown", streamThread.assignmentErrorCode.get());
streamThread.shutdown();
return;
}
final long start = time.milliseconds();
try {
if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
log.debug(
"Skipping task creation in rebalance because we are already in {} state.",
streamThread.state()
);
} else if (streamThread.assignmentErrorCode.get() != AssignorError.NONE.code()) {
log.debug(
"Encountered assignment error during partition assignment: {}. Skipping task initialization",
streamThread.assignmentErrorCode
);
} else {
log.debug("Creating tasks based on assignment.");
taskManager.createTasks(assignment);
}
} catch (final Throwable t) {
log.error(
"Error caught during partition assignment, " +
"will abort the current process and re-throw at the end of rebalance", t);
streamThread.setRebalanceException(t);
} finally {
log.info("partition assignment took {} ms.\n" +
"\tcurrent active tasks: {}\n" +
"\tcurrent standby tasks: {}\n" +
"\tprevious active tasks: {}\n",
time.milliseconds() - start,
taskManager.activeTaskIds(),
taskManager.standbyTaskIds(),
taskManager.prevActiveTaskIds());
}
}
int getAssignmentErrorCode() {
return assignmentErrorCode.get();
}
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> assignment) {
log.debug("Current state {}: revoked partitions {} at the beginning of consumer rebalance.\n" +
"\tcurrent assigned active tasks: {}\n" +
"\tcurrent assigned standby tasks: {}\n",
streamThread.state,
assignment,
taskManager.activeTaskIds(),
taskManager.standbyTaskIds());
if (streamThread.setState(State.PARTITIONS_REVOKED) != null) {
final long start = time.milliseconds();
try {
// suspend active tasks
taskManager.suspendTasksAndState();
} catch (final Throwable t) {
log.error(
"Error caught during partition revocation, " +
"will abort the current process and re-throw at the end of rebalance: {}",
t
);
streamThread.setRebalanceException(t);
} finally {
streamThread.clearStandbyRecords();
log.info("partition revocation took {} ms.\n" +
"\tsuspended active tasks: {}\n" +
"\tsuspended standby tasks: {}",
time.milliseconds() - start,
taskManager.suspendedActiveTaskIds(),
taskManager.suspendedStandbyTaskIds());
}
}
}
void setRebalanceException(final Throwable rebalanceException) {
this.rebalanceException = rebalanceException;
}
static abstract class AbstractTaskCreator<T extends Task> {
@@ -705,7 +615,7 @@ public class StreamThread extends Thread {
this.builder = builder;
this.logPrefix = logContext.logPrefix();
this.log = logContext.logger(StreamThread.class);
this.rebalanceListener = new RebalanceListener(time, taskManager, this, this.log);
this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log);
this.taskManager = taskManager;
this.producer = producer;
this.restoreConsumer = restoreConsumer;
@@ -784,10 +694,6 @@ public class StreamThread extends Thread {
}
}
private void setRebalanceException(final Throwable rebalanceException) {
this.rebalanceException = rebalanceException;
}
/**
* Main event loop for polling, and processing records through topologies.
*
@@ -1231,8 +1137,10 @@ public class StreamThread extends Thread {
log.info("Shutdown complete");
}
private void clearStandbyRecords() {
standbyRecords.clear();
void clearStandbyRecords(final List<TopicPartition> partitions) {
for (final TopicPartition tp : partitions) {
standbyRecords.remove(tp);
}
}
/**
@ -1339,7 +1247,7 @@ public class StreamThread extends Thread { @@ -1339,7 +1247,7 @@ public class StreamThread extends Thread {
}
public Map<MetricName, Metric> adminClientMetrics() {
final Map<MetricName, ? extends Metric> adminClientMetrics = taskManager.getAdminClient().metrics();
final Map<MetricName, ? extends Metric> adminClientMetrics = taskManager.adminClient().metrics();
final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
result.putAll(adminClientMetrics);
return result;

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java (39)

@@ -166,6 +166,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
private InternalTopicManager internalTopicManager;
private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
private RebalanceProtocol rebalanceProtocol;
protected String userEndPoint() {
return userEndPoint;
@@ -196,14 +197,24 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
userEndPoint = assignorConfiguration.getUserEndPoint();
internalTopicManager = assignorConfiguration.getInternalTopicManager();
copartitionedTopicsEnforcer = assignorConfiguration.getCopartitionedTopicsEnforcer();
rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
}
@Override
public String name() {
return "stream";
}
@Override
public List<RebalanceProtocol> supportedProtocols() {
final List<RebalanceProtocol> supportedProtocols = new ArrayList<>();
supportedProtocols.add(RebalanceProtocol.EAGER);
if (rebalanceProtocol == RebalanceProtocol.COOPERATIVE) {
supportedProtocols.add(rebalanceProtocol);
}
return supportedProtocols;
}
@Override
public ByteBuffer subscriptionUserData(final Set<String> topics) {
// Adds the following information to subscription
@@ -211,7 +222,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
// 2. Task ids of previously running tasks
// 3. Task ids of valid local states on the client's state directory.
final Set<TaskId> previousActiveTasks = taskManager.prevActiveTaskIds();
final Set<TaskId> previousActiveTasks = taskManager.previousRunningTaskIds();
final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
standbyTasks.removeAll(previousActiveTasks);
final SubscriptionInfo data = new SubscriptionInfo(
@@ -780,20 +791,22 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
final Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
switch (receivedAssignmentMetadataVersion) {
case VERSION_ONE:
processVersionOneAssignment(logPrefix, info, partitions, activeTasks);
processVersionOneAssignment(logPrefix, info, partitions, activeTasks, partitionsToTaskId);
partitionsByHost = Collections.emptyMap();
break;
case VERSION_TWO:
processVersionTwoAssignment(logPrefix, info, partitions, activeTasks, topicToPartitionInfo);
processVersionTwoAssignment(logPrefix, info, partitions, activeTasks, topicToPartitionInfo, partitionsToTaskId);
partitionsByHost = info.partitionsByHost();
break;
case VERSION_THREE:
case VERSION_FOUR:
case VERSION_FIVE:
upgradeSubscriptionVersionIfNeeded(leaderSupportedVersion);
processVersionTwoAssignment(logPrefix, info, partitions, activeTasks, topicToPartitionInfo);
processVersionTwoAssignment(logPrefix, info, partitions, activeTasks, topicToPartitionInfo, partitionsToTaskId);
partitionsByHost = info.partitionsByHost();
break;
default:
@@ -805,6 +818,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
taskManager.setPartitionsByHostState(partitionsByHost);
taskManager.setPartitionsToTaskId(partitionsToTaskId);
taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks());
taskManager.updateSubscriptionsFromAssignment(partitions);
}
@@ -812,7 +826,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
private static void processVersionOneAssignment(final String logPrefix,
final AssignmentInfo info,
final List<TopicPartition> partitions,
final Map<TaskId, Set<TopicPartition>> activeTasks) {
final Map<TaskId, Set<TopicPartition>> activeTasks,
final Map<TopicPartition, TaskId> partitionsToTaskId) {
// the number of assigned partitions should be the same as number of active tasks, which
// could be duplicated if one task has more than one assigned partitions
if (partitions.size() != info.activeTasks().size()) {
@@ -830,15 +845,17 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final TopicPartition partition = partitions.get(i);
final TaskId id = info.activeTasks().get(i);
activeTasks.computeIfAbsent(id, k -> new HashSet<>()).add(partition);
partitionsToTaskId.put(partition, id);
}
}
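The loop above relies on the encoding convention that the i-th assigned partition pairs with the i-th entry of info.activeTasks(), so a task owning several partitions simply appears several times. A hedged standalone example with hypothetical topic and task ids:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;

// Sketch of the index-pairing convention used to build partitionsToTaskId.
public class PartitionTaskPairingSketch {
    public static void main(final String[] args) {
        final List<TopicPartition> partitions = Arrays.asList(
            new TopicPartition("input", 0), new TopicPartition("input", 1));
        final List<TaskId> activeTasks = Arrays.asList(new TaskId(0, 0), new TaskId(0, 1));
        final Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            partitionsToTaskId.put(partitions.get(i), activeTasks.get(i));
        }
        System.out.println(partitionsToTaskId); // e.g. {input-0=0_0, input-1=0_1} (map order may vary)
    }
}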
public static void processVersionTwoAssignment(final String logPrefix,
final AssignmentInfo info,
final List<TopicPartition> partitions,
final Map<TaskId, Set<TopicPartition>> activeTasks,
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
processVersionOneAssignment(logPrefix, info, partitions, activeTasks);
final AssignmentInfo info,
final List<TopicPartition> partitions,
final Map<TaskId, Set<TopicPartition>> activeTasks,
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo,
final Map<TopicPartition, TaskId> partitionsToTaskId) {
processVersionOneAssignment(logPrefix, info, partitions, activeTasks, partitionsToTaskId);
// process partitions by host
final Map<HostInfo, Set<TopicPartition>> partitionsByHost = info.partitionsByHost();

169
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java

@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamThread.State;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.slf4j.Logger;
public class StreamsRebalanceListener implements ConsumerRebalanceListener {
private final Time time;
private final TaskManager taskManager;
private final StreamThread streamThread;
private final Logger log;
StreamsRebalanceListener(final Time time,
final TaskManager taskManager,
final StreamThread streamThread,
final Logger log) {
this.time = time;
this.taskManager = taskManager;
this.streamThread = streamThread;
this.log = log;
}
@Override
public void onPartitionsAssigned(final Collection<TopicPartition> assignedPartitions) {
log.debug("Current state {}: assigned partitions {} at the end of consumer rebalance.\n" +
"\tpreviously assigned active tasks: {}\n" +
"\tpreviously assigned standby tasks: {}\n",
streamThread.state(),
assignedPartitions,
taskManager.previousActiveTaskIds(),
taskManager.previousStandbyTaskIds());
if (streamThread.getAssignmentErrorCode() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
log.error("Received error code {} - shutdown", streamThread.getAssignmentErrorCode());
streamThread.shutdown();
return;
}
final long start = time.milliseconds();
List<TopicPartition> revokedStandbyPartitions = null;
try {
if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
log.debug(
"Skipping task creation in rebalance because we are already in {} state.",
streamThread.state()
);
} else if (streamThread.getAssignmentErrorCode() != AssignorError.NONE.code()) {
log.debug(
"Encountered assignment error during partition assignment: {}. Skipping task initialization",
streamThread.getAssignmentErrorCode()
);
} else {
// Close non-reassigned tasks before initializing new ones as we may have suspended active
// tasks that become standbys or vice versa
revokedStandbyPartitions = taskManager.closeRevokedStandbyTasks();
taskManager.closeRevokedSuspendedTasks();
taskManager.createTasks(assignedPartitions);
}
} catch (final Throwable t) {
log.error(
"Error caught during partition assignment, " +
"will abort the current process and re-throw at the end of rebalance", t);
streamThread.setRebalanceException(t);
} finally {
if (revokedStandbyPartitions != null) {
streamThread.clearStandbyRecords(revokedStandbyPartitions);
}
log.info("partition assignment took {} ms.\n" +
"\tcurrently assigned active tasks: {}\n" +
"\tcurrently assigned standby tasks: {}\n" +
"\trevoked active tasks: {}\n" +
"\trevoked standby tasks: {}\n",
time.milliseconds() - start,
taskManager.activeTaskIds(),
taskManager.standbyTaskIds(),
taskManager.revokedActiveTaskIds(),
taskManager.revokedStandbyTaskIds());
}
}
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> revokedPartitions) {
log.debug("Current state {}: revoked partitions {} because of consumer rebalance.\n" +
"\tcurrently assigned active tasks: {}\n" +
"\tcurrently assigned standby tasks: {}\n",
streamThread.state(),
revokedPartitions,
taskManager.activeTaskIds(),
taskManager.standbyTaskIds());
Set<TaskId> suspendedTasks = new HashSet<>();
if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !revokedPartitions.isEmpty()) {
final long start = time.milliseconds();
try {
// suspend only the active tasks, reassigned standby tasks will be closed in onPartitionsAssigned
suspendedTasks = taskManager.suspendActiveTasksAndState(revokedPartitions);
} catch (final Throwable t) {
log.error(
"Error caught during partition revocation, " +
"will abort the current process and re-throw at the end of rebalance: ",
t
);
streamThread.setRebalanceException(t);
} finally {
log.info("partition revocation took {} ms.\n" +
"\tcurrent suspended active tasks: {}\n",
time.milliseconds() - start,
suspendedTasks);
}
}
}
@Override
public void onPartitionsLost(final Collection<TopicPartition> lostPartitions) {
log.info("at state {}: partitions {} lost due to missed rebalance.\n" +
"\tlost active tasks: {}\n" +
"\tlost assigned standby tasks: {}\n",
streamThread.state(),
lostPartitions,
taskManager.activeTaskIds(),
taskManager.standbyTaskIds());
Set<TaskId> lostTasks = new HashSet<>();
final long start = time.milliseconds();
try {
// close lost active tasks but don't try to commit offsets as we no longer own them
lostTasks = taskManager.closeLostTasks(lostPartitions);
} catch (final Throwable t) {
log.error(
"Error caught during partitions lost, " +
"will abort the current process and re-throw at the end of rebalance: ",
t
);
streamThread.setRebalanceException(t);
} finally {
log.info("partitions lost took {} ms.\n" +
"\tsuspended lost active tasks: {}\n",
time.milliseconds() - start,
lostTasks);
}
}
}
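For orientation, here is a minimal sketch of how such a listener is attached to a plain consumer, outside of Streams; the broker address, group id, and topic are placeholders. Under the eager protocol every owned partition is revoked before the new assignment arrives, so onPartitionsRevoked always runs before onPartitionsAssigned, while onPartitionsLost fires instead of revocation when the member has already fallen out of the group.

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch: wiring a rebalance listener into subscribe(), analogous to what
// StreamThread does with StreamsRebalanceListener.
public class RebalanceListenerSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
                    System.out.println("revoked: " + partitions);   // suspend active tasks here
                }
                @Override
                public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
                    System.out.println("assigned: " + partitions);  // close non-reassigned, create new tasks here
                }
            });
            consumer.poll(java.time.Duration.ofMillis(100)); // joining the group triggers the callbacks
        }
    }
}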

297
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.util.ArrayList;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
@@ -60,10 +61,18 @@ public class TaskManager {
private final Admin adminClient;
private DeleteRecordsResult deleteRecordsResult;
// the restore consumer is only ever assigned changelogs from restoring tasks or standbys (but not both)
private boolean restoreConsumerAssignedStandbys = false;
// following information is updated during rebalance phase by the partition assignor
private Cluster cluster;
private Map<TaskId, Set<TopicPartition>> assignedActiveTasks;
private Map<TaskId, Set<TopicPartition>> assignedStandbyTasks;
private Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
private Map<TaskId, Set<TopicPartition>> assignedActiveTasks = new HashMap<>();
private Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = new HashMap<>();
private Map<TaskId, Set<TopicPartition>> addedActiveTasks = new HashMap<>();
private Map<TaskId, Set<TopicPartition>> addedStandbyTasks = new HashMap<>();
private Map<TaskId, Set<TopicPartition>> revokedActiveTasks = new HashMap<>();
private Map<TaskId, Set<TopicPartition>> revokedStandbyTasks = new HashMap<>();
private Consumer<byte[], byte[]> consumer;
@@ -103,78 +112,54 @@ public class TaskManager {
throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
}
// do this first as we may have suspended standby tasks that
// will become active or vice versa
standby.closeNonAssignedSuspendedTasks(assignedStandbyTasks);
active.closeNonAssignedSuspendedTasks(assignedActiveTasks);
if (!assignment.isEmpty() && !assignedActiveTasks.isEmpty()) {
resumeSuspended(assignment);
}
if (!addedActiveTasks.isEmpty()) {
addNewActiveTasks(addedActiveTasks);
}
if (!addedStandbyTasks.isEmpty()) {
addNewStandbyTasks(addedStandbyTasks);
}
// need to clear restore consumer if it was reading standbys but we have active tasks that may need restoring
if (!addedActiveTasks.isEmpty() && restoreConsumerAssignedStandbys) {
restoreConsumer.unsubscribe();
restoreConsumerAssignedStandbys = false;
}
addStreamTasks(assignment);
addStandbyTasks();
// Pause all the partitions until the underlying state store is ready for all the active tasks.
// Pause all the new partitions until the underlying state store is ready for all the active tasks.
log.trace("Pausing partitions: {}", assignment);
consumer.pause(assignment);
}
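The final pause(...) call above implements pause-until-restored: newly assigned partitions yield no records from poll() until their state stores are ready, and are resumed afterwards. A self-contained sketch of those consumer semantics, using MockConsumer in place of the real consumer:

import java.util.Collections;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

// Sketch: paused partitions are still assigned but fetch nothing until resumed.
public class PauseResumeSketch {
    public static void main(final String[] args) {
        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        final TopicPartition tp = new TopicPartition("input", 0);
        consumer.assign(Collections.singleton(tp));
        consumer.pause(Collections.singleton(tp));   // no records fetched while restoring
        System.out.println(consumer.paused());       // [input-0]
        consumer.resume(Collections.singleton(tp));  // state ready: resume processing
        System.out.println(consumer.paused());       // []
    }
}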
private void addStreamTasks(final Collection<TopicPartition> assignment) {
if (assignedActiveTasks == null || assignedActiveTasks.isEmpty()) {
return;
}
final Map<TaskId, Set<TopicPartition>> newTasks = new HashMap<>();
// collect newly assigned tasks and reopen re-assigned tasks
log.debug("Adding assigned tasks as active: {}", assignedActiveTasks);
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : assignedActiveTasks.entrySet()) {
final TaskId taskId = entry.getKey();
final Set<TopicPartition> partitions = entry.getValue();
private void resumeSuspended(final Collection<TopicPartition> assignment) {
final Set<TaskId> suspendedTasks = partitionsToTaskSet(assignment);
suspendedTasks.removeAll(addedActiveTasks.keySet());
if (assignment.containsAll(partitions)) {
try {
if (!active.maybeResumeSuspendedTask(taskId, partitions)) {
newTasks.put(taskId, partitions);
}
} catch (final StreamsException e) {
log.error("Failed to resume an active task {} due to the following error:", taskId, e);
throw e;
for (final TaskId taskId : suspendedTasks) {
final Set<TopicPartition> partitions = assignedActiveTasks.get(taskId);
try {
if (!active.maybeResumeSuspendedTask(taskId, partitions)) {
// recreate if resuming the suspended task failed because the associated partitions changed
addedActiveTasks.put(taskId, partitions);
}
} else {
log.warn("Task {} owned partitions {} are not contained in the assignment {}", taskId, partitions, assignment);
} catch (final StreamsException e) {
log.error("Failed to resume an active task {} due to the following error:", taskId, e);
throw e;
}
}
}
if (newTasks.isEmpty()) {
return;
}
// create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
// -> other thread will call removeSuspendedTasks(); eventually
log.debug("New active tasks to be created: {}", newTasks);
private void addNewActiveTasks(final Map<TaskId, Set<TopicPartition>> newActiveTasks) {
log.debug("New active tasks to be created: {}", newActiveTasks);
for (final StreamTask task : taskCreator.createTasks(consumer, newTasks)) {
for (final StreamTask task : taskCreator.createTasks(consumer, newActiveTasks)) {
active.addNewTask(task);
}
}
private void addStandbyTasks() {
if (assignedStandbyTasks == null || assignedStandbyTasks.isEmpty()) {
return;
}
log.debug("Adding assigned standby tasks {}", assignedStandbyTasks);
final Map<TaskId, Set<TopicPartition>> newStandbyTasks = new HashMap<>();
// collect newly assigned standby tasks and reopen re-assigned standby tasks
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : assignedStandbyTasks.entrySet()) {
final TaskId taskId = entry.getKey();
final Set<TopicPartition> partitions = entry.getValue();
if (!standby.maybeResumeSuspendedTask(taskId, partitions)) {
newStandbyTasks.put(taskId, partitions);
}
}
if (newStandbyTasks.isEmpty()) {
return;
}
// create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
// -> other thread will call removeSuspendedStandbyTasks(); eventually
private void addNewStandbyTasks(final Map<TaskId, Set<TopicPartition>> newStandbyTasks) {
log.trace("New standby tasks to be created: {}", newStandbyTasks);
for (final StandbyTask task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) {
@@ -182,18 +167,6 @@ public class TaskManager {
}
}
Set<TaskId> activeTaskIds() {
return active.allAssignedTaskIds();
}
Set<TaskId> standbyTaskIds() {
return standby.allAssignedTaskIds();
}
public Set<TaskId> prevActiveTaskIds() {
return active.previousTaskIds();
}
/**
* Returns ids of tasks whose states are kept on the local storage.
*/
@@ -224,47 +197,90 @@ public class TaskManager {
return tasks;
}
public UUID processId() {
return processId;
/**
* Closes standby tasks that were not reassigned at the end of a rebalance.
*
* @return list of changelog topic partitions from revoked tasks
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
List<TopicPartition> closeRevokedStandbyTasks() {
final List<TopicPartition> revokedChangelogs = standby.closeRevokedStandbyTasks(revokedStandbyTasks);
// If the restore consumer is assigned any standby partitions they must be removed
removeChangelogsFromRestoreConsumer(revokedChangelogs, true);
return revokedChangelogs;
}
InternalTopologyBuilder builder() {
return taskCreator.builder();
/**
* Closes suspended active tasks that were not reassigned at the end of a rebalance.
*
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
void closeRevokedSuspendedTasks() {
// changelogs should have already been removed during suspend
final RuntimeException exception = active.closeNotAssignedSuspendedTasks(revokedActiveTasks.keySet());
// At this point all revoked tasks should have been closed, we can just throw the exception
if (exception != null) {
throw exception;
}
}
/**
* Similar to shutdownTasksAndState, however does not close the task managers, in the hope that
* soon the tasks will be assigned again
* soon the tasks will be assigned again.
* @return list of suspended tasks
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
void suspendTasksAndState() {
log.debug("Suspending all active tasks {} and standby tasks {}", active.runningTaskIds(), standby.runningTaskIds());
Set<TaskId> suspendActiveTasksAndState(final Collection<TopicPartition> revokedPartitions) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
final List<TopicPartition> revokedChangelogs = new ArrayList<>();
firstException.compareAndSet(null, active.suspend());
// close all restoring tasks as well and then reset changelog reader;
// for those restoring and still assigned tasks, they will be re-created
// in addStreamTasks.
firstException.compareAndSet(null, active.closeAllRestoringTasks());
changelogReader.reset();
final Set<TaskId> revokedTasks = partitionsToTaskSet(revokedPartitions);
firstException.compareAndSet(null, standby.suspend());
firstException.compareAndSet(null, active.suspendOrCloseTasks(revokedTasks, revokedChangelogs));
// remove the changelog partitions from restore consumer
restoreConsumer.unsubscribe();
changelogReader.remove(revokedChangelogs);
removeChangelogsFromRestoreConsumer(revokedChangelogs, false);
final Exception exception = firstException.get();
if (exception != null) {
throw new StreamsException(logPrefix + "failed to suspend stream tasks", exception);
}
return active.suspendedTaskIds();
}
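Note how suspendActiveTasksAndState runs every cleanup step even if an earlier one failed, remembering only the first failure and rethrowing it at the end. A minimal standalone sketch of that first-exception-wins pattern, with hypothetical step names:

import java.util.concurrent.atomic.AtomicReference;

// Sketch: all cleanup steps run; only the first failure is rethrown at the end.
public class FirstExceptionSketch {
    public static void main(final String[] args) {
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        firstException.compareAndSet(null, runStep("suspend tasks", true));
        firstException.compareAndSet(null, runStep("remove changelogs", false)); // still runs after a failure
        if (firstException.get() != null) {
            throw firstException.get(); // rethrows "suspend tasks failed"
        }
    }

    private static RuntimeException runStep(final String name, final boolean fail) {
        System.out.println("running: " + name);
        return fail ? new RuntimeException(name + " failed") : null;
    }
}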
/**
* Closes active tasks as zombies, as these partitions have been lost and are no longer owned.
* @return list of lost tasks
*/
Set<TaskId> closeLostTasks(final Collection<TopicPartition> lostPartitions) {
final Set<TaskId> zombieTasks = partitionsToTaskSet(lostPartitions);
log.debug("Closing lost tasks as zombies: {}", zombieTasks);
final List<TopicPartition> lostTaskChangelogs = new ArrayList<>();
final RuntimeException exception = active.closeZombieTasks(zombieTasks, lostTaskChangelogs);
assignedActiveTasks.keySet().removeAll(zombieTasks);
changelogReader.remove(lostTaskChangelogs);
removeChangelogsFromRestoreConsumer(lostTaskChangelogs, false);
if (exception != null) {
throw exception;
} else if (!assignedActiveTasks.isEmpty()) {
throw new IllegalStateException("TaskManager had leftover tasks after removing all zombies");
}
return zombieTasks;
}
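Lost partitions differ from revoked ones: another instance already owns them, so the tasks are closed as zombies without committing offsets. A hedged illustration with MockProducer, which the tests below also use for fencing; the exact exception behavior shown here is an assumption of this sketch:

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.errors.ProducerFencedException;

// Sketch: once fenced by the new owner, a transactional producer must not
// commit; the zombie task is closed dirty instead.
public class ZombieCloseSketch {
    public static void main(final String[] args) {
        final MockProducer<byte[], byte[]> producer = new MockProducer<>();
        producer.initTransactions();
        producer.beginTransaction();
        producer.fenceProducer();              // simulates the new owner taking over
        try {
            producer.commitTransaction();      // committing offsets here would be unsafe
        } catch (final ProducerFencedException e) {
            producer.close();                  // close dirty: no commit, no clean shutdown
            System.out.println("fenced, closed as zombie");
        }
    }
}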
void shutdown(final boolean clean) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", active.runningTaskIds(), standby.runningTaskIds(),
active.previousTaskIds(), standby.previousTaskIds());
log.debug("Shutting down all active tasks {}, standby tasks {}, and suspended tasks {}", active.runningTaskIds(), standby.runningTaskIds(),
active.suspendedTaskIds());
try {
active.close(clean);
@@ -288,16 +304,38 @@ public class TaskManager {
}
}
Admin getAdminClient() {
return adminClient;
Set<TaskId> activeTaskIds() {
return active.allAssignedTaskIds();
}
Set<TaskId> suspendedActiveTaskIds() {
return active.previousTaskIds();
Set<TaskId> standbyTaskIds() {
return standby.allAssignedTaskIds();
}
Set<TaskId> suspendedStandbyTaskIds() {
return standby.previousTaskIds();
Set<TaskId> revokedActiveTaskIds() {
return revokedActiveTasks.keySet();
}
Set<TaskId> revokedStandbyTaskIds() {
return revokedStandbyTasks.keySet();
}
public Set<TaskId> previousRunningTaskIds() {
return active.previousRunningTaskIds();
}
Set<TaskId> previousActiveTaskIds() {
final HashSet<TaskId> previousActiveTasks = new HashSet<>(assignedActiveTasks.keySet());
previousActiveTasks.addAll(revokedActiveTasks.keySet());
previousActiveTasks.removeAll(addedActiveTasks.keySet());
return previousActiveTasks;
}
Set<TaskId> previousStandbyTaskIds() {
final HashSet<TaskId> previousStandbyTasks = new HashSet<>(assignedStandbyTasks.keySet());
previousStandbyTasks.addAll(revokedStandbyTasks.keySet());
previousStandbyTasks.removeAll(addedStandbyTasks.keySet());
return previousStandbyTasks;
}
StreamTask activeTask(final TopicPartition partition) {
@@ -320,6 +358,14 @@ public class TaskManager {
this.consumer = consumer;
}
public UUID processId() {
return processId;
}
InternalTopologyBuilder builder() {
return taskCreator.builder();
}
/**
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
@@ -329,8 +375,8 @@ public class TaskManager {
standby.initializeNewTasks();
final Collection<TopicPartition> restored = changelogReader.restore(active);
active.updateRestored(restored);
removeChangelogsFromRestoreConsumer(restored, false);
if (active.allTasksRunning()) {
final Set<TopicPartition> assignment = consumer.assignment();
@@ -357,6 +403,7 @@ public class TaskManager {
checkpointedOffsets.putAll(standbyTask.checkpointedOffsets());
}
restoreConsumerAssignedStandbys = true;
restoreConsumer.assign(checkpointedOffsets.keySet());
for (final Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
final TopicPartition partition = entry.getKey();
@@ -377,8 +424,40 @@ public class TaskManager {
this.streamsMetadataState.onChange(partitionsByHostState, cluster);
}
public void setPartitionsToTaskId(final Map<TopicPartition, TaskId> partitionsToTaskId) {
this.partitionsToTaskId = partitionsToTaskId;
}
public void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks) {
addedActiveTasks.clear();
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : activeTasks.entrySet()) {
if (!assignedActiveTasks.containsKey(entry.getKey())) {
addedActiveTasks.put(entry.getKey(), entry.getValue());
}
}
addedStandbyTasks.clear();
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
if (!assignedStandbyTasks.containsKey(entry.getKey())) {
addedStandbyTasks.put(entry.getKey(), entry.getValue());
}
}
revokedActiveTasks.clear();
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : assignedActiveTasks.entrySet()) {
if (!activeTasks.containsKey(entry.getKey())) {
revokedActiveTasks.put(entry.getKey(), entry.getValue());
}
}
revokedStandbyTasks.clear();
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : assignedStandbyTasks.entrySet()) {
if (!standbyTasks.containsKey(entry.getKey())) {
revokedStandbyTasks.put(entry.getKey(), entry.getValue());
}
}
this.assignedActiveTasks = activeTasks;
this.assignedStandbyTasks = standbyTasks;
}
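A hedged worked example of the added/revoked bookkeeping above, with plain strings standing in for TaskId and TopicPartition: if the previous assignment owned {t0, t1} and the new one is {t1, t2}, then t2 is added, t0 is revoked, and t1 carries over untouched.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch of setAssignmentMetadata's diffing between old and new assignments.
public class AssignmentDiffSketch {
    public static void main(final String[] args) {
        final Map<String, Set<String>> previous = new HashMap<>();
        previous.put("t0", Collections.singleton("input-0"));
        previous.put("t1", Collections.singleton("input-1"));
        final Map<String, Set<String>> current = new HashMap<>();
        current.put("t1", Collections.singleton("input-1"));
        current.put("t2", Collections.singleton("input-2"));

        final Map<String, Set<String>> added = new HashMap<>(current);
        added.keySet().removeAll(previous.keySet());       // in new but not old
        final Map<String, Set<String>> revoked = new HashMap<>(previous);
        revoked.keySet().removeAll(current.keySet());      // in old but not new

        System.out.println("added=" + added.keySet() + ", revoked=" + revoked.keySet());
        // prints: added=[t2], revoked=[t0]
    }
}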
@@ -482,6 +561,30 @@ public class TaskManager {
return builder.toString();
}
// this should be safe to call whether the restore consumer is assigned standby or active restoring partitions
// as the removal will be a no-op
private void removeChangelogsFromRestoreConsumer(final Collection<TopicPartition> changelogs, final boolean areStandbyPartitions) {
if (!changelogs.isEmpty() && areStandbyPartitions == restoreConsumerAssignedStandbys) {
final Set<TopicPartition> updatedAssignment = new HashSet<>(restoreConsumer.assignment());
updatedAssignment.removeAll(changelogs);
restoreConsumer.assign(updatedAssignment);
}
}
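Because the restore consumer uses manual assign() rather than subscribe(), removing changelogs means re-assigning the remaining set, which is what the helper above does. A standalone sketch with a MockConsumer and hypothetical changelog partitions:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

// Sketch: "removal" from a manually assigned consumer is a fresh assign()
// of the remaining partitions.
public class RestoreConsumerSketch {
    public static void main(final String[] args) {
        final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        final TopicPartition c0 = new TopicPartition("app-store-changelog", 0);
        final TopicPartition c1 = new TopicPartition("app-store-changelog", 1);
        restoreConsumer.assign(Arrays.asList(c0, c1));
        final Set<TopicPartition> updated = new HashSet<>(restoreConsumer.assignment());
        updated.remove(c1);                 // changelog of a revoked task
        restoreConsumer.assign(updated);    // re-assign what is left
        System.out.println(restoreConsumer.assignment()); // [app-store-changelog-0]
    }
}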
private Set<TaskId> partitionsToTaskSet(final Collection<TopicPartition> partitions) {
final Set<TaskId> taskIds = new HashSet<>();
for (final TopicPartition tp : partitions) {
final TaskId id = partitionsToTaskId.get(tp);
if (id != null) {
taskIds.add(id);
} else {
log.error("Failed to lookup taskId for partition {}", tp);
throw new StreamsException("Found partition in assignment with no corresponding task");
}
}
return taskIds;
}
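partitionsToTaskSet is the reverse lookup of the partitionsToTaskId map populated during assignment; a missing entry means the assignment metadata is corrupt, so it fails fast. A small standalone sketch of that contract, using plain strings and hypothetical names:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Sketch: every assigned partition must resolve to a task id, or we fail fast.
public class PartitionLookupSketch {
    public static void main(final String[] args) {
        final Map<String, String> partitionsToTaskId = new HashMap<>();
        partitionsToTaskId.put("input-0", "0_0");
        for (final String tp : Arrays.asList("input-0", "input-1")) {
            final String id = partitionsToTaskId.get(tp);
            if (id == null) {
                // mirrors the StreamsException thrown above
                throw new IllegalStateException("no task for partition " + tp);
            }
            System.out.println(tp + " -> " + id);
        }
    }
}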
// the following functions are for testing only
Map<TaskId, Set<TopicPartition>> assignedActiveTasks() {
return assignedActiveTasks;

24
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java

@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
@@ -131,6 +132,29 @@ public final class AssignorConfiguration {
return taskManager;
}
public RebalanceProtocol rebalanceProtocol() {
final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
if (upgradeFrom != null) {
switch (upgradeFrom) {
case StreamsConfig.UPGRADE_FROM_0100:
case StreamsConfig.UPGRADE_FROM_0101:
case StreamsConfig.UPGRADE_FROM_0102:
case StreamsConfig.UPGRADE_FROM_0110:
case StreamsConfig.UPGRADE_FROM_10:
case StreamsConfig.UPGRADE_FROM_11:
case StreamsConfig.UPGRADE_FROM_20:
case StreamsConfig.UPGRADE_FROM_21:
case StreamsConfig.UPGRADE_FROM_22:
case StreamsConfig.UPGRADE_FROM_23:
log.info("Turning off cooperative rebalancing for upgrade from {}.x", upgradeFrom);
return RebalanceProtocol.EAGER;
default:
throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
}
}
return RebalanceProtocol.EAGER;
}
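At this commit rebalanceProtocol() still returns EAGER on every path; the upgrade.from switch is groundwork so that, once cooperative rebalancing becomes the default, a rolling upgrade can pin the group to EAGER until every instance has been bounced. A hedged usage sketch with placeholder application id and broker:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Sketch: setting upgrade.from during the first rolling bounce keeps the
// group on the eager protocol; the config is removed in a second bounce.
public class UpgradeConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // placeholder app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23);
        System.out.println(new StreamsConfig(props).getString(StreamsConfig.UPGRADE_FROM_CONFIG)); // 2.3
    }
}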
public String logPrefix() {
return logPrefix;
}

4
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java

@@ -413,7 +413,9 @@ public class RegexSourceIntegrationTest {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
assignedTopics.clear();
for (final TopicPartition partition : partitions) {
assignedTopics.remove(partition.topic());
}
listener.onPartitionsRevoked(partitions);
}

36
streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java

@@ -26,8 +26,10 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -62,11 +64,15 @@ public class AssignedStreamsTasksTest {
private final TaskId taskId2 = new TaskId(1, 0);
private AssignedStreamsTasks assignedTasks;
private final List<TopicPartition> revokedChangelogs = new ArrayList<>();
@Before
public void before() {
assignedTasks = new AssignedStreamsTasks(new LogContext("log "));
EasyMock.expect(t1.id()).andReturn(taskId1).anyTimes();
EasyMock.expect(t2.id()).andReturn(taskId2).anyTimes();
revokedChangelogs.clear();
}
@Test
@@ -155,33 +161,35 @@ public class AssignedStreamsTasksTest {
assertThat(suspendTask(), nullValue());
assertThat(assignedTasks.previousTaskIds(), equalTo(Collections.singleton(taskId1)));
assertThat(assignedTasks.suspendedTaskIds(), equalTo(Collections.singleton(taskId1)));
EasyMock.verify(t1);
}
@Test
public void shouldCloseRestoringTasks() {
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet());
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).times(2);
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).times(2);
t1.closeStateManager(true);
EasyMock.expectLastCall();
EasyMock.replay(t1);
addAndInitTask();
assertThat(assignedTasks.closeAllRestoringTasks(), nullValue());
assertThat(assignedTasks.closeRestoringTasks(assignedTasks.restoringTaskIds(), revokedChangelogs), nullValue());
EasyMock.verify(t1);
}
@Test
public void shouldClosedUnInitializedTasksOnSuspend() {
EasyMock.expect(t1.changelogPartitions()).andAnswer(Collections::emptyList);
t1.close(false, false);
EasyMock.expectLastCall();
EasyMock.replay(t1);
assignedTasks.addNewTask(t1);
assertThat(assignedTasks.suspend(), nullValue());
assertThat(assignedTasks.suspendOrCloseTasks(assignedTasks.allAssignedTaskIds(), revokedChangelogs), nullValue());
EasyMock.verify(t1);
}
@@ -192,7 +200,7 @@ public class AssignedStreamsTasksTest {
EasyMock.replay(t1);
assertThat(suspendTask(), nullValue());
assertThat(assignedTasks.suspend(), nullValue());
assertThat(assignedTasks.suspendOrCloseTasks(assignedTasks.allAssignedTaskIds(), revokedChangelogs), nullValue());
EasyMock.verify(t1);
}
@@ -200,20 +208,28 @@
@Test
public void shouldCloseTaskOnSuspendWhenRuntimeException() {
mockTaskInitialization();
EasyMock.expect(t1.partitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
t1.suspend();
EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!"));
t1.close(false, false);
EasyMock.expectLastCall();
EasyMock.replay(t1);
assertThat(suspendTask(), not(nullValue()));
assertThat(assignedTasks.previousTaskIds(), equalTo(Collections.singleton(taskId1)));
assertTrue(assignedTasks.runningTaskIds().isEmpty());
assertTrue(assignedTasks.suspendedTaskIds().isEmpty());
EasyMock.verify(t1);
}
@Test
public void shouldCloseTaskOnSuspendIfTaskMigratedException() {
mockTaskInitialization();
EasyMock.expect(t1.partitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
t1.suspend();
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
t1.close(false, true);
@@ -221,7 +237,7 @@ public class AssignedStreamsTasksTest {
EasyMock.replay(t1);
assertThat(suspendTask(), nullValue());
assertTrue(assignedTasks.previousTaskIds().isEmpty());
assertTrue(assignedTasks.runningTaskIds().isEmpty());
EasyMock.verify(t1);
}
@@ -529,7 +545,7 @@ public class AssignedStreamsTasksTest {
assignedTasks.addNewTask(task);
assignedTasks.initializeNewTasks();
assertNull(assignedTasks.suspend());
assertNull(assignedTasks.suspendOrCloseTasks(assignedTasks.allAssignedTaskIds(), revokedChangelogs));
assignedTasks.close(true);
}
@@ -541,7 +557,7 @@ public class AssignedStreamsTasksTest {
private RuntimeException suspendTask() {
addAndInitTask();
return assignedTasks.suspend();
return assignedTasks.suspendOrCloseTasks(assignedTasks.allAssignedTaskIds(), revokedChangelogs);
}
private void mockRunningTaskSuspension() {

10
streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.util.List;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
@@ -48,10 +49,17 @@ public class MockChangelogReader implements ChangelogReader {
}
@Override
public void reset() {
public void clear() {
registered.clear();
}
@Override
public void remove(final List<TopicPartition> revokedPartitions) {
for (final TopicPartition partition : revokedPartitions) {
restoredOffsets.remove(partition);
}
}
public boolean wasRegistered(final TopicPartition partition) {
return registered.contains(partition);
}

66
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

@@ -384,12 +384,14 @@ public class StreamThreadTest {
thread.setState(StreamThread.State.STARTING);
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
final TaskId task1 = new TaskId(0, t1p1.partition());
final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
thread.taskManager().setAssignmentMetadata(
Collections.singletonMap(
new TaskId(0, t1p1.partition()),
task1,
assignedPartitions),
Collections.emptyMap());
thread.taskManager().setPartitionsToTaskId(Collections.singletonMap(t1p1, task1));
final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(Collections.singleton(t1p1));
@@ -584,13 +586,17 @@ public class StreamThreadTest {
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
final List<TopicPartition> assignedPartitions = new ArrayList<>();
final Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
// assign single partition
assignedPartitions.add(t1p1);
assignedPartitions.add(t1p2);
activeTasks.put(task1, Collections.singleton(t1p1));
activeTasks.put(task2, Collections.singleton(t1p2));
partitionsToTaskId.put(t1p1, task1);
partitionsToTaskId.put(t1p2, task2);
thread.taskManager().setPartitionsToTaskId(partitionsToTaskId);
thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
@@ -720,6 +726,7 @@ public class StreamThreadTest {
private void mockRunOnce(final boolean shutdownOnPoll) {
final Collection<TopicPartition> assignedPartitions = Collections.singletonList(t1p1);
final Map<TopicPartition, TaskId> partitionsToTaskId = Collections.singletonMap(t1p1, new TaskId(0, 1));
class MockStreamThreadConsumer<K, V> extends MockConsumer<K, V> {
private StreamThread streamThread;
@@ -734,6 +741,7 @@ public class StreamThreadTest {
if (shutdownOnPoll) {
streamThread.shutdown();
}
streamThread.taskManager().setPartitionsToTaskId(partitionsToTaskId);
streamThread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
return super.poll(timeout);
}
@@ -758,6 +766,7 @@ public class StreamThreadTest {
new AssignedStandbyTasks(new LogContext()));
taskManager.setConsumer(mockStreamThreadConsumer);
taskManager.setAssignmentMetadata(Collections.emptyMap(), Collections.emptyMap());
taskManager.setPartitionsToTaskId(Collections.emptyMap());
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, clientId, StreamsConfig.METRICS_LATEST);
@@ -849,15 +858,18 @@ public class StreamThreadTest {
consumer.updatePartitions(topic1, singletonList(new PartitionInfo(topic1, 1, null, null, null)));
thread.setState(StreamThread.State.STARTING);
thread.rebalanceListener.onPartitionsRevoked(null);
thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
final Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
final List<TopicPartition> assignedPartitions = new ArrayList<>();
// assign single partition
assignedPartitions.add(t1p1);
activeTasks.put(task1, Collections.singleton(t1p1));
partitionsToTaskId.put(t1p1, task1);
thread.taskManager().setPartitionsToTaskId(partitionsToTaskId);
thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
@@ -907,15 +919,18 @@ public class StreamThreadTest {
internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
thread.setState(StreamThread.State.STARTING);
thread.rebalanceListener.onPartitionsRevoked(null);
thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
final Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
final List<TopicPartition> assignedPartitions = new ArrayList<>();
// assign single partition
assignedPartitions.add(t1p1);
activeTasks.put(task1, Collections.singleton(t1p1));
partitionsToTaskId.put(t1p1, task1);
thread.taskManager().setPartitionsToTaskId(partitionsToTaskId);
thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
@@ -928,7 +943,7 @@ public class StreamThreadTest {
assertThat(thread.tasks().size(), equalTo(1));
clientSupplier.producers.get(0).fenceProducer();
thread.rebalanceListener.onPartitionsRevoked(null);
thread.rebalanceListener.onPartitionsRevoked(assignedPartitions);
assertTrue(clientSupplier.producers.get(0).transactionInFlight());
assertFalse(clientSupplier.producers.get(0).transactionCommitted());
assertTrue(clientSupplier.producers.get(0).closed());
@@ -943,15 +958,18 @@ public class StreamThreadTest {
internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
thread.setState(StreamThread.State.STARTING);
thread.rebalanceListener.onPartitionsRevoked(null);
thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
final Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
final List<TopicPartition> assignedPartitions = new ArrayList<>();
// assign single partition
assignedPartitions.add(t1p1);
activeTasks.put(task1, Collections.singleton(t1p1));
partitionsToTaskId.put(t1p1, task1);
thread.taskManager().setPartitionsToTaskId(partitionsToTaskId);
thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
@@ -964,7 +982,7 @@ public class StreamThreadTest {
assertThat(thread.tasks().size(), equalTo(1));
clientSupplier.producers.get(0).fenceProducerOnClose();
thread.rebalanceListener.onPartitionsRevoked(null);
thread.rebalanceListener.onPartitionsRevoked(assignedPartitions);
assertFalse(clientSupplier.producers.get(0).transactionInFlight());
assertTrue(clientSupplier.producers.get(0).transactionCommitted());
@@ -999,15 +1017,18 @@ public class StreamThreadTest {
final StreamThread thread = createStreamThread(clientId, config, false);
thread.setState(StreamThread.State.STARTING);
thread.rebalanceListener.onPartitionsRevoked(null);
thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
final List<TopicPartition> assignedPartitions = new ArrayList<>();
final Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
// assign single partition
assignedPartitions.add(t1p1);
activeTasks.put(task1, Collections.singleton(t1p1));
partitionsToTaskId.put(t1p1, task1);
thread.taskManager().setPartitionsToTaskId(partitionsToTaskId);
thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
@@ -1048,13 +1069,16 @@ public class StreamThreadTest {
restoreConsumer.updateBeginningOffsets(offsets);
thread.setState(StreamThread.State.STARTING);
thread.rebalanceListener.onPartitionsRevoked(null);
thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
final Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
// assign single partition
standbyTasks.put(task1, Collections.singleton(t1p1));
partitionsToTaskId.put(t1p1, task1);
thread.taskManager().setPartitionsToTaskId(partitionsToTaskId);
thread.taskManager().setAssignmentMetadata(Collections.emptyMap(), standbyTasks);
thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
@@ -1125,14 +1149,17 @@ public class StreamThreadTest {
}
thread.setState(StreamThread.State.STARTING);
thread.rebalanceListener.onPartitionsRevoked(null);
thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
final Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
// assign single partition
standbyTasks.put(task1, Collections.singleton(t1p1));
standbyTasks.put(task3, Collections.singleton(t2p1));
partitionsToTaskId.put(t1p1, task1);
thread.taskManager().setPartitionsToTaskId(partitionsToTaskId);
thread.taskManager().setAssignmentMetadata(Collections.emptyMap(), standbyTasks);
thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
@@ -1230,15 +1257,18 @@ public class StreamThreadTest {
final StreamThread thread = createStreamThread(clientId, config, false);
thread.setState(StreamThread.State.STARTING);
thread.rebalanceListener.onPartitionsRevoked(null);
thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
final List<TopicPartition> assignedPartitions = new ArrayList<>();
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
final Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
// assign single partition
assignedPartitions.add(t1p1);
activeTasks.put(task1, Collections.singleton(t1p1));
partitionsToTaskId.put(t1p1, task1);
thread.taskManager().setPartitionsToTaskId(partitionsToTaskId);
thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
clientSupplier.consumer.assign(assignedPartitions);
@@ -1357,7 +1387,9 @@ public class StreamThreadTest {
final Set<TopicPartition> topicPartitionSet = Collections.singleton(topicPartition);
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
activeTasks.put(new TaskId(0, 0), topicPartitionSet);
final TaskId task0 = new TaskId(0, 0);
activeTasks.put(task0, topicPartitionSet);
thread.taskManager().setPartitionsToTaskId(Collections.singletonMap(topicPartition, task0));
thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
mockConsumer.updatePartitions(
@@ -1463,11 +1495,11 @@ public class StreamThreadTest {
thread.setState(StreamThread.State.STARTING);
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
final TaskId task1 = new TaskId(0, t1p1.partition());
final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
thread.taskManager().setPartitionsToTaskId(Collections.singletonMap(t1p1, task1));
thread.taskManager().setAssignmentMetadata(
Collections.singletonMap(
new TaskId(0, t1p1.partition()),
assignedPartitions),
Collections.singletonMap(task1, assignedPartitions),
Collections.emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
@@ -1534,12 +1566,14 @@ public class StreamThreadTest {
thread.setState(StreamThread.State.STARTING);
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
final TaskId task1 = new TaskId(0, t1p1.partition());
final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
thread.taskManager().setAssignmentMetadata(
Collections.singletonMap(
new TaskId(0, t1p1.partition()),
task1,
assignedPartitions),
Collections.emptyMap());
thread.taskManager().setPartitionsToTaskId(Collections.singletonMap(t1p1, task1));
final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(Collections.singleton(t1p1));
@@ -1690,7 +1724,7 @@ public class StreamThreadTest {
null,
new MockTime());
EasyMock.expect(taskManager.getAdminClient()).andReturn(adminClient);
EasyMock.expect(taskManager.adminClient()).andReturn(adminClient);
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);

2
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java

@@ -152,7 +152,7 @@ public class StreamsPartitionAssignorTest {
taskManager = EasyMock.createNiceMock(TaskManager.class);
EasyMock.expect(taskManager.adminClient()).andReturn(null).anyTimes();
EasyMock.expect(taskManager.builder()).andReturn(builder).anyTimes();
EasyMock.expect(taskManager.prevActiveTaskIds()).andReturn(prevTasks).anyTimes();
EasyMock.expect(taskManager.previousRunningTaskIds()).andReturn(prevTasks).anyTimes();
EasyMock.expect(taskManager.cachedTasksIds()).andReturn(cachedTasks).anyTimes();
EasyMock.expect(taskManager.processId()).andReturn(processId).anyTimes();
}

114
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

@@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor.internals;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeletedRecords;
@@ -67,6 +69,7 @@ public class TaskManagerTest {
private final TopicPartition t1p0 = new TopicPartition("t1", 0);
private final Set<TopicPartition> taskId0Partitions = Utils.mkSet(t1p0);
private final Map<TaskId, Set<TopicPartition>> taskId0Assignment = Collections.singletonMap(taskId0, taskId0Partitions);
private final Map<TopicPartition, TaskId> taskId0PartitionToTaskId = Collections.singletonMap(t1p0, taskId0);
@Mock(type = MockType.STRICT)
private InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates;
@@ -113,6 +116,10 @@ public class TaskManagerTest {
private final TaskId task03 = new TaskId(0, 3);
private final TaskId task11 = new TaskId(1, 1);
private final Set<TaskId> revokedTasks = new HashSet<>();
private final List<TopicPartition> revokedPartitions = new ArrayList<>();
private final List<TopicPartition> revokedChangelogs = Collections.emptyList();
@Rule
public final TemporaryFolder testFolder = new TemporaryFolder();
@@ -129,6 +136,7 @@ public class TaskManagerTest {
active,
standby);
taskManager.setConsumer(consumer);
revokedChangelogs.clear();
}
private void replay() {
@@ -235,81 +243,77 @@ public class TaskManagerTest {
}
@Test
public void shouldCloseActiveUnAssignedSuspendedTasksWhenCreatingNewTasks() {
public void shouldCloseActiveUnAssignedSuspendedTasksWhenClosingRevokedTasks() {
mockSingleActiveTask();
active.closeNonAssignedSuspendedTasks(taskId0Assignment);
expectLastCall();
EasyMock.expect(active.closeNotAssignedSuspendedTasks(taskId0Assignment.keySet())).andReturn(null).once();
expect(restoreConsumer.assignment()).andReturn(Collections.emptySet());
replay();
taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
taskManager.createTasks(taskId0Partitions);
taskManager.setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), Collections.<TaskId, Set<TopicPartition>>emptyMap());
taskManager.closeRevokedSuspendedTasks();
verify(active);
}
@Test
public void shouldCloseStandbyUnAssignedSuspendedTasksWhenCreatingNewTasks() {
public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
mockSingleActiveTask();
standby.closeNonAssignedSuspendedTasks(taskId0Assignment);
expectLastCall();
EasyMock.expect(standby.closeRevokedStandbyTasks(taskId0Assignment)).andReturn(Collections.emptyList()).once();
replay();
taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
taskManager.setPartitionsToTaskId(taskId0PartitionToTaskId);
taskManager.createTasks(taskId0Partitions);
verify(active);
}
@Test
public void shouldAddNonResumedActiveTasks() {
public void shouldAddNonResumedSuspendedTasks() {
mockSingleActiveTask();
expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
active.addNewTask(EasyMock.same(streamTask));
replay();
// Need to call this twice so task manager doesn't consider all partitions "new"
taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
taskManager.setPartitionsToTaskId(taskId0PartitionToTaskId);
taskManager.createTasks(taskId0Partitions);
verify(activeTaskCreator, active);
}
@Test
public void shouldNotAddResumedActiveTasks() {
checkOrder(active, true);
expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
public void shouldAddNewActiveTasks() {
mockSingleActiveTask();
active.addNewTask(EasyMock.same(streamTask));
replay();
taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
taskManager.setPartitionsToTaskId(taskId0PartitionToTaskId);
taskManager.createTasks(taskId0Partitions);
// should be no calls to activeTaskCreator and no calls to active.addNewTasks(..)
verify(active, activeTaskCreator);
}
@Test
public void shouldAddNonResumedStandbyTasks() {
mockStandbyTaskExpectations();
expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
standby.addNewTask(EasyMock.same(standbyTask));
replay();
taskManager.setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), taskId0Assignment);
taskManager.createTasks(taskId0Partitions);
verify(standbyTaskCreator, active);
verify(activeTaskCreator, active);
}
@Test
public void shouldNotAddResumedStandbyTasks() {
public void shouldNotAddResumedActiveTasks() {
checkOrder(active, true);
expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
replay();
taskManager.setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), taskId0Assignment);
// Need to call this twice so task manager doesn't consider all partitions "new"
taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
taskManager.setPartitionsToTaskId(taskId0PartitionToTaskId);
taskManager.createTasks(taskId0Partitions);
// should be no calls to standbyTaskCreator and no calls to standby.addNewTasks(..)
verify(standby, standbyTaskCreator);
// should be no calls to activeTaskCreator and no calls to active.addNewTasks(..)
verify(active, activeTaskCreator);
}
@Test
@@ -320,48 +324,46 @@ public class TaskManagerTest {
replay();
taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
taskManager.setPartitionsToTaskId(taskId0PartitionToTaskId);
taskManager.createTasks(taskId0Partitions);
verify(consumer);
}
@Test
public void shouldSuspendActiveTasks() {
expect(active.suspend()).andReturn(null);
expect(active.suspendOrCloseTasks(revokedTasks, revokedChangelogs)).andReturn(null);
expect(restoreConsumer.assignment()).andReturn(Collections.emptySet());
replay();
taskManager.suspendTasksAndState();
taskManager.suspendActiveTasksAndState(revokedPartitions);
verify(active);
}
@Test
public void shouldSuspendStandbyTasks() {
expect(standby.suspend()).andReturn(null);
replay();
taskManager.suspendTasksAndState();
verify(standby);
}
@Test
@SuppressWarnings("unchecked")
public void shouldUnassignChangelogPartitionsOnSuspend() {
restoreConsumer.unsubscribe();
expect(active.suspendOrCloseTasks(revokedTasks, new ArrayList<>()))
.andAnswer(() -> {
((List) EasyMock.getCurrentArguments()[1]).add(t1p0);
return null;
});
expect(restoreConsumer.assignment()).andReturn(Collections.singleton(t1p0));
restoreConsumer.assign(Collections.emptySet());
expectLastCall();
replay();
taskManager.suspendTasksAndState();
taskManager.suspendActiveTasksAndState(Collections.emptySet());
verify(restoreConsumer);
}
@Test
public void shouldThrowStreamsExceptionAtEndIfExceptionDuringSuspend() {
expect(active.suspend()).andReturn(new RuntimeException(""));
expect(standby.suspend()).andReturn(new RuntimeException(""));
expectLastCall();
restoreConsumer.unsubscribe();
expect(active.suspendOrCloseTasks(revokedTasks, revokedChangelogs)).andReturn(new RuntimeException(""));
replay();
try {
taskManager.suspendTasksAndState();
taskManager.suspendActiveTasksAndState(revokedPartitions);
fail("Should have thrown streams exception");
} catch (final StreamsException e) {
// expected
@@ -401,6 +403,8 @@ public class TaskManagerTest {
@Test
public void shouldInitializeNewActiveTasks() {
EasyMock.expect(restoreConsumer.assignment()).andReturn(Collections.emptySet()).once();
EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions).once();
active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
expectLastCall();
replay();
@@ -411,6 +415,8 @@ public class TaskManagerTest {
@Test
public void shouldInitializeNewStandbyTasks() {
EasyMock.expect(restoreConsumer.assignment()).andReturn(Collections.emptySet()).once();
EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions).once();
active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
expectLastCall();
replay();
@@ -421,6 +427,7 @@ public class TaskManagerTest {
@Test
public void shouldRestoreStateFromChangeLogReader() {
EasyMock.expect(restoreConsumer.assignment()).andReturn(taskId0Partitions).once();
expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
active.updateRestored(taskId0Partitions);
expectLastCall();
@@ -432,6 +439,7 @@ public class TaskManagerTest {
@Test
public void shouldResumeRestoredPartitions() {
EasyMock.expect(restoreConsumer.assignment()).andReturn(taskId0Partitions).once();
expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
expect(active.allTasksRunning()).andReturn(true);
expect(consumer.assignment()).andReturn(taskId0Partitions);
@@ -475,6 +483,7 @@ public class TaskManagerTest {
@Test
public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
expect(active.allTasksRunning()).andReturn(false);
EasyMock.expect(changeLogReader.restore(active)).andReturn(Collections.emptySet()).once();
replay();
assertFalse(taskManager.updateNewAndRestoringTasks());
@@ -623,10 +632,12 @@ public class TaskManagerTest {
@Test
public void shouldNotResumeConsumptionUntilAllStoresRestored() {
EasyMock.expect(changeLogReader.restore(active)).andReturn(Collections.emptySet()).once();
expect(active.allTasksRunning()).andReturn(false);
final Consumer<byte[], byte[]> consumer = EasyMock.createStrictMock(Consumer.class);
taskManager.setConsumer(consumer);
EasyMock.replay(active, consumer);
EasyMock.replay(active, consumer, changeLogReader);
// shouldn't invoke `resume` method in consumer
taskManager.updateNewAndRestoringTasks();
@@ -662,6 +673,9 @@ public class TaskManagerTest {
expectLastCall();
EasyMock.replay(task);
EasyMock.expect(restoreConsumer.assignment()).andReturn(taskId0Partitions).once();
EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions).once();
}
private void mockStandbyTaskExpectations() {

7
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@@ -124,7 +124,7 @@ public class StreamsUpgradeTest {
// 3. Task ids of valid local states on the client's state directory.
final TaskManager taskManager = taskManger();
final Set<TaskId> previousActiveTasks = taskManager.prevActiveTaskIds();
final Set<TaskId> previousActiveTasks = taskManager.previousRunningTaskIds();
final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
standbyTasks.removeAll(previousActiveTasks);
final FutureSubscriptionInfo data = new FutureSubscriptionInfo(
@@ -176,12 +176,15 @@ public class StreamsUpgradeTest {
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
processVersionTwoAssignment("test ", info, partitions, activeTasks, topicToPartitionInfo);
final Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
processVersionTwoAssignment("test ", info, partitions, activeTasks, topicToPartitionInfo, partitionsToTaskId);
partitionsByHost = info.partitionsByHost();
final TaskManager taskManager = taskManger();
taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
taskManager.setPartitionsByHostState(partitionsByHost);
taskManager.setPartitionsToTaskId(partitionsToTaskId);
taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks());
taskManager.updateSubscriptionsFromAssignment(partitions);
}
