From 3348fc49d8824155e737b866f633e14684da5fe9 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Wed, 24 Jun 2020 18:57:38 -0700 Subject: [PATCH] KAFKA-10198: guard against recycling dirty state (#8924) We just needed to add the check in StreamTask#closeClean to closeAndRecycleState as well. I also renamed closeAndRecycleState to closeCleanAndRecycleState to drive this point home: it needs to be clean. This should be cherry-picked back to the 2.6 branch Reviewers: Matthias J. Sax , John Roesler , Guozhang Wang , --- .../internals/ActiveTaskCreator.java | 2 +- .../processor/internals/StandbyTask.java | 2 +- .../internals/StandbyTaskCreator.java | 2 +- .../processor/internals/StreamTask.java | 21 ++++++---- .../streams/processor/internals/Task.java | 2 +- .../processor/internals/StandbyTaskTest.java | 6 +-- .../processor/internals/StreamTaskTest.java | 39 ++++++++++++++++--- .../processor/internals/TaskManagerTest.java | 2 +- 8 files changed, 54 insertions(+), 22 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 0a1f47e139e..012ff20f8b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -184,7 +184,7 @@ class ActiveTaskCreator { final ProcessorStateManager stateManager = standbyTask.stateMgr; final LogContext logContext = getLogContext(standbyTask.id); - standbyTask.closeAndRecycleState(); + standbyTask.closeCleanAndRecycleState(); stateManager.transitionTaskType(TaskType.ACTIVE, logContext); return createActiveTask( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index ffd09f14059..1aa68bc721f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -189,7 +189,7 @@ public class StandbyTask extends AbstractTask implements Task { } @Override - public void closeAndRecycleState() { + public void closeCleanAndRecycleState() { streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString()); if (state() == State.SUSPENDED) { stateMgr.recycle(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index 443db8e8dd9..b5f2b74f3f4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -112,7 +112,7 @@ class StandbyTaskCreator { final InternalProcessorContext context = streamTask.processorContext(); final ProcessorStateManager stateManager = streamTask.stateMgr; - streamTask.closeAndRecycleState(); + streamTask.closeCleanAndRecycleState(); stateManager.transitionTaskType(TaskType.STANDBY, getLogContext(streamTask.id())); return createStandbyTask( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 6e8bf40ab79..4b27436023b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -463,6 +463,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @Override public void closeClean() { + validateClean(); streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString()); close(true); log.info("Closed clean"); @@ -482,7 +483,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } @Override - public void closeAndRecycleState() { + public void closeCleanAndRecycleState() { + validateClean(); streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString()); switch (state()) { case SUSPENDED: @@ -515,17 +517,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, stateMgr.checkpoint(checkpointableOffsets()); } - /** - * You must commit a task and checkpoint the state manager before closing as this will release the state dir lock - */ - private void close(final boolean clean) { - if (clean && commitNeeded) { - // It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to - // closeClean in handleAssignment. We should throw if we detect this to force the TaskManager to closeDirty + private void validateClean() { + // It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to + // closeClean in handleAssignment. We should throw if we detect this to force the TaskManager to closeDirty + if (commitNeeded) { log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to" + " commit and should close as dirty instead"); throw new TaskMigratedException("Tried to close dirty task as clean"); } + } + + /** + * You must commit a task and checkpoint the state manager before closing as this will release the state dir lock + */ + private void close(final boolean clean) { switch (state()) { case SUSPENDED: // first close state manager (which is idempotent) then close the record collector diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index 0200870b7aa..103c2317b63 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -153,7 +153,7 @@ public interface Task { /** * Attempt a clean close but do not close the underlying state */ - void closeAndRecycleState(); + void closeCleanAndRecycleState(); /** * Revive a closed task to a created one; should never throw an exception diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 82f33c4697b..f98d6304e58 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -502,13 +502,13 @@ public class StandbyTaskTest { EasyMock.replay(stateManager); task = createStandbyTask(); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED task.initializeIfNeeded(); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING task.suspend(); - task.closeAndRecycleState(); // SUSPENDED + task.closeCleanAndRecycleState(); // SUSPENDED // Currently, there are no metrics registered for standby tasks. // This is a regression test so that, if we add some, we will be sure to deregister them. diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index c6ffe74e6b8..960747047cf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -44,6 +44,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; @@ -1752,7 +1753,7 @@ public class StreamTaskTest { } @Test - public void shouldUnregisterMetricsInCloseAndRecycle() { + public void shouldUnregisterMetricsInCloseCleanAndRecycleState() { EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes(); EasyMock.replay(stateManager, recordCollector); @@ -1761,7 +1762,7 @@ public class StreamTaskTest { task.suspend(); assertThat(getTaskMetrics(), not(empty())); - task.closeAndRecycleState(); + task.closeCleanAndRecycleState(); assertThat(getTaskMetrics(), empty()); } @@ -1798,6 +1799,32 @@ public class StreamTaskTest { assertThat(task.inputPartitions(), equalTo(newPartitions)); } + @Test + public void shouldThrowIfCleanClosingDirtyTask() { + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + task.initializeIfNeeded(); + task.completeRestoration(); + + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0))); + task.process(0L); + assertTrue(task.commitNeeded()); + + assertThrows(TaskMigratedException.class, () -> task.closeClean()); + } + + @Test + public void shouldThrowIfRecyclingDirtyTask() { + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + task.initializeIfNeeded(); + task.completeRestoration(); + + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0))); + task.process(0L); + assertTrue(task.commitNeeded()); + + assertThrows(TaskMigratedException.class, () -> task.closeCleanAndRecycleState()); + } + @Test public void shouldOnlyRecycleSuspendedTasks() { stateManager.recycle(); @@ -1805,16 +1832,16 @@ public class StreamTaskTest { EasyMock.replay(stateManager, recordCollector); task = createStatefulTask(createConfig(false, "100"), true); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED task.initializeIfNeeded(); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RESTORING + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RESTORING task.completeRestoration(); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING task.suspend(); - task.closeAndRecycleState(); // SUSPENDED + task.closeCleanAndRecycleState(); // SUSPENDED EasyMock.verify(stateManager, recordCollector); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index fcfbb1f4556..23166d14d08 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2723,7 +2723,7 @@ public class TaskManagerTest { } @Override - public void closeAndRecycleState() { + public void closeCleanAndRecycleState() { transitionTo(State.CLOSED); }