Browse Source

KAFKA-15326: [7/N] Processing thread non-busy waiting (#14180)

Avoid busy waiting for processable tasks. We need to be a bit careful here not to let the task executors sleep while work is available. We have to make sure to signal on the condition variable any time a task becomes "processable". Here are some situations in which a task becomes processable:

- Task is unassigned from another TaskExecutor.
- Task state is changed (this should only happen while the task is locked inside the polling phase).
- When tasks are unlocked.
- When tasks are added.
- New records available.
- A task is resumed.

So, in summary:

- We should probably lock tasks when they are paused and unlock them when they are resumed. We should also wake the task executors after every polling phase. This belongs to the StreamThread integration work (separate PR). We add DefaultTaskManager.signalProcessableTasks for this.
- We need to wake up the task executors in DefaultTaskManager.unassignTask, DefaultTaskManager.unlockTasks and DefaultTaskManager.add.


Reviewers: Walker Carlson <wcarlson@confluent.io>, Bruno Cadonna <cadonna@apache.org>
pull/14372/head
Lucas Brutschy 1 year ago committed by GitHub
parent
commit
07a18478be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      gradle/spotbugs-exclude.xml
  2. 8
      streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
  3. 53
      streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
  4. 12
      streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
  5. 10
      streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
  6. 123
      streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java

6
gradle/spotbugs-exclude.xml

@ -530,4 +530,10 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read @@ -530,4 +530,10 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="DMI_RANDOM_USED_ONLY_ONCE"/>
</Match>
<Match>
<!-- Suppress a warning about await not being in a loop - we expect the loop to be outside the method. -->
<Class name="org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager"/>
<Bug pattern="WA_AWAIT_NOT_IN_LOOP"/>
</Match>
</FindBugsFilter>

8
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java

@ -94,7 +94,13 @@ public class DefaultTaskExecutor implements TaskExecutor { @@ -94,7 +94,13 @@ public class DefaultTaskExecutor implements TaskExecutor {
currentTask = taskManager.assignNextTask(DefaultTaskExecutor.this);
}
if (currentTask != null) {
if (currentTask == null) {
try {
taskManager.awaitProcessableTasks();
} catch (final InterruptedException ignored) {
// Can be ignored, the cause of the interrupted will be handled in the event loop
}
} else {
boolean progressed = false;
if (taskExecutionMetadata.canProcessTask(currentTask, nowMs) && currentTask.isProcessable(nowMs)) {

53
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.tasks;
import java.util.concurrent.locks.Condition;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
@ -56,6 +57,7 @@ public class DefaultTaskManager implements TaskManager { @@ -56,6 +57,7 @@ public class DefaultTaskManager implements TaskManager {
private final TasksRegistry tasks;
private final Lock tasksLock = new ReentrantLock();
private final Condition tasksCondition = tasksLock.newCondition();
private final List<TaskId> lockedTasks = new ArrayList<>();
private final Map<TaskId, StreamsException> uncaughtExceptions = new HashMap<>();
private final Map<TaskId, TaskExecutor> assignedTasks = new HashMap<>();
@ -108,7 +110,7 @@ public class DefaultTaskManager implements TaskManager { @@ -108,7 +110,7 @@ public class DefaultTaskManager implements TaskManager {
assignedTasks.put(task.id(), executor);
log.info("Assigned {} to executor {}", task.id(), executor.name());
log.debug("Assigned task {} to executor {}", task.id(), executor.name());
return (StreamTask) task;
}
@ -118,6 +120,41 @@ public class DefaultTaskManager implements TaskManager { @@ -118,6 +120,41 @@ public class DefaultTaskManager implements TaskManager {
});
}
/**
 * Blocks the calling thread until processable tasks may be available.
 *
 * While holding the tasks lock, the active tasks are scanned first: if one of them is neither
 * assigned to an executor nor locked and can make progress, the method returns without waiting.
 * Otherwise the thread waits exactly once on the tasks condition. The wait is deliberately not
 * wrapped in a loop here, so returning does not guarantee that a processable task exists;
 * callers are expected to re-check and call again.
 *
 * @throws InterruptedException if the thread was interrupted while waiting on the condition
 */
@Override
public void awaitProcessableTasks() throws InterruptedException {
    final boolean wasInterrupted = returnWithTasksLocked(() -> {
        for (final Task candidate : tasks.activeTasks()) {
            if (assignedTasks.containsKey(candidate.id())) {
                // already owned by some executor
                continue;
            }
            if (lockedTasks.contains(candidate.id())) {
                // locked tasks must not be handed out
                continue;
            }
            if (!canProgress((StreamTask) candidate, time.milliseconds())) {
                continue;
            }
            log.debug("Await unblocked: returning early from await since a processable task {} was found", candidate.id());
            return false;
        }

        // nothing to hand out right now: park until somebody signals the condition
        log.debug("Await blocking");
        try {
            tasksCondition.await();
        } catch (final InterruptedException ignored) {
            // The thread is interrupted for shutdown and pause. The exception itself is not needed;
            // the interruption is reported to the caller through the returned flag.
            log.debug("Await unblocked: Interrupted while waiting for processable tasks");
            return true;
        }
        log.debug("Await unblocked: Woken up to check for processable tasks");
        return false;
    });

    // re-raise outside of the locked section so the caller sees the interruption
    if (wasInterrupted) {
        throw new InterruptedException();
    }
}
/**
 * Wakes up all threads that are currently waiting on the tasks condition, i.e. task executors
 * blocked in {@link #awaitProcessableTasks()}, so that they re-check for processable tasks.
 *
 * The condition is signalled through {@code executeWithTasksLocked}, so the signal is issued
 * from within the tasks-locked section.
 */
@Override
public void signalProcessableTasks() {
    log.debug("Waking up task executors");
    executeWithTasksLocked(tasksCondition::signalAll);
}
@Override
public void unassignTask(final StreamTask task, final TaskExecutor executor) {
executeWithTasksLocked(() -> {
@ -132,7 +169,8 @@ public class DefaultTaskManager implements TaskManager { @@ -132,7 +169,8 @@ public class DefaultTaskManager implements TaskManager {
assignedTasks.remove(task.id());
log.info("Unassigned {} from executor {}", task.id(), executor.name());
log.debug("Unassigned {} from executor {}", task.id(), executor.name());
tasksCondition.signalAll();
});
}
@ -188,7 +226,11 @@ public class DefaultTaskManager implements TaskManager { @@ -188,7 +226,11 @@ public class DefaultTaskManager implements TaskManager {
@Override
public void unlockTasks(final Set<TaskId> taskIds) {
executeWithTasksLocked(() -> lockedTasks.removeAll(taskIds));
executeWithTasksLocked(() -> {
lockedTasks.removeAll(taskIds);
log.debug("Waking up task executors");
tasksCondition.signalAll();
});
}
@Override
@ -202,6 +244,8 @@ public class DefaultTaskManager implements TaskManager { @@ -202,6 +244,8 @@ public class DefaultTaskManager implements TaskManager {
for (final StreamTask task : tasksToAdd) {
tasks.addTask(task);
}
log.debug("Waking up task executors");
tasksCondition.signalAll();
});
log.info("Added tasks {} to the task manager to process", tasksToAdd);
@ -261,12 +305,11 @@ public class DefaultTaskManager implements TaskManager { @@ -261,12 +305,11 @@ public class DefaultTaskManager implements TaskManager {
return result;
});
log.info("Drained {} uncaught exceptions", returnValue.size());
log.debug("Drained {} uncaught exceptions", returnValue.size());
return returnValue;
}
private void executeWithTasksLocked(final Runnable action) {
tasksLock.lock();
try {

12
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java

@ -72,8 +72,6 @@ public interface TaskManager { @@ -72,8 +72,6 @@ public interface TaskManager {
/**
* Unlock all of the managed active tasks from the task manager. Similar to {@link #unlockTasks(Set)}.
*
* This method does not block, instead a future is returned.
*/
void unlockAllTasks();
@ -121,4 +119,14 @@ public interface TaskManager { @@ -121,4 +119,14 @@ public interface TaskManager {
*/
Map<TaskId, StreamsException> drainUncaughtExceptions();
/**
 * Signals that at least one task has become processable, e.g. because it was resumed or new records may be available.
 *
 * Threads blocked in {@link #awaitProcessableTasks()} are expected to wake up and re-check for processable tasks.
 */
void signalProcessableTasks();
/**
 * Blocks until unassigned processable tasks may be available.
 *
 * Returning from this method does not guarantee that a processable task exists; callers should
 * re-check (e.g. by trying to assign a task) and call this method again if nothing was found.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
void awaitProcessableTasks() throws InterruptedException;
}

10
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java

@ -85,6 +85,16 @@ public class DefaultTaskExecutorTest { @@ -85,6 +85,16 @@ public class DefaultTaskExecutorTest {
assertNull(taskExecutor.currentTask(), "Have task assigned after shutdown");
}
// Verifies that a started executor which gets no task from assignNextTask ends up calling
// awaitProcessableTasks on the task manager.
@Test
public void shouldAwaitProcessableTasksIfNoneAssignable() throws InterruptedException {
assertNull(taskExecutor.currentTask(), "Have task assigned before startup");
// the task manager never hands out a task
when(taskManager.assignNextTask(taskExecutor)).thenReturn(null);
taskExecutor.start();
// the executor runs asynchronously, so allow up to VERIFICATION_TIMEOUT for the call to happen
verify(taskManager, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).awaitProcessableTasks();
}
@Test
public void shouldUnassignTaskWhenNotProgressing() {
when(task.isProcessable(anyLong())).thenReturn(false);

123
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java

@ -16,6 +16,9 @@ @@ -16,6 +16,9 @@
*/
package org.apache.kafka.streams.processor.internals.tasks;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.MockTime;
@ -26,6 +29,7 @@ import org.apache.kafka.streams.processor.TaskId; @@ -26,6 +29,7 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
import org.apache.kafka.streams.processor.internals.TasksRegistry;
import org.apache.kafka.test.StreamsTestUtils.TaskBuilder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -37,6 +41,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap; @@ -37,6 +41,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -44,13 +49,17 @@ import static org.mockito.ArgumentMatchers.anyLong; @@ -44,13 +49,17 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class DefaultTaskManagerTest {
private final static long VERIFICATION_TIMEOUT = 15000;
private final Time time = new MockTime(1L);
private final StreamTask task = mock(StreamTask.class);
private final TaskId taskId = new TaskId(0, 0, "A");
private final StreamTask task = TaskBuilder.statelessTask(taskId).build();
private final TasksRegistry tasks = mock(TasksRegistry.class);
private final TaskExecutor taskExecutor = mock(TaskExecutor.class);
private final StreamsException exception = mock(StreamsException.class);
@ -70,9 +79,9 @@ public class DefaultTaskManagerTest { @@ -70,9 +79,9 @@ public class DefaultTaskManagerTest {
@BeforeEach
public void setUp() {
when(task.id()).thenReturn(new TaskId(0, 0, "A"));
when(task.isProcessable(anyLong())).thenReturn(true);
when(task.isActive()).thenReturn(true);
when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
when(tasks.task(taskId)).thenReturn(task);
}
@Test
@ -94,6 +103,114 @@ public class DefaultTaskManagerTest { @@ -94,6 +103,114 @@ public class DefaultTaskManagerTest {
assertNull(taskManager.assignNextTask(taskExecutor));
}
/**
 * Test helper that keeps calling {@code taskManager.awaitProcessableTasks()} until shutdown is
 * requested. Every time the call comes back, whether normally or via interruption,
 * {@code awaitDone} is counted down, so tests can observe that the await has returned at least once.
 */
private class AwaitingRunnable implements Runnable {
    private final CountDownLatch awaitDone = new CountDownLatch(1);
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

    @Override
    public void run() {
        while (true) {
            if (shutdownRequested.get()) {
                return;
            }
            try {
                taskManager.awaitProcessableTasks();
            } catch (final InterruptedException ignored) {
                // an interruption counts as "await returned" for the purposes of these tests
            }
            awaitDone.countDown();
        }
    }

    /**
     * Requests the loop to stop and signals the task manager so that a blocked await can return.
     */
    public void shutdown() {
        shutdownRequested.set(true);
        taskManager.signalProcessableTasks();
    }
}
// With nothing signalling the task manager, the await must not return within the 100 ms window.
@Test
public void shouldBlockOnAwait() throws InterruptedException {
    final AwaitingRunnable waiter = new AwaitingRunnable();
    new Thread(waiter).start();

    final boolean returnedFromAwait = waiter.awaitDone.await(100, TimeUnit.MILLISECONDS);

    assertFalse(returnedFromAwait);
    waiter.shutdown();
}
// Interrupting the waiting thread must make awaitProcessableTasks return.
@Test
public void shouldReturnFromAwaitOnInterruption() throws InterruptedException {
    final AwaitingRunnable waiter = new AwaitingRunnable();
    final Thread waiterThread = new Thread(waiter);
    waiterThread.start();
    // the waiter has started scanning the active tasks, i.e. it has entered the await call
    verify(tasks, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();

    waiterThread.interrupt();

    final boolean returnedFromAwait = waiter.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS);
    assertTrue(returnedFromAwait);
    waiter.shutdown();
}
// An explicit signalProcessableTasks() must make awaitProcessableTasks return.
@Test
public void shouldReturnFromAwaitOnSignalProcessableTasks() throws InterruptedException {
    final AwaitingRunnable waiter = new AwaitingRunnable();
    new Thread(waiter).start();
    // the waiter has started scanning the active tasks, i.e. it has entered the await call
    verify(tasks, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();

    taskManager.signalProcessableTasks();

    final boolean returnedFromAwait = waiter.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS);
    assertTrue(returnedFromAwait);
    waiter.shutdown();
}
// Unassigning a task from its executor must make awaitProcessableTasks return.
@Test
public void shouldReturnFromAwaitOnUnassignment() throws InterruptedException {
    taskManager.add(Collections.singleton(task));
    when(taskExecutionMetadata.canProcessTask(eq(task), anyLong())).thenReturn(true);
    // hand the only task to the executor so that nothing is left for the waiter
    final StreamTask assignedTask = taskManager.assignNextTask(taskExecutor);
    assertNotNull(assignedTask);
    final AwaitingRunnable waiter = new AwaitingRunnable();
    new Thread(waiter).start();
    // the waiter has started scanning the active tasks, i.e. it has entered the await call
    verify(tasks, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();

    taskManager.unassignTask(assignedTask, taskExecutor);

    final boolean returnedFromAwait = waiter.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS);
    assertTrue(returnedFromAwait);
    waiter.shutdown();
}
// Adding a task to the task manager must make awaitProcessableTasks return.
@Test
public void shouldReturnFromAwaitOnAdding() throws InterruptedException {
    final AwaitingRunnable waiter = new AwaitingRunnable();
    new Thread(waiter).start();
    // the waiter has started scanning the active tasks, i.e. it has entered the await call
    verify(tasks, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();

    taskManager.add(Collections.singleton(task));

    final boolean returnedFromAwait = waiter.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS);
    assertTrue(returnedFromAwait);
    waiter.shutdown();
}
// Unlocking the tasks must make awaitProcessableTasks return.
@Test
public void shouldReturnFromAwaitOnUnlocking() throws InterruptedException {
    taskManager.add(Collections.singleton(task));
    // lock the only task before the waiter starts
    taskManager.lockTasks(Collections.singleton(task.id()));
    final AwaitingRunnable waiter = new AwaitingRunnable();
    new Thread(waiter).start();
    // the waiter has started scanning the active tasks, i.e. it has entered the await call
    verify(tasks, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();

    taskManager.unlockAllTasks();

    final boolean returnedFromAwait = waiter.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS);
    assertTrue(returnedFromAwait);
    waiter.shutdown();
}
@Test
public void shouldAssignTasksThatCanBeSystemTimePunctuated() {
taskManager.add(Collections.singleton(task));

Loading…
Cancel
Save