Browse Source

KAFKA-10198: guard against recycling dirty state (#8924)

The dirty-state check that StreamTask#closeClean performs also needed to be applied in closeAndRecycleState. I also renamed closeAndRecycleState to closeCleanAndRecycleState to drive this point home: the task needs to be clean before its state is recycled.

This should be cherry-picked back to the 2.6 branch.

Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
pull/8928/head
A. Sophie Blee-Goldman 4 years ago committed by GitHub
parent
commit
3348fc49d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
  2. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
  3. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
  4. 21
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
  5. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
  6. 6
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
  7. 39
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
  8. 2
      streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java

@ -184,7 +184,7 @@ class ActiveTaskCreator { @@ -184,7 +184,7 @@ class ActiveTaskCreator {
final ProcessorStateManager stateManager = standbyTask.stateMgr;
final LogContext logContext = getLogContext(standbyTask.id);
standbyTask.closeAndRecycleState();
standbyTask.closeCleanAndRecycleState();
stateManager.transitionTaskType(TaskType.ACTIVE, logContext);
return createActiveTask(

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java

@ -189,7 +189,7 @@ public class StandbyTask extends AbstractTask implements Task { @@ -189,7 +189,7 @@ public class StandbyTask extends AbstractTask implements Task {
}
@Override
public void closeAndRecycleState() {
public void closeCleanAndRecycleState() {
streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
if (state() == State.SUSPENDED) {
stateMgr.recycle();

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java

@ -112,7 +112,7 @@ class StandbyTaskCreator { @@ -112,7 +112,7 @@ class StandbyTaskCreator {
final InternalProcessorContext context = streamTask.processorContext();
final ProcessorStateManager stateManager = streamTask.stateMgr;
streamTask.closeAndRecycleState();
streamTask.closeCleanAndRecycleState();
stateManager.transitionTaskType(TaskType.STANDBY, getLogContext(streamTask.id()));
return createStandbyTask(

21
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

@ -463,6 +463,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -463,6 +463,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
@Override
public void closeClean() {
validateClean();
streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
close(true);
log.info("Closed clean");
@ -482,7 +483,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -482,7 +483,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
@Override
public void closeAndRecycleState() {
public void closeCleanAndRecycleState() {
validateClean();
streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
switch (state()) {
case SUSPENDED:
@ -515,17 +517,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -515,17 +517,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
stateMgr.checkpoint(checkpointableOffsets());
}
/**
* You must commit a task and checkpoint the state manager before closing as this will release the state dir lock
*/
private void close(final boolean clean) {
if (clean && commitNeeded) {
// It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to
// closeClean in handleAssignment. We should throw if we detect this to force the TaskManager to closeDirty
/**
* Guard shared by {@code closeClean()} and {@code closeCleanAndRecycleState()}: rejects the
* operation when the task still has uncommitted work, i.e. when {@code commitNeeded} is set.
*
* @throws TaskMigratedException if {@code commitNeeded} is true
*/
private void validateClean() {
// It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to
// closeClean in handleAssignment. We should throw if we detect this to force the TaskManager to closeDirty
if (commitNeeded) {
log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to"
+ " commit and should close as dirty instead");
// Thrown before any sensors are removed or state is touched, since callers invoke this check first.
throw new TaskMigratedException("Tried to close dirty task as clean");
}
}
/**
* You must commit a task and checkpoint the state manager before closing as this will release the state dir lock
*/
private void close(final boolean clean) {
switch (state()) {
case SUSPENDED:
// first close state manager (which is idempotent) then close the record collector

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java

@ -153,7 +153,7 @@ public interface Task { @@ -153,7 +153,7 @@ public interface Task {
/**
* Attempt a clean close but do not close the underlying state
*/
void closeAndRecycleState();
void closeCleanAndRecycleState();
/**
* Revive a closed task to a created one; should never throw an exception

6
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java

@ -502,13 +502,13 @@ public class StandbyTaskTest { @@ -502,13 +502,13 @@ public class StandbyTaskTest {
EasyMock.replay(stateManager);
task = createStandbyTask();
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED
task.initializeIfNeeded();
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING
task.suspend();
task.closeAndRecycleState(); // SUSPENDED
task.closeCleanAndRecycleState(); // SUSPENDED
// Currently, there are no metrics registered for standby tasks.
// This is a regression test so that, if we add some, we will be sure to deregister them.

39
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java

@ -44,6 +44,7 @@ import org.apache.kafka.streams.StreamsConfig; @@ -44,6 +44,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
@ -1752,7 +1753,7 @@ public class StreamTaskTest { @@ -1752,7 +1753,7 @@ public class StreamTaskTest {
}
@Test
public void shouldUnregisterMetricsInCloseAndRecycle() {
public void shouldUnregisterMetricsInCloseCleanAndRecycleState() {
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
@ -1761,7 +1762,7 @@ public class StreamTaskTest { @@ -1761,7 +1762,7 @@ public class StreamTaskTest {
task.suspend();
assertThat(getTaskMetrics(), not(empty()));
task.closeAndRecycleState();
task.closeCleanAndRecycleState();
assertThat(getTaskMetrics(), empty());
}
@ -1798,6 +1799,32 @@ public class StreamTaskTest { @@ -1798,6 +1799,32 @@ public class StreamTaskTest {
assertThat(task.inputPartitions(), equalTo(newPartitions));
}
// Verifies that closeClean() refuses to run on a task that has processed a record without committing.
@Test
public void shouldThrowIfCleanClosingDirtyTask() {
task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
// Feed and process a single record so that the task reports pending uncommitted work.
task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0)));
task.process(0L);
// Precondition for the scenario: the task must be "dirty" before attempting the clean close.
assertTrue(task.commitNeeded());
assertThrows(TaskMigratedException.class, () -> task.closeClean());
}
// Verifies that closeCleanAndRecycleState() refuses to run on a task that has processed a record
// without committing, mirroring the closeClean() check.
@Test
public void shouldThrowIfRecyclingDirtyTask() {
task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
// Feed and process a single record so that the task reports pending uncommitted work.
task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0)));
task.process(0L);
// Precondition for the scenario: the task must be "dirty" before attempting to recycle it.
assertTrue(task.commitNeeded());
assertThrows(TaskMigratedException.class, () -> task.closeCleanAndRecycleState());
}
@Test
public void shouldOnlyRecycleSuspendedTasks() {
stateManager.recycle();
@ -1805,16 +1832,16 @@ public class StreamTaskTest { @@ -1805,16 +1832,16 @@ public class StreamTaskTest {
EasyMock.replay(stateManager, recordCollector);
task = createStatefulTask(createConfig(false, "100"), true);
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED
task.initializeIfNeeded();
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RESTORING
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RESTORING
task.completeRestoration();
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING
task.suspend();
task.closeAndRecycleState(); // SUSPENDED
task.closeCleanAndRecycleState(); // SUSPENDED
EasyMock.verify(stateManager, recordCollector);
}

2
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

@ -2723,7 +2723,7 @@ public class TaskManagerTest { @@ -2723,7 +2723,7 @@ public class TaskManagerTest {
}
@Override
public void closeAndRecycleState() {
public void closeCleanAndRecycleState() {
transitionTo(State.CLOSED);
}

Loading…
Cancel
Save