Browse Source

KAFKA-10199: Add missing catch for lock exception (#14403)

The state directory throws a lock exception during initialization if a task's state directory is still locked by the stream thread that previously owned the task. When this happens, Streams catches the lock exception, ignores it, and tries to initialize the task again in the next iteration.

In the state updater code path, we missed catching the lock exception when Streams recycles a task. As a result, the lock exception is thrown to the exception handler, which is unexpected and leads to test failures.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
pull/14454/merge
Bruno Cadonna 1 year ago committed by GitHub
parent
commit
a46da90b8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
  2. 45
      streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

23
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@ -779,8 +779,7 @@ public class TaskManager { @@ -779,8 +779,7 @@ public class TaskManager {
newTask = task.isActive() ?
convertActiveToStandby((StreamTask) task, inputPartitions) :
convertStandbyToActive((StandbyTask) task, inputPartitions);
newTask.initializeIfNeeded();
stateUpdater.add(newTask);
addTaskToStateUpdater(newTask);
} catch (final RuntimeException e) {
final TaskId taskId = task.id();
final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
@ -843,13 +842,7 @@ public class TaskManager { @@ -843,13 +842,7 @@ public class TaskManager {
final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
for (final Task task : tasks.drainPendingTasksToInit()) {
try {
task.initializeIfNeeded();
stateUpdater.add(task);
} catch (final LockException lockException) {
// The state directory may still be locked by another thread, when the rebalance just happened.
// Retry in the next iteration.
log.info("Encountered lock exception. Reattempting locking the state in the next iteration.", lockException);
tasks.addPendingTasksToInit(Collections.singleton(task));
addTaskToStateUpdater(task);
} catch (final RuntimeException e) {
// need to add task back to the bookkeeping to be handled by the stream thread
tasks.addTask(task);
@ -860,6 +853,18 @@ public class TaskManager { @@ -860,6 +853,18 @@ public class TaskManager {
maybeThrowTaskExceptions(taskExceptions);
}
/**
 * Initializes the given task if needed and hands it over to the state updater.
 *
 * <p>If a {@link LockException} is raised while doing so, the exception is logged and the
 * task is put back into the pending-to-init tasks so that it is attempted again later.
 *
 * @param task the task to initialize and add to the state updater
 */
private void addTaskToStateUpdater(final Task task) {
    try {
        task.initializeIfNeeded();
        stateUpdater.add(task);
    } catch (final LockException e) {
        // Right after a rebalance another thread may still hold the lock on the state directory,
        // so re-queue the task and try again in the next iteration.
        log.info("Encountered lock exception. Reattempting locking the state in the next iteration.", e);
        tasks.addPendingTasksToInit(Collections.singleton(task));
    }
}
public void handleExceptionsFromStateUpdater() {
final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();

45
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

@ -778,18 +778,57 @@ public class TaskManagerTest { @@ -778,18 +778,57 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, task01));
final LockException lockException = new LockException("Where are my keys??");
doThrow(lockException)
.when(task00).initializeIfNeeded();
doThrow(lockException).when(task00).initializeIfNeeded();
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
Mockito.verify(task00).initializeIfNeeded();
Mockito.verify(task01).initializeIfNeeded();
Mockito.verify(tasks).addPendingTasksToInit(Collections.singleton(task00));
Mockito.verify(tasks).addPendingTasksToInit(
Mockito.argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01))
);
Mockito.verify(stateUpdater, never()).add(task00);
Mockito.verify(stateUpdater).add(task01);
}
// Checks the recycle path of checkStateUpdater(): when a task produced by recycling throws a
// LockException from initializeIfNeeded(), it must be handed to addPendingTasksToInit() and must
// not be added to the state updater, while the other recycled task is added as usual.
@Test
public void shouldRetryInitializationWhenLockExceptionAfterRecyclingInStateUpdater() {
// Original tasks as drained from the state updater: an active task and a standby task.
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();
final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
// Results of recycling: task00 becomes a standby task, task01 becomes an active task.
final StandbyTask task00Converted = standbyTask(taskId00, taskId00Partitions)
.withInputPartitions(taskId00Partitions).build();
final StreamTask task01Converted = statefulTask(taskId01, taskId01Partitions)
.withInputPartitions(taskId01Partitions).build();
// The state updater reports both original tasks as removed.
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01));
final TasksRegistry tasks = mock(TasksRegistry.class);
// Both removed tasks are registered as pending recycling with their input partitions.
when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
// The task creators return the pre-built converted tasks.
when(activeTaskCreator.createActiveTaskFromStandby(task01, taskId01Partitions,
consumer)).thenReturn(task01Converted);
when(standbyTaskCreator.createStandbyTaskFromActive(task00, taskId00Partitions))
.thenReturn(task00Converted);
// Only the converted standby task fails to initialize with a lock exception.
final LockException lockException = new LockException("Where are my keys??");
doThrow(lockException).when(task00Converted).initializeIfNeeded();
// NOTE(review): setUpTaskManager was already called above with the same arguments; this
// second call looks redundant — confirm it has no needed side effects before removing.
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
// Initialization must have been attempted on both converted tasks.
Mockito.verify(task00Converted).initializeIfNeeded();
Mockito.verify(task01Converted).initializeIfNeeded();
// Only the task that hit the lock exception is queued for another initialization attempt.
Mockito.verify(tasks).addPendingTasksToInit(
Mockito.argThat(tasksToInit -> tasksToInit.contains(task00Converted) && !tasksToInit.contains(task01Converted))
);
// The failed task is not handed to the state updater; the successfully initialized one is.
Mockito.verify(stateUpdater, never()).add(task00Converted);
Mockito.verify(stateUpdater).add(task01Converted);
}
@Test
public void shouldRecycleTasksRemovedFromStateUpdater() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)

Loading…
Cancel
Save