diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 8eb024cc670..1838eb7e2aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -243,7 +243,6 @@ public abstract class AbstractTask implements Task {
      * @param writeCheckpoint boolean indicating if a checkpoint file should be written
      * @throws ProcessorStateException if there is an error while closing the state manager
      */
-    // visible for testing
     void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
         ProcessorStateException exception = null;
         log.trace("Closing state manager");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 697482cebe8..f0019ec9672 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -30,7 +30,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
 
 class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks {
     private final Map<TaskId, StreamTask> restoring = new HashMap<>();
@@ -46,6 +45,52 @@ class AssignedStreamsTasks extends AssignedTasks implements Restorin
         return restoringByPartition.get(partition);
     }
 
+    @Override
+    List<StreamTask> allTasks() {
+        final List<StreamTask> tasks = super.allTasks();
+        tasks.addAll(restoring.values());
+        return tasks;
+    }
+
+    @Override
+    Set<TaskId> allAssignedTaskIds() {
+        final Set<TaskId> taskIds = super.allAssignedTaskIds();
+        taskIds.addAll(restoring.keySet());
+        return taskIds;
+    }
+
+    @Override
+    boolean allTasksRunning() {
+        return super.allTasksRunning() && restoring.isEmpty();
+    }
+
+    RuntimeException closeAllRestoringTasks() {
+        RuntimeException exception = null;
+
+        log.trace("Closing all restoring stream tasks {}", restoring.keySet());
+        final Iterator<StreamTask> restoringTaskIterator = restoring.values().iterator();
+        while (restoringTaskIterator.hasNext()) {
+            final StreamTask task = restoringTaskIterator.next();
+            log.debug("Closing restoring task {}", task.id());
+            try {
+                task.closeStateManager(true);
+            } catch (final RuntimeException e) {
+                log.error("Failed to remove restoring task {} due to the following error:", task.id(), e);
+                if (exception == null) {
+                    exception = e;
+                }
+            } finally {
+                restoringTaskIterator.remove();
+            }
+        }
+
+        restoring.clear();
+        restoredPartitions.clear();
+        restoringByPartition.clear();
+
+        return exception;
+    }
+
     void updateRestored(final Collection<TopicPartition> restored) {
         if (restored.isEmpty()) {
             return;
@@ -86,20 +131,6 @@ class AssignedStreamsTasks extends AssignedTasks implements Restorin
         }
     }
 
-    @Override
-    boolean allTasksRunning() {
-        return super.allTasksRunning() && restoring.isEmpty();
-    }
-
-    RuntimeException suspend() {
-        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(super.suspend());
-        log.trace("Close restoring stream task {}", restoring.keySet());
-        firstException.compareAndSet(null, closeNonRunningTasks(restoring.values()));
-        restoring.clear();
-        restoringByPartition.clear();
-        return firstException.get();
-    }
-
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
failed (non-EOS) * or if the task producer got fenced (EOS) @@ -218,27 +249,6 @@ class AssignedStreamsTasks extends AssignedTasks implements Restorin return punctuated; } - public String toString(final String indent) { - final StringBuilder builder = new StringBuilder(); - builder.append(super.toString(indent)); - describe(builder, restoring.values(), indent, "Restoring:"); - return builder.toString(); - } - - @Override - List allTasks() { - final List tasks = super.allTasks(); - tasks.addAll(restoring.values()); - return tasks; - } - - @Override - Set allAssignedTaskIds() { - final Set taskIds = super.allAssignedTaskIds(); - taskIds.addAll(restoring.keySet()); - return taskIds; - } - void clear() { super.clear(); restoring.clear(); @@ -246,6 +256,13 @@ class AssignedStreamsTasks extends AssignedTasks implements Restorin restoredPartitions.clear(); } + public String toString(final String indent) { + final StringBuilder builder = new StringBuilder(); + builder.append(super.toString(indent)); + describe(builder, restoring.values(), indent, "Restoring:"); + return builder.toString(); + } + // for testing only Collection restoringTasks() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index a33ecdc64d2..a9baa3f15cb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -84,8 +84,7 @@ abstract class AssignedTasks { } boolean allTasksRunning() { - return created.isEmpty() - && suspended.isEmpty(); + return created.isEmpty() && suspended.isEmpty(); } Collection running() { @@ -106,7 +105,7 @@ abstract class AssignedTasks { return firstException.get(); } - RuntimeException closeNonRunningTasks(final Collection tasks) { + private RuntimeException closeNonRunningTasks(final Collection tasks) { RuntimeException exception = null; for (final T task : tasks) { try { @@ -167,7 +166,7 @@ abstract class AssignedTasks { boolean maybeResumeSuspendedTask(final TaskId taskId, final Set partitions) { if (suspended.containsKey(taskId)) { final T task = suspended.get(taskId); - log.trace("found suspended {} {}", taskTypeName, taskId); + log.trace("Found suspended {} {}", taskTypeName, taskId); if (task.partitions().equals(partitions)) { suspended.remove(taskId); task.resume(); @@ -185,10 +184,10 @@ abstract class AssignedTasks { } throw e; } - log.trace("resuming suspended {} {}", taskTypeName, task.id()); + log.trace("Resuming suspended {} {}", taskTypeName, task.id()); return true; } else { - log.warn("couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions()); + log.warn("Couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions()); } } return false; @@ -198,7 +197,7 @@ abstract class AssignedTasks { * @throws TaskMigratedException if the task producer got fenced (EOS only) */ void transitionToRunning(final T task) { - log.debug("transitioning {} {} to running", taskTypeName, task.id()); + log.debug("Transitioning {} {} to running", taskTypeName, task.id()); running.put(task.id(), task); task.initializeTopology(); for (final TopicPartition topicPartition : task.partitions()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java 
index fdd9d6c303c..59fb36fa6a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -277,6 +277,7 @@ public class StoreChangelogReader implements ChangelogReader {
         needsRestoring.clear();
         endOffsets.clear();
         needsInitializing.clear();
+        completedRestorers.clear();
     }
 
     private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9cc5a19e017..455d226d69c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -99,11 +99,11 @@ public class TaskManager {
             throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
         }
 
-        changelogReader.reset();
         // do this first as we may have suspended standby tasks that
         // will become active or vice versa
         standby.closeNonAssignedSuspendedTasks(assignedStandbyTasks);
         active.closeNonAssignedSuspendedTasks(assignedActiveTasks);
+
         addStreamTasks(assignment);
         addStandbyTasks();
         // Pause all the partitions until the underlying state store is ready for all the active tasks.
@@ -240,7 +240,14 @@
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
         firstException.compareAndSet(null, active.suspend());
+        // close all restoring tasks as well and then reset changelog reader;
+        // for those restoring and still assigned tasks, they will be re-created
+        // in addStreamTasks.
+        firstException.compareAndSet(null, active.closeAllRestoringTasks());
+        changelogReader.reset();
+
         firstException.compareAndSet(null, standby.suspend());
+
         // remove the changelog partitions from restore consumer
         restoreConsumer.unsubscribe();
 
@@ -368,7 +375,7 @@
     }
 
     public void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> activeTasks,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasks) {
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasks) {
         this.assignedActiveTasks = activeTasks;
         this.assignedStandbyTasks = standbyTasks;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index fe71135f223..ffd0f8baae6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -146,11 +146,13 @@ public class AssignedStreamsTasksTest {
         EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet());
-        t1.close(false, false);
+        t1.closeStateManager(true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
-        assertThat(suspendTask(), nullValue());
+        addAndInitTask();
+        assertThat(assignedTasks.closeAllRestoringTasks(), nullValue());
+
         EasyMock.verify(t1);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 00af1002578..f71d7a134eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -260,18 +260,6 @@ public class TaskManagerTest {
         verify(active);
     }
 
-    @Test
-    public void shouldResetChangeLogReaderOnCreateTasks() {
-        mockSingleActiveTask();
-        changeLogReader.reset();
-        EasyMock.expectLastCall();
-        replay();
-
-        taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        taskManager.createTasks(taskId0Partitions);
-        verify(changeLogReader);
-    }
-
     @Test
     public void shouldAddNonResumedActiveTasks() {
         mockSingleActiveTask();
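For reviewers unfamiliar with the error-handling convention used above: both the new `AssignedStreamsTasks#closeAllRestoringTasks()` and the updated `TaskManager` suspend path follow a first-exception-wins pattern, in which every restoring task is closed and removed even when an earlier close fails, and only the first failure is surfaced to the caller afterwards. The sketch below is a minimal, self-contained illustration of that pattern using only JDK types; `RestoringTask`, `closeAll`, and `FirstExceptionDemo` are illustrative stand-ins, not classes from the Kafka Streams codebase.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class FirstExceptionDemo {

    // Illustrative stand-in for a task whose state manager may fail to close.
    interface RestoringTask {
        String id();
        void closeStateManager(boolean writeCheckpoint);
    }

    // Mirrors the shape of closeAllRestoringTasks(): close every task, keep only
    // the first failure, and remove each task from the collection regardless.
    static RuntimeException closeAll(final List<RestoringTask> restoring) {
        RuntimeException exception = null;
        final Iterator<RestoringTask> iterator = restoring.iterator();
        while (iterator.hasNext()) {
            final RestoringTask task = iterator.next();
            try {
                task.closeStateManager(true);
            } catch (final RuntimeException e) {
                if (exception == null) {
                    exception = e;   // remember only the first failure
                }
            } finally {
                iterator.remove();   // the task is dropped even if closing failed
            }
        }
        return exception;
    }

    public static void main(final String[] args) {
        final List<RestoringTask> tasks = new ArrayList<>();
        tasks.add(new RestoringTask() {
            public String id() { return "0_1"; }
            public void closeStateManager(final boolean writeCheckpoint) {
                throw new RuntimeException("state manager close failed for " + id());
            }
        });
        tasks.add(new RestoringTask() {
            public String id() { return "0_2"; }
            public void closeStateManager(final boolean writeCheckpoint) { }
        });

        // Mirrors the suspend path: each phase contributes its first error via
        // compareAndSet, and the caller inspects a single exception at the end.
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        firstException.compareAndSet(null, closeAll(tasks));
        if (firstException.get() != null) {
            System.out.println("first failure: " + firstException.get().getMessage());
        }
        System.out.println("tasks left after closing: " + tasks.size()); // 0: both were removed
    }
}
```

The `finally { iterator.remove(); }` step is the important detail: a restoring task that fails to close is still dropped from bookkeeping, so a later rebalance can recreate it from scratch rather than tripping over stale state.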