Browse Source

MINOR: Only commit running active and standby tasks when tasks corrupted (#14508)

When tasks are found corrupted, Kafka Streams tries to commit
the non-corrupted tasks before closing and reviving the corrupted
active tasks. Besides active running tasks, Kafka Streams tries
to commit restoring active tasks and standby tasks. However,
restoring active tasks do not need to be committed since they
do not have offsets to commit and the current code does not
write a checkpoint. Furthermore, trying to commit restoring
active tasks with the state updater enabled results in the
following error:

java.lang.UnsupportedOperationException: This task is read-only
at org.apache.kafka.streams.processor.internals.ReadOnlyTask.commitNeeded(ReadOnlyTask.java:209)
...

since commitNeeded() is not a read-only method for active tasks.

In future, we can consider writing a checkpoint for active
restoring tasks in this situation. Additionally, we should
fix commitNeeded() in active tasks to be read-only.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
pull/12095/merge
Bruno Cadonna 1 year ago committed by GitHub
parent
commit
c7f730d9d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
  2. 5
      streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
  3. 26
      streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
  4. 26
      streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

3
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java

@ -206,8 +206,11 @@ public class ReadOnlyTask implements Task { @@ -206,8 +206,11 @@ public class ReadOnlyTask implements Task {
@Override
public boolean commitNeeded() {
if (task.isActive()) {
throw new UnsupportedOperationException("This task is read-only");
}
return task.commitNeeded();
}
@Override
public StateStore getStore(final String name) {

5
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@ -223,10 +223,7 @@ public class TaskManager { @@ -223,10 +223,7 @@ public class TaskManager {
final Collection<Task> tasksToCommit = allTasks()
.values()
.stream()
// TODO: once we remove state restoration from the stream thread, we can also remove
// the RESTORING state here, since there will not be any restoring tasks managed
// by the stream thread anymore.
.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
.filter(t -> t.state() == Task.State.RUNNING)
.filter(t -> !corruptedTasks.contains(t.id()))
.collect(Collectors.toSet());
commitTasksAndMaybeUpdateCommittableOffsets(tasksToCommit, new HashMap<>());

26
streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java

@ -28,6 +28,9 @@ import java.util.LinkedList; @@ -28,6 +28,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -41,6 +44,7 @@ class ReadOnlyTaskTest { @@ -41,6 +44,7 @@ class ReadOnlyTaskTest {
add("inputPartitions");
add("changelogPartitions");
add("commitRequested");
add("commitNeeded");
add("isActive");
add("changelogOffsets");
add("state");
@ -126,6 +130,28 @@ class ReadOnlyTaskTest { @@ -126,6 +130,28 @@ class ReadOnlyTaskTest {
verify(task).state();
}
@Test
public void shouldDelegateCommitNeededIfStandby() {
final StandbyTask standbyTask =
standbyTask(new TaskId(1, 0), mkSet(new TopicPartition("topic", 0))).build();
final ReadOnlyTask readOnlyTask = new ReadOnlyTask(standbyTask);
readOnlyTask.commitNeeded();
verify(standbyTask).commitNeeded();
}
@Test
public void shouldThrowUnsupportedOperationExceptionForCommitNeededIfActive() {
final StreamTask statefulTask =
statefulTask(new TaskId(1, 0), mkSet(new TopicPartition("topic", 0))).build();
final ReadOnlyTask readOnlyTask = new ReadOnlyTask(statefulTask);
final Exception exception = assertThrows(UnsupportedOperationException.class, readOnlyTask::commitNeeded);
assertEquals("This task is read-only", exception.getMessage());
}
@Test
public void shouldThrowUnsupportedOperationExceptionForForbiddenMethods() {
final ReadOnlyTask readOnlyTask = new ReadOnlyTask(task);

26
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

@ -95,6 +95,7 @@ import static java.util.Collections.emptySet; @@ -95,6 +95,7 @@ import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.intersection;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
@ -2330,6 +2331,31 @@ public class TaskManagerTest { @@ -2330,6 +2331,31 @@ public class TaskManagerTest {
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldNotCommitNonCorruptedRestoringActiveTasksAndCommitStandbyTasks() {
final StreamTask activeRestoringTask = statefulTask(taskId00, taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
final StreamTask corruptedTask = statefulTask(taskId02, taskId02ChangelogPartitions)
.withInputPartitions(taskId02Partitions)
.inState(State.RUNNING).build();
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, corruptedTask)));
when(tasks.task(taskId02)).thenReturn(corruptedTask);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(activeRestoringTask, standbyTask));
expect(consumer.assignment()).andReturn(intersection(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
replay(consumer);
taskManager.handleCorruption(mkSet(taskId02));
Mockito.verify(activeRestoringTask, never()).commitNeeded();
Mockito.verify(standbyTask, times(2)).commitNeeded();
}
@Test
public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);

Loading…
Cancel
Save