Browse Source

KAFKA-12679: Handle lock exceptions in state updater (#12875)

In this change, we enable backing off when the state directory
is still locked during initialization of a task. When the state
directory is locked, the task is reinserted into the
initialization queue. We will reattempt to acquire the lock
after the next round of polling.

Tested through a new unit test.

Reviewer: Bruno Cadonna <cadonna@apache.org>
pull/12916/head
Lucas Brutschy 2 years ago committed by GitHub
parent
commit
9ea3d0d1c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
  2. 5
      streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
  3. 23
      streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java

@ -142,6 +142,8 @@ final class StateManagerUtil { @@ -142,6 +142,8 @@ final class StateManagerUtil {
stateDirectory.unlock(id);
}
}
} else {
log.error("Failed to acquire lock while closing the state store for {} task {}", taskType, id);
}
} catch (final IOException e) {
final ProcessorStateException exception = new ProcessorStateException(

5
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@ -833,6 +833,11 @@ public class TaskManager { @@ -833,6 +833,11 @@ public class TaskManager {
try {
task.initializeIfNeeded();
stateUpdater.add(task);
} catch (final LockException lockException) {
// The state directory may still be locked by another thread, when the rebalance just happened.
// Retry in the next iteration.
log.info("Encountered lock exception. Reattempting locking the state in the next iteration.", lockException);
tasks.addPendingTaskToInit(Collections.singleton(task));
} catch (final RuntimeException e) {
// need to add task back to the bookkeeping to be handled by the stream thread
tasks.addTask(task);

23
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

@ -770,6 +770,29 @@ public class TaskManagerTest { @@ -770,6 +770,29 @@ public class TaskManagerTest {
Mockito.verify(stateUpdater).add(task01);
}
@Test
public void shouldRetryInitializationWhenLockExceptionInStateUpdater() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();
final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.drainPendingTaskToInit()).thenReturn(mkSet(task00, task01));
final LockException lockException = new LockException("Where are my keys??");
doThrow(lockException)
.when(task00).initializeIfNeeded();
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
Mockito.verify(task00).initializeIfNeeded();
Mockito.verify(task01).initializeIfNeeded();
Mockito.verify(tasks).addPendingTaskToInit(Collections.singleton(task00));
Mockito.verify(stateUpdater).add(task01);
}
@Test
public void shouldRecycleTasksRemovedFromStateUpdater() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)

Loading…
Cancel
Save