Browse Source

KAFKA-10199: Fix restoration behavior for paused tasks (#14437)

State updater can get into a busy loop when all tasks are paused, because changelogReader will never return that all changelogs have been read completely. Fix this, by awaiting if updatingTasks is empty.

Related and included: if we are restoring and all tasks are paused, we should return immediately from StoreChangelogReader.

Reviewer: Bruno Cadonna <cadonna@apache.org>
pull/14454/merge
Lucas Brutschy 1 year ago committed by GitHub
parent
commit
2d04370bca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
  2. 8
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
  3. 26
      streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java

14
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java

@ -73,6 +73,7 @@ public class DefaultStateUpdater implements StateUpdater { @@ -73,6 +73,7 @@ public class DefaultStateUpdater implements StateUpdater {
private final ChangelogReader changelogReader;
private final StateUpdaterMetrics updaterMetrics;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private final AtomicBoolean isIdle = new AtomicBoolean(false);
private final Map<TaskId, Task> updatingTasks = new ConcurrentHashMap<>();
private final Map<TaskId, Task> pausedTasks = new ConcurrentHashMap<>();
@ -307,10 +308,10 @@ public class DefaultStateUpdater implements StateUpdater { @@ -307,10 +308,10 @@ public class DefaultStateUpdater implements StateUpdater {
tasksAndActionsLock.lock();
try {
while (isRunning.get() &&
changelogReader.allChangelogsCompleted() &&
(changelogReader.allChangelogsCompleted() || updatingTasks.isEmpty()) &&
tasksAndActions.isEmpty() &&
!isTopologyResumed.get()) {
isIdle.set(true);
tasksAndActionsCondition.await();
}
} catch (final InterruptedException ignored) {
@ -318,6 +319,7 @@ public class DefaultStateUpdater implements StateUpdater { @@ -318,6 +319,7 @@ public class DefaultStateUpdater implements StateUpdater {
// and hence this exception should never be thrown
} finally {
tasksAndActionsLock.unlock();
isIdle.set(false);
}
}
@ -768,6 +770,14 @@ public class DefaultStateUpdater implements StateUpdater { @@ -768,6 +770,14 @@ public class DefaultStateUpdater implements StateUpdater {
);
}
// used for testing
boolean isIdle() {
if (stateUpdaterThread != null) {
return stateUpdaterThread.isIdle.get();
}
return false;
}
private <T> Set<T> executeWithQueuesLocked(final Supplier<Set<T>> action) {
tasksAndActionsLock.lock();
restoredActiveTasksLock.lock();

8
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java

@ -808,15 +808,15 @@ public class StoreChangelogReader implements ChangelogReader { @@ -808,15 +808,15 @@ public class StoreChangelogReader implements ChangelogReader {
private void initializeChangelogs(final Map<TaskId, Task> tasks,
final Set<ChangelogMetadata> newPartitionsToRestore) {
if (newPartitionsToRestore.isEmpty()) {
return;
}
// for those changelog partitions whose tasks are not included, in means those tasks
// are paused at the moment, and hence we should not try to initialize those
// changelogs yet
filterNewPartitionsToRestore(tasks, newPartitionsToRestore);
if (newPartitionsToRestore.isEmpty()) {
return;
}
// for active changelogs, we need to find their end offset before transit to restoring
// if the changelog is on source topic, then its end offset should be the minimum of
// its committed offset and its end offset; for standby tasks that use source topics

26
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java

@ -1043,6 +1043,24 @@ class DefaultStateUpdaterTest { @@ -1043,6 +1043,24 @@ class DefaultStateUpdaterTest {
verify(changelogReader, times(2)).enforceRestoreActive();
}
@Test
public void shouldIdleWhenAllTasksPaused() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
stateUpdater.start();
stateUpdater.add(task);
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyPausedTasks(task);
verifyIdle();
when(topologyMetadata.isPaused(null)).thenReturn(false);
stateUpdater.signalResume();
verifyPausedTasks();
verifyUpdatingTasks(task);
}
@Test
public void shouldResumeStandbyTask() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
@ -1767,6 +1785,14 @@ class DefaultStateUpdaterTest { @@ -1767,6 +1785,14 @@ class DefaultStateUpdaterTest {
}
}
private void verifyIdle() throws Exception {
waitForCondition(
() -> stateUpdater.isIdle(),
VERIFICATION_TIMEOUT,
"State updater did not enter an idling state!"
);
}
private void verifyPausedTasks(final Task... tasks) throws Exception {
if (tasks.length == 0) {
waitForCondition(

Loading…
Cancel
Save