From d61b0c131ced14cd3a006ef4ccff39f6d9ad51e8 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Fri, 1 Nov 2019 16:10:43 -0700 Subject: [PATCH] KAFKA-8972 (2.4 blocker): TaskManager state should always be updated after rebalance (#7620) Currently when we identify version probing we return early from onAssignment and never get to updating the TaskManager and general state with the new assignment. Since we do actually give out "real" assignments even during version probing, a StreamThread should take real ownership of its tasks/partitions including cleaning them up in onPartitionsRevoked which gets invoked when we call onLeavePrepare as part of triggering the follow-up rebalance. Every member will always get an assignment encoded with the lowest common version, so there should be no problem decoding a VP assignment. We should just allow onAssignment to proceed as usual so that the TaskManager is in a consistent state, and knows what all its tasks/partitions are when the first rebalance completes and the next one is triggered. Reviewers: Boyang Chen , Matthias J. Sax , Guozhang Wang --- .../consumer/internals/ConsumerCoordinator.java | 6 ++++++ .../processor/internals/AssignedStandbyTasks.java | 4 ++-- .../processor/internals/AssignedStreamsTasks.java | 4 ++-- .../internals/StreamsPartitionAssignor.java | 1 - .../internals/StreamsRebalanceListener.java | 10 +--------- .../streams/processor/internals/TaskManager.java | 13 +++++++------ .../kafka/streams/tests/StreamsUpgradeTest.java | 1 - 7 files changed, 18 insertions(+), 21 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index d5b306167c9..61bd48a624a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -692,6 +692,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @Override public void onLeavePrepare() { + // Save the current Generation and use that to get the memberId, as the hb thread can change it at any time + final Generation currentGeneration = generation(); + final String memberId = currentGeneration.memberId; + + log.debug("Executing onLeavePrepare with generation {} and memberId {}", currentGeneration, memberId); + // we should reset assignment and trigger the callback before leaving group Set droppedPartitions = new HashSet<>(subscriptions.assignedPartitions()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java index f217a555338..0f8896eea79 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java @@ -33,10 +33,10 @@ class AssignedStandbyTasks extends AssignedTasks { @Override public void shutdown(final boolean clean) { final String shutdownType = clean ? "Clean" : "Unclean"; - log.debug(shutdownType + " shutdown of all standby tasks" + "\n" + + log.debug("{} shutdown of all standby tasks" + "\n" + "non-initialized standby tasks to close: {}" + "\n" + "running standby tasks to close: {}", - clean, created.keySet(), running.keySet()); + shutdownType, created.keySet(), running.keySet()); super.shutdown(clean); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java index 161714e34cf..1400d5a13cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java @@ -494,12 +494,12 @@ class AssignedStreamsTasks extends AssignedTasks implements Restorin @Override public void shutdown(final boolean clean) { final String shutdownType = clean ? "Clean" : "Unclean"; - log.debug(shutdownType + " shutdown of all active tasks" + "\n" + + log.debug("{} shutdown of all active tasks" + "\n" + "non-initialized stream tasks to close: {}" + "\n" + "restoring tasks to close: {}" + "\n" + "running stream tasks to close: {}" + "\n" + "suspended stream tasks to close: {}", - clean, created.keySet(), restoring.keySet(), running.keySet(), suspended.keySet()); + shutdownType, created.keySet(), restoring.keySet(), running.keySet(), suspended.keySet()); super.shutdown(clean); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 72b675794fa..ecc7c507b3e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -1110,7 +1110,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf // Check if this was a version probing rebalance and check the error code to trigger another rebalance if so if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) { setAssignmentErrorCode(AssignorError.VERSION_PROBING.code()); - return; } // version 1 field diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java index f2c75b203c0..a4f1f6a4a90 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java @@ -68,15 +68,7 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener { if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) { log.debug( "Skipping task creation in rebalance because we are already in {} state.", - streamThread.state() - ); - } else if (streamThread.getAssignmentErrorCode() != AssignorError.NONE.code()) { - log.debug( - "Encountered assignment error during partition assignment: {}. Skipping task initialization and " - + "pausing any partitions we may have been assigned.", - streamThread.getAssignmentErrorCode() - ); - taskManager.pausePartitions(); + streamThread.state()); } else { // Close non-reassigned tasks before initializing new ones as we may have suspended active // tasks that become standbys or vice versa diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 72cff77febc..4d6dd4df34f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -467,21 +467,22 @@ public class TaskManager { } log.debug("Assigning metadata with: " + - "\tactiveTasks: {},\n" + - "\tstandbyTasks: {}\n" + - "The updated active task states are: \n" + + "\tpreviousAssignedActiveTasks: {},\n" + + "\tpreviousAssignedStandbyTasks: {}\n" + + "The updated task states are: \n" + "\tassignedActiveTasks {},\n" + "\tassignedStandbyTasks {},\n" + "\taddedActiveTasks {},\n" + "\taddedStandbyTasks {},\n" + "\trevokedActiveTasks {},\n" + "\trevokedStandbyTasks {}", - activeTasks, standbyTasks, assignedActiveTasks, assignedStandbyTasks, + activeTasks, standbyTasks, addedActiveTasks, addedStandbyTasks, revokedActiveTasks, revokedStandbyTasks); - this.assignedActiveTasks = activeTasks; - this.assignedStandbyTasks = standbyTasks; + + assignedActiveTasks = activeTasks; + assignedStandbyTasks = standbyTasks; } public void updateSubscriptionsFromAssignment(final List partitions) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 42b1c7c2bc0..261dc3d2d91 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -195,7 +195,6 @@ public class StreamsUpgradeTest { if (maybeUpdateSubscriptionVersion(usedVersion, info.commonlySupportedVersion())) { setAssignmentErrorCode(AssignorError.VERSION_PROBING.code()); usedSubscriptionMetadataVersionPeek.set(usedSubscriptionMetadataVersion); - return; } final List partitions = new ArrayList<>(assignment.partitions());