From 76e0233c93db0301832a7d22ddbcb58f05319f30 Mon Sep 17 00:00:00 2001 From: Boyang Chen Date: Tue, 19 May 2020 14:00:13 -0700 Subject: [PATCH] KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll (#8682) As stated, we couldn't wait for handleRebalanceComplete in the case of handleLostAll, as we already closed the active task as dirty, and could potentially require its offset in the next thread.runOnce call. Co-authored-by: Guozhang Wang Reviewers: A. Sophie Blee-Goldman , Guozhang Wang --- .../processor/internals/TaskManager.java | 5 +++++ .../processor/internals/TaskManagerTest.java | 20 ++++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 28945f784a4..c697dc1d97f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -464,6 +464,7 @@ public class TaskManager { if (task.isActive()) { closeTaskDirty(task); iterator.remove(); + try { activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()); } catch (final RuntimeException e) { @@ -516,6 +517,10 @@ public class TaskManager { * assigned the task as a result of the rebalance). This method should be idempotent. */ private void tryToLockAllNonEmptyTaskDirectories() { + // Always clear the set at the beginning as we're always dealing with the + // current set of actually-locked tasks. + lockedTaskDirectories.clear(); + for (final File dir : stateDirectory.listNonEmptyTaskDirectories()) { try { final TaskId id = TaskId.parse(dir.getName()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 941636e7bd1..f8c7deea18f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -446,7 +446,7 @@ public class TaskManagerTest { } @Test - public void shouldCloseActiveTasksWhenHandlingLostTasks() { + public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception { final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true); final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, false); @@ -457,6 +457,17 @@ public class TaskManagerTest { topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString()); expectLastCall().anyTimes(); + makeTaskFolders(taskId00.toString(), taskId01.toString()); + expectLockObtainedFor(taskId00, taskId01); + + // The second attempt will return empty tasks. + makeTaskFolders(); + expectLockObtainedFor(); + replay(stateDirectory); + + taskManager.handleRebalanceStart(emptySet()); + assertThat(taskManager.lockedTaskDirectories(), Matchers.is(mkSet(taskId00, taskId01))); + // `handleLostAll` activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00); expectLastCall(); @@ -473,6 +484,13 @@ public class TaskManagerTest { assertThat(task01.state(), is(Task.State.RUNNING)); assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), is(singletonMap(taskId01, task01))); + + // The locked task map will not be cleared. + assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01))); + + taskManager.handleRebalanceStart(emptySet()); + + assertThat(taskManager.lockedTaskDirectories(), is(emptySet())); } @Test