Browse Source

KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll (#8682)

As stated, we couldn't wait for handleRebalanceComplete in the case of handleLostAll, as we already closed the active task as dirty, and could potentially require its offset in the next thread.runOnce call.

Co-authored-by: Guozhang Wang <wangguoz@gmail.com>

Reviewers: A. Sophie Blee-Goldman <ableegoldman@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
pull/8651/head
Boyang Chen 5 years ago committed by GitHub
parent
commit
76e0233c93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
  2. 20
      streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

5
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@ -464,6 +464,7 @@ public class TaskManager { @@ -464,6 +464,7 @@ public class TaskManager {
if (task.isActive()) {
closeTaskDirty(task);
iterator.remove();
try {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
} catch (final RuntimeException e) {
@ -516,6 +517,10 @@ public class TaskManager { @@ -516,6 +517,10 @@ public class TaskManager {
* assigned the task as a result of the rebalance). This method should be idempotent.
*/
private void tryToLockAllNonEmptyTaskDirectories() {
// Always clear the set at the beginning as we're always dealing with the
// current set of actually-locked tasks.
lockedTaskDirectories.clear();
for (final File dir : stateDirectory.listNonEmptyTaskDirectories()) {
try {
final TaskId id = TaskId.parse(dir.getName());

20
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

@ -446,7 +446,7 @@ public class TaskManagerTest { @@ -446,7 +446,7 @@ public class TaskManagerTest {
}
@Test
public void shouldCloseActiveTasksWhenHandlingLostTasks() {
public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception {
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true);
final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, false);
@ -457,6 +457,17 @@ public class TaskManagerTest { @@ -457,6 +457,17 @@ public class TaskManagerTest {
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
makeTaskFolders(taskId00.toString(), taskId01.toString());
expectLockObtainedFor(taskId00, taskId01);
// The second attempt will return empty tasks.
makeTaskFolders();
expectLockObtainedFor();
replay(stateDirectory);
taskManager.handleRebalanceStart(emptySet());
assertThat(taskManager.lockedTaskDirectories(), Matchers.is(mkSet(taskId00, taskId01)));
// `handleLostAll`
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
expectLastCall();
@ -473,6 +484,13 @@ public class TaskManagerTest { @@ -473,6 +484,13 @@ public class TaskManagerTest {
assertThat(task01.state(), is(Task.State.RUNNING));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), is(singletonMap(taskId01, task01)));
// The locked task map will not be cleared.
assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01)));
taskManager.handleRebalanceStart(emptySet());
assertThat(taskManager.lockedTaskDirectories(), is(emptySet()));
}
@Test

Loading…
Cancel
Save