diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java index 63dac25a9d0..161714e34cf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java @@ -281,7 +281,7 @@ class AssignedStreamsTasks extends AssignedTasks implements Restorin firstException.compareAndSet(null, closeNonRunning(true, created.get(id), lostTaskChangelogs)); } else if (restoring.containsKey(id)) { log.debug("Closing the zombie restoring stream task {}.", id); - firstException.compareAndSet(null, closeRestoring(true, created.get(id), lostTaskChangelogs)); + firstException.compareAndSet(null, closeRestoring(true, restoring.get(id), lostTaskChangelogs)); } else if (running.containsKey(id)) { log.debug("Closing the zombie running stream task {}.", id); firstException.compareAndSet(null, closeRunning(true, running.get(id), lostTaskChangelogs)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java index 68ca9bd186c..42dc58ba628 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java @@ -41,6 +41,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import org.junit.function.ThrowingRunnable; @@ -49,6 +50,7 @@ import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; @@ -517,6 +519,147 @@ public class AssignedStreamsTasksTest { assignedTasks.shutdown(true); } + @Test + public void shouldClearZombieCreatedTasks() { + new TaskTestSuite() { + @Override + public void additionalSetup(final StreamTask task) { + task.close(false, true); + } + + @Override + public void action(final StreamTask task) { + assignedTasks.addNewTask(task); + } + + @Override + public Set taskIds() { + return assignedTasks.created.keySet(); + } + + @Override + public List expectedLostChangelogs() { + return clearingPartitions; + } + }.createTaskAndClear(); + } + + @Test + public void shouldClearZombieRestoringTasks() { + new TaskTestSuite() { + @Override + public void additionalSetup(final StreamTask task) { + EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes(); + task.closeStateManager(false); + } + + @Override + public void action(final StreamTask task) { + assignedTasks.addTaskToRestoring(task); + } + + @Override + public Set taskIds() { + return assignedTasks.restoringTaskIds(); + } + + @Override + public List expectedLostChangelogs() { + return clearingPartitions; + } + }.createTaskAndClear(); + } + + @Test + public void shouldClearZombieRunningTasks() { + new TaskTestSuite() { + @Override + public void additionalSetup(final StreamTask task) { + task.initializeTopology(); + EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes(); + task.close(false, true); + } + + @Override + public void action(final StreamTask task) { + assignedTasks.transitionToRunning(task); + } + + @Override + public Set taskIds() { + return assignedTasks.runningTaskIds(); + } + + @Override + public List expectedLostChangelogs() { + return clearingPartitions; + } + }.createTaskAndClear(); + } + + @Test + public void shouldClearZombieSuspendedTasks() { + new TaskTestSuite() { + @Override + public void additionalSetup(final StreamTask task) { + task.initializeTopology(); + EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes(); + task.suspend(); + task.closeSuspended(false, null); + } + + @Override + public void action(final StreamTask task) { + assignedTasks.transitionToRunning(task); + final List revokedChangelogs = new ArrayList<>(); + final List ids = Collections.singletonList(task.id()); + assignedTasks.suspendOrCloseTasks(new HashSet<>(ids), revokedChangelogs); + assertEquals(clearingPartitions, revokedChangelogs); + } + + @Override + public Set taskIds() { + return assignedTasks.suspendedTaskIds(); + } + + @Override + public List expectedLostChangelogs() { + return Collections.emptyList(); + } + }.createTaskAndClear(); + } + + abstract class TaskTestSuite { + + TaskId clearingTaskId = new TaskId(0, 0); + List clearingPartitions = Collections.singletonList(new TopicPartition("topic", 0)); + + abstract void additionalSetup(final StreamTask task); + + abstract void action(final StreamTask task); + + abstract Set taskIds(); + + abstract List expectedLostChangelogs(); + + void createTaskAndClear() { + final StreamTask task = EasyMock.createMock(StreamTask.class); + EasyMock.expect(task.id()).andReturn(clearingTaskId).anyTimes(); + EasyMock.expect(task.changelogPartitions()).andReturn(clearingPartitions).anyTimes(); + additionalSetup(task); + EasyMock.replay(task); + + action(task); + final List changelogs = new ArrayList<>(); + final Set ids = new HashSet<>(Collections.singleton(task.id())); + assertEquals(ids, taskIds()); + + assignedTasks.closeZombieTasks(ids, changelogs); + assertEquals(Collections.emptySet(), taskIds()); + assertEquals(expectedLostChangelogs(), changelogs); + } + } + private void addAndInitTask() { assignedTasks.addNewTask(t1); assignedTasks.initializeNewTasks();