
KAFKA-15326: [9/N] Start and stop executors and cornercases (#14281)

* Implements start and stop of task executors
* Introduce flush operation to keep consumer operations out of the processing threads
* Fixes corner case: handle requested unassignment during shutdown
* Fixes corner case: handle race between voluntary unassignment and requested unassignment
* Fixes corner case: task locking future completes for the empty set
* Fixes corner case: we should not reassign a task with an uncaught exception to a task executor
* Improved logging
* Number of threads controlled from outside of the TaskManager (see the sketch below)
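
For orientation, a minimal sketch (not part of this commit) of how the new lifecycle could be driven by the owner of the TaskManager, based on the constructor and methods introduced in the diffs below. The client id, the thread count, and the use of DefaultTaskExecutorCreator are illustrative assumptions.

import java.time.Duration;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
import org.apache.kafka.streams.processor.internals.TasksRegistry;
import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager;
import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.DefaultTaskExecutorCreator;
import org.apache.kafka.streams.processor.internals.tasks.TaskManager;

// Sketch only: the TasksRegistry and TaskExecutionMetadata instances are assumed
// to be supplied by the caller.
public final class ExecutorLifecycleSketch {
    public static void run(final Time time,
                           final TasksRegistry tasks,
                           final TaskExecutionMetadata metadata) {
        // The number of processing threads is now passed in from the outside
        // instead of being read from StreamsConfig inside the TaskManager.
        final int numExecutors = 4;
        final TaskManager taskManager = new DefaultTaskManager(
            time, "clientId", tasks, new DefaultTaskExecutorCreator(), metadata, numExecutors);

        // Start all task executor threads.
        taskManager.startTaskExecutors();

        // ... tasks are added, locked/unlocked, and processed while running ...

        // Shutdown asks every executor to stop, wakes up executors blocked in
        // awaitProcessableTasks(), and then waits for each thread to terminate.
        taskManager.shutdown(Duration.ofMinutes(1));
    }
}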

Reviewers: Bruno Cadonna <bruno@confluent.io>
Lucas Brutschy committed via GitHub, 1 year ago
commit 6263197a62
  1. checkstyle/suppressions.xml (2)
  2. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (9)
  3. streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java (90)
  4. streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java (66)
  5. streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java (21)
  6. streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java (27)
  7. streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java (13)
  8. streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java (50)
  9. streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java (101)

2
checkstyle/suppressions.xml

@@ -235,7 +235,7 @@
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|KTableKTableForeignKeyVersionedJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
<suppress checks="JavaNCSS"
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest|StreamTaskTest).java"/>
<suppress checks="NPathComplexity"
files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|KTableKTableForeignKeyVersionedJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest).java"/>

9
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

@@ -394,6 +394,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
timeCurrentIdlingStarted = Optional.empty();
}
public void flush() {
stateMgr.flushCache();
recordCollector.flush();
}
/**
* @throws StreamsException fatal error that should cause the thread to die
* @throws TaskMigratedException recoverable error that would cause the task to be removed
@@ -414,8 +420,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
// cached records to be processed and hence generate more records to be sent out
//
// TODO: this should be removed after we decouple caching with emitting
stateMgr.flushCache();
recordCollector.flush();
flush();
hasPendingTxCommit = eosEnabled;
log.debug("Prepared {} task for committing", state());

90
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java

@@ -41,8 +41,8 @@ public class DefaultTaskExecutor implements TaskExecutor {
private class TaskExecutorThread extends Thread {
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private final AtomicReference<KafkaFutureImpl<StreamTask>> pauseRequested = new AtomicReference<>(null);
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
private final AtomicReference<KafkaFutureImpl<StreamTask>> taskReleaseRequested = new AtomicReference<>(null);
private final Logger log;
@@ -57,26 +57,52 @@ public class DefaultTaskExecutor implements TaskExecutor {
public void run() {
log.info("Task executor thread started");
try {
while (isRunning.get()) {
runOnce(time.milliseconds());
while (!shutdownRequested.get()) {
try {
runOnce(time.milliseconds());
} catch (final StreamsException e) {
handleException(e);
} catch (final Exception e) {
handleException(new StreamsException(e));
}
}
} catch (final StreamsException e) {
handleException(e);
} catch (final Exception e) {
handleException(new StreamsException(e));
} finally {
if (currentTask != null) {
log.debug("Releasing task {} due to shutdown.", currentTask.id());
unassignCurrentTask();
}
shutdownGate.countDown();
final KafkaFutureImpl<StreamTask> taskReleaseFuture;
if ((taskReleaseFuture = taskReleaseRequested.getAndSet(null)) != null) {
log.debug("Asked to return current task, but shutting down.");
taskReleaseFuture.complete(null);
}
log.info("Task executor thread shutdown");
}
}
private void handleTaskReleaseRequested() {
final KafkaFutureImpl<StreamTask> taskReleaseFuture;
if ((taskReleaseFuture = taskReleaseRequested.getAndSet(null)) != null) {
if (currentTask != null) {
log.debug("Releasing task {} upon request.", currentTask.id());
final StreamTask unassignedTask = unassignCurrentTask();
taskReleaseFuture.complete(unassignedTask);
} else {
log.debug("Asked to return current task, but returned current task already.");
taskReleaseFuture.complete(null);
}
}
}
private void handleException(final StreamsException e) {
if (currentTask != null) {
taskManager.setUncaughtException(e, currentTask.id());
log.debug("Releasing task {} due to uncaught exception.", currentTask.id());
unassignCurrentTask();
} else {
// If we do not currently have a task assigned and still get an error, this is fatal for the executor thread
throw e;
@@ -84,11 +110,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
}
private void runOnce(final long nowMs) {
final KafkaFutureImpl<StreamTask> pauseFuture;
if ((pauseFuture = pauseRequested.getAndSet(null)) != null) {
final StreamTask unassignedTask = unassignCurrentTask();
pauseFuture.complete(unassignedTask);
}
handleTaskReleaseRequested();
if (currentTask == null) {
currentTask = taskManager.assignNextTask(DefaultTaskExecutor.this);
@@ -105,6 +127,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
if (taskExecutionMetadata.canProcessTask(currentTask, nowMs) && currentTask.isProcessable(nowMs)) {
if (processTask(currentTask, nowMs, time)) {
log.trace("processed a record for {}", currentTask.id());
progressed = true;
}
}
@@ -121,6 +144,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
}
if (!progressed) {
log.debug("Releasing task {} because we are not making progress.", currentTask.id());
unassignCurrentTask();
}
}
@@ -131,10 +155,10 @@ public class DefaultTaskExecutor implements TaskExecutor {
try {
processed = task.process(now);
if (processed) {
log.trace("Successfully processed task {}", task.id());
task.clearTaskTimeout();
// TODO: enable regardless of whether using named topologies
if (taskExecutionMetadata.hasNamedTopologies() && taskExecutionMetadata.processingMode() != EXACTLY_ONCE_V2) {
log.trace("Successfully processed task {}", task.id());
taskExecutionMetadata.addToSuccessfullyProcessed(task);
}
}
@@ -168,9 +192,10 @@ public class DefaultTaskExecutor implements TaskExecutor {
if (currentTask == null)
throw new IllegalStateException("Does not own any task while being ask to unassign from task manager");
// flush the task before giving it back to task manager
// TODO: we can add a separate function in StreamTask to just flush and not return offsets
currentTask.prepareCommit();
// flush the task before giving it back to task manager, if we are not handing it back because of an error.
if (!taskManager.hasUncaughtException(currentTask.id())) {
currentTask.flush();
}
taskManager.unassignTask(currentTask, DefaultTaskExecutor.this);
final StreamTask retTask = currentTask;
@@ -183,6 +208,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
private final String name;
private final TaskManager taskManager;
private final TaskExecutionMetadata taskExecutionMetadata;
private final Logger log;
private StreamTask currentTask = null;
private TaskExecutorThread taskExecutorThread = null;
@@ -196,6 +222,8 @@ public class DefaultTaskExecutor implements TaskExecutor {
this.name = name;
this.taskManager = taskManager;
this.taskExecutionMetadata = taskExecutionMetadata;
final LogContext logContext = new LogContext(name);
this.log = logContext.logger(DefaultTaskExecutor.class);
}
@Override
@@ -213,10 +241,20 @@ public class DefaultTaskExecutor implements TaskExecutor {
}
@Override
public void shutdown(final Duration timeout) {
public boolean isRunning() {
return taskExecutorThread != null && taskExecutorThread.isAlive() && shutdownGate.getCount() != 0;
}
@Override
public void requestShutdown() {
if (taskExecutorThread != null) {
taskExecutorThread.shutdownRequested.set(true);
}
}
@Override
public void awaitShutdown(final Duration timeout) {
if (taskExecutorThread != null) {
taskExecutorThread.isRunning.set(false);
taskExecutorThread.interrupt();
try {
if (!shutdownGate.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
throw new StreamsException("State updater thread did not shutdown within the timeout");
@@ -237,8 +275,18 @@ public class DefaultTaskExecutor implements TaskExecutor {
final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
if (taskExecutorThread != null) {
taskExecutorThread.pauseRequested.set(future);
log.debug("Asking {} to hand back task", taskExecutorThread.getName());
if (!taskExecutorThread.taskReleaseRequested.compareAndSet(null, future)) {
throw new IllegalStateException("There was already a task release request registered");
}
if (shutdownGate.getCount() == 0) {
log.debug("Completing future, because task executor was just shut down");
future.complete(null);
} else {
taskManager.signalTaskExecutors();
}
} else {
log.debug("Tried to unassign but no thread is running");
future.complete(null);
}
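
The null completion above matters for callers of unassign(): a requested release can race with a voluntary release or with shutdown. Below is a sketch (not from this commit) of how a caller could handle the returned future; the handler bodies are illustrative.

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.tasks.TaskExecutor;

final class UnassignRaceSketch {
    // Sketch only: the future returned by unassign() may complete with null when
    // the executor already handed its task back voluntarily, or shut down, before
    // the release request was processed.
    static void requestRelease(final TaskExecutor executor) {
        final KafkaFuture<StreamTask> released = executor.unassign();
        released.whenComplete((task, error) -> {
            if (error != null) {
                // surface the failure to the caller
            } else if (task == null) {
                // nothing to hand back: the voluntary release or shutdown won the race
            } else {
                // the task was flushed (unless it hit an uncaught exception) and returned
            }
        });
    }
}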

66
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java

@@ -16,12 +16,12 @@
*/
package org.apache.kafka.streams.processor.internals.tasks;
import java.time.Duration;
import java.util.concurrent.locks.Condition;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
@@ -65,7 +65,7 @@ public class DefaultTaskManager implements TaskManager {
private final List<TaskExecutor> taskExecutors;
static class DefaultTaskExecutorCreator implements TaskExecutorCreator {
public static class DefaultTaskExecutorCreator implements TaskExecutorCreator {
@Override
public TaskExecutor create(final TaskManager taskManager, final String name, final Time time, final TaskExecutionMetadata taskExecutionMetadata) {
return new DefaultTaskExecutor(taskManager, name, time, taskExecutionMetadata);
@@ -76,9 +76,9 @@ public class DefaultTaskManager implements TaskManager {
public DefaultTaskManager(final Time time,
final String clientId,
final TasksRegistry tasks,
final StreamsConfig config,
final TaskExecutorCreator executorCreator,
final TaskExecutionMetadata taskExecutionMetadata
final TaskExecutionMetadata taskExecutionMetadata,
final int numExecutors
) {
final String logPrefix = String.format("%s ", clientId);
final LogContext logContext = new LogContext(logPrefix);
@@ -87,7 +87,6 @@ public class DefaultTaskManager implements TaskManager {
this.tasks = tasks;
this.taskExecutionMetadata = taskExecutionMetadata;
final int numExecutors = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
this.taskExecutors = new ArrayList<>(numExecutors);
for (int i = 1; i <= numExecutors; i++) {
final String name = clientId + "-TaskExecutor-" + i;
@@ -106,7 +105,8 @@ public class DefaultTaskManager implements TaskManager {
for (final Task task : tasks.activeTasks()) {
if (!assignedTasks.containsKey(task.id()) &&
!lockedTasks.contains(task.id()) &&
canProgress((StreamTask) task, time.milliseconds())
canProgress((StreamTask) task, time.milliseconds()) &&
!hasUncaughtException(task.id())
) {
assignedTasks.put(task.id(), executor);
@@ -117,6 +117,8 @@ public class DefaultTaskManager implements TaskManager {
}
}
log.debug("Found no assignable task for executor {}", executor.name());
return null;
});
}
@@ -127,7 +129,8 @@ public class DefaultTaskManager implements TaskManager {
for (final Task task : tasks.activeTasks()) {
if (!assignedTasks.containsKey(task.id()) &&
!lockedTasks.contains(task.id()) &&
canProgress((StreamTask) task, time.milliseconds())
canProgress((StreamTask) task, time.milliseconds()) &&
!hasUncaughtException(task.id())
) {
log.debug("Await unblocked: returning early from await since a processable task {} was found", task.id());
return false;
@@ -151,7 +154,7 @@ public class DefaultTaskManager implements TaskManager {
}
}
public void signalProcessableTasks() {
public void signalTaskExecutors() {
log.debug("Waking up task executors");
executeWithTasksLocked(tasksCondition::signalAll);
}
@@ -177,10 +180,16 @@ public class DefaultTaskManager implements TaskManager {
@Override
public KafkaFuture<Void> lockTasks(final Set<TaskId> taskIds) {
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
if (taskIds.isEmpty()) {
result.complete(null);
return result;
}
return returnWithTasksLocked(() -> {
lockedTasks.addAll(taskIds);
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
final Set<TaskId> remainingTaskIds = new ConcurrentSkipListSet<>(taskIds);
for (final TaskId taskId : taskIds) {
@@ -195,12 +204,18 @@ public class DefaultTaskManager implements TaskManager {
}
if (assignedTasks.containsKey(taskId)) {
final KafkaFuture<StreamTask> future = assignedTasks.get(taskId).unassign();
final TaskExecutor executor = assignedTasks.get(taskId);
log.debug("Requesting release of task {} from {}", taskId, executor.name());
final KafkaFuture<StreamTask> future = executor.unassign();
future.whenComplete((streamTask, throwable) -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
remainingTaskIds.remove(streamTask.id());
assert !assignedTasks.containsKey(taskId);
// It can happen that the executor handed back the task before we asked it to
// in which case `streamTask` will be null here.
assert streamTask == null || streamTask.id() == taskId;
remainingTaskIds.remove(taskId);
if (remainingTaskIds.isEmpty()) {
result.complete(null);
}
@@ -227,6 +242,11 @@ public class DefaultTaskManager implements TaskManager {
@Override
public void unlockTasks(final Set<TaskId> taskIds) {
if (taskIds.isEmpty()) {
return;
}
executeWithTasksLocked(() -> {
lockedTasks.removeAll(taskIds);
log.debug("Waking up task executors");
@@ -306,11 +326,17 @@ public class DefaultTaskManager implements TaskManager {
return result;
});
log.debug("Drained {} uncaught exceptions", returnValue.size());
if (!returnValue.isEmpty()) {
log.debug("Drained {} uncaught exceptions", returnValue.size());
}
return returnValue;
}
public boolean hasUncaughtException(final TaskId taskId) {
return returnWithTasksLocked(() -> uncaughtExceptions.containsKey(taskId));
}
private void executeWithTasksLocked(final Runnable action) {
tasksLock.lock();
try {
@@ -334,5 +360,21 @@ public class DefaultTaskManager implements TaskManager {
taskExecutionMetadata.canProcessTask(task, nowMs) && task.isProcessable(nowMs) ||
taskExecutionMetadata.canPunctuateTask(task) && (task.canPunctuateStreamTime() || task.canPunctuateSystemTime());
}
public void startTaskExecutors() {
for (final TaskExecutor t: taskExecutors) {
t.start();
}
}
public void shutdown(final Duration duration) {
for (final TaskExecutor t: taskExecutors) {
t.requestShutdown();
}
signalTaskExecutors();
for (final TaskExecutor t: taskExecutors) {
t.awaitShutdown(duration);
}
}
}

21
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java

@@ -31,18 +31,35 @@ public interface TaskExecutor {
/**
* Starts the task executor.
* Idempotent operation - will have no effect if thread is already started.
*/
void start();
/**
* Returns true if the task executor thread is running.
*/
boolean isRunning();
/**
* Asks the task executor to shut down.
* Idempotent operation - will have no effect if thread was already asked to shut down
*
* @throws
* org.apache.kafka.streams.errors.StreamsException if the state updater thread cannot shutdown within the timeout
*/
void requestShutdown();
/**
* Shuts down the task processor updater.
* Idempotent operation - will have no effect if thread is already shut down.
* Must call `requestShutdown` first.
*
* @param timeout duration how long to wait until the state updater is shut down
*
* @throws
* org.apache.kafka.streams.errors.StreamsException if the state updater thread cannot shutdown within the timeout
* org.apache.kafka.streams.errors.StreamsException if the state updater thread does not shutdown within the timeout
*/
void shutdown(final Duration timeout);
void awaitShutdown(final Duration timeout);
/**
* Get the current assigned processing task. The task returned is read-only and cannot be modified.

27
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.tasks;
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.streams.errors.StreamsException;
@@ -59,7 +60,7 @@ public interface TaskManager {
KafkaFuture<Void> lockTasks(final Set<TaskId> taskIds);
/**
* Lock all of the managed active tasks from the task manager. Similar to {@link #lockTasks(Set)}.
* Lock all the managed active tasks from the task manager. Similar to {@link #lockTasks(Set)}.
*
* This method does not block, instead a future is returned.
*/
@@ -71,7 +72,7 @@ public interface TaskManager {
void unlockTasks(final Set<TaskId> taskIds);
/**
* Unlock all of the managed active tasks from the task manager. Similar to {@link #unlockTasks(Set)}.
* Unlock all the managed active tasks from the task manager. Similar to {@link #unlockTasks(Set)}.
*/
void unlockAllTasks();
@@ -119,14 +120,34 @@ public interface TaskManager {
*/
Map<TaskId, StreamsException> drainUncaughtExceptions();
/**
* Can be used to check if a specific task has an uncaught exception.
*
* @param taskId the task ID to check for
*/
boolean hasUncaughtException(final TaskId taskId);
/**
* Signals that at least one task has become processable, e.g. because it was resumed or new records may be available.
*/
void signalProcessableTasks();
void signalTaskExecutors();
/**
* Blocks until unassigned processable tasks may be available.
*/
void awaitProcessableTasks() throws InterruptedException;
/**
* Starts all threads associated with this task manager.
*/
void startTaskExecutors();
/**
* Shuts down all threads associated with this task manager.
* All tasks will be unlocked and unassigned by the end of this.
*
* @param duration Time to wait for each thread to shut down.
*/
void shutdown(final Duration duration);
}
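
A short usage sketch of the locking API as changed by this commit, assuming a TaskManager instance and an arbitrary set of task ids. With the empty-set fix, the returned future can be waited on unconditionally; the consumerSideWork callback is a hypothetical placeholder.

import java.util.Set;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.tasks.TaskManager;

final class LockTasksSketch {
    // Sketch only: lock a set of tasks, run consumer-side work while no executor
    // processes them, then unlock. Locking an empty set now completes immediately,
    // so callers no longer need a special case for it.
    static void withLockedTasks(final TaskManager taskManager,
                                final Set<TaskId> taskIds,
                                final Runnable consumerSideWork) throws Exception {
        final KafkaFuture<Void> locked = taskManager.lockTasks(taskIds);
        locked.get();                          // completes at once when taskIds is empty
        try {
            consumerSideWork.run();            // e.g. commit-related consumer operations
        } finally {
            taskManager.unlockTasks(taskIds);  // wakes up the task executors
        }
    }
}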

13
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java

@@ -2323,6 +2323,19 @@ public class StreamTaskTest {
assertThat(task.state(), is(Task.State.CLOSED));
}
@Test
public void shouldFlushStateManagerAndRecordCollector() {
stateManager.flush();
EasyMock.expectLastCall().once();
recordCollector.flush();
EasyMock.expectLastCall().once();
EasyMock.replay(stateManager, recordCollector);
task = createStatefulTask(createConfig("100"), false);
task.flush();
}
@Test
public void shouldClearCommitStatusesInCloseDirty() {
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);

50
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java

@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
import org.junit.jupiter.api.AfterEach;
@@ -31,6 +32,7 @@ import java.time.Duration;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -43,6 +45,8 @@ import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.apache.kafka.test.TestUtils.waitForCondition;
public class DefaultTaskExecutorTest {
private final static long VERIFICATION_TIMEOUT = 15000;
@@ -60,29 +64,51 @@ public class DefaultTaskExecutorTest {
when(taskManager.assignNextTask(taskExecutor)).thenReturn(task).thenReturn(null);
when(taskExecutionMetadata.canProcessTask(eq(task), anyLong())).thenReturn(true);
when(task.isProcessable(anyLong())).thenReturn(true);
when(task.id()).thenReturn(new TaskId(0, 0, "A"));
when(task.process(anyLong())).thenReturn(true);
when(task.prepareCommit()).thenReturn(Collections.emptyMap());
}
@AfterEach
public void tearDown() {
taskExecutor.shutdown(Duration.ofMinutes(1));
taskExecutor.requestShutdown();
taskExecutor.awaitShutdown(Duration.ofMinutes(1));
}
@Test
public void shouldShutdownTaskExecutor() {
assertNull(taskExecutor.currentTask(), "Have task assigned before startup");
assertFalse(taskExecutor.isRunning());
taskExecutor.start();
assertTrue(taskExecutor.isRunning());
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
taskExecutor.shutdown(Duration.ofMinutes(1));
taskExecutor.requestShutdown();
taskExecutor.awaitShutdown(Duration.ofMinutes(1));
verify(task).prepareCommit();
verify(task).flush();
verify(taskManager).unassignTask(task, taskExecutor);
assertNull(taskExecutor.currentTask(), "Have task assigned after shutdown");
assertFalse(taskExecutor.isRunning());
}
@Test
public void shouldClearTaskReleaseFutureOnShutdown() throws InterruptedException {
assertNull(taskExecutor.currentTask(), "Have task assigned before startup");
taskExecutor.start();
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
final KafkaFuture<StreamTask> future = taskExecutor.unassign();
taskExecutor.requestShutdown();
taskExecutor.awaitShutdown(Duration.ofMinutes(1));
waitForCondition(future::isDone, "Await for unassign future to complete");
assertNull(taskExecutor.currentTask(), "Have task assigned after shutdown");
}
@Test
@@ -104,7 +130,7 @@ public class DefaultTaskExecutorTest {
taskExecutor.start();
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor);
verify(task).prepareCommit();
verify(task).flush();
assertNull(taskExecutor.currentTask());
}
@@ -209,7 +235,7 @@ public class DefaultTaskExecutorTest {
final KafkaFuture<StreamTask> future = taskExecutor.unassign();
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor);
verify(task).prepareCommit();
verify(task).flush();
assertNull(taskExecutor.currentTask());
assertTrue(future.isDone(), "Unassign is not completed");
@@ -223,10 +249,22 @@
taskExecutor.start();
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).setUncaughtException(exception, task.id());
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor);
assertNull(taskExecutor.currentTask());
assertTrue(taskExecutor.isRunning(), "should not shut down upon exception");
}
@Test
public void shouldNotFlushOnException() {
final StreamsException exception = mock(StreamsException.class);
when(task.process(anyLong())).thenThrow(exception);
when(taskManager.hasUncaughtException(task.id())).thenReturn(true);
taskExecutor.start();
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor);
verify(task, never()).flush();
}
}

101
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.tasks;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -23,9 +24,9 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
import org.apache.kafka.streams.processor.internals.TasksRegistry;
@@ -34,11 +35,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.Properties;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -65,17 +61,8 @@ public class DefaultTaskManagerTest {
private final StreamsException exception = mock(StreamsException.class);
private final TaskExecutionMetadata taskExecutionMetadata = mock(TaskExecutionMetadata.class);
private final StreamsConfig config = new StreamsConfig(configProps());
private final TaskManager taskManager = new DefaultTaskManager(time, "TaskManager", tasks, config,
(taskManager, name, time, taskExecutionMetadata) -> taskExecutor, taskExecutionMetadata);
private Properties configProps() {
return mkObjectProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2)
));
}
private final TaskManager taskManager = new DefaultTaskManager(time, "TaskManager", tasks,
(taskManager, name, time, taskExecutionMetadata) -> taskExecutor, taskExecutionMetadata, 1);
@BeforeEach
public void setUp() {
@@ -84,6 +71,22 @@ public class DefaultTaskManagerTest {
when(tasks.task(taskId)).thenReturn(task);
}
@Test
public void shouldShutdownTaskExecutors() {
final Duration duration = mock(Duration.class);
taskManager.shutdown(duration);
verify(taskExecutor).requestShutdown();
verify(taskExecutor).awaitShutdown(duration);
}
@Test
public void shouldStartTaskExecutors() {
taskManager.startTaskExecutors();
verify(taskExecutor).start();
}
@Test
public void shouldAddTask() {
taskManager.add(Collections.singleton(task));
@@ -119,7 +122,7 @@
public void shutdown() {
shutdownRequested.set(true);
taskManager.signalProcessableTasks();
taskManager.signalTaskExecutors();
}
}
@@ -155,7 +158,7 @@
awaitingThread.start();
verify(tasks, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
taskManager.signalProcessableTasks();
taskManager.signalTaskExecutors();
assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS));
@@ -233,6 +236,18 @@
assertNull(taskManager.assignNextTask(taskExecutor));
}
@Test
public void shouldNotAssignTasksIfUncaughtExceptionPresent() {
taskManager.add(Collections.singleton(task));
when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
ensureTaskMakesProgress();
taskManager.assignNextTask(taskExecutor);
taskManager.setUncaughtException(new StreamsException("Exception"), taskId);
taskManager.unassignTask(task, taskExecutor);
assertNull(taskManager.assignNextTask(taskExecutor));
}
@Test
public void shouldNotAssignTasksForPunctuationIfPunctuationDisabled() {
taskManager.add(Collections.singleton(task));
@@ -314,6 +329,7 @@
public void shouldNotAssignLockedTask() {
taskManager.add(Collections.singleton(task));
when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task), anyLong())).thenReturn(true);
when(tasks.task(task.id())).thenReturn(task);
when(tasks.contains(task.id())).thenReturn(true);
@@ -322,10 +338,48 @@
assertNull(taskManager.assignNextTask(taskExecutor));
}
@Test
public void shouldLockAnEmptySetOfTasks() {
taskManager.add(Collections.singleton(task));
when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task), anyLong())).thenReturn(true);
when(tasks.task(task.id())).thenReturn(task);
when(tasks.contains(task.id())).thenReturn(true);
assertTrue(taskManager.lockTasks(Collections.emptySet()).isDone());
assertEquals(task, taskManager.assignNextTask(taskExecutor));
}
@Test
public void shouldLockATaskThatWasVoluntarilyReleased() {
final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
taskManager.add(Collections.singleton(task));
when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task), anyLong())).thenReturn(true);
when(tasks.task(task.id())).thenReturn(task);
when(tasks.contains(task.id())).thenReturn(true);
when(taskExecutor.unassign()).thenReturn(future);
assertEquals(task, taskManager.assignNextTask(taskExecutor));
final KafkaFuture<Void> lockingFuture = taskManager.lockTasks(Collections.singleton(task.id()));
assertFalse(lockingFuture.isDone());
taskManager.unassignTask(task, taskExecutor);
future.complete(null);
assertTrue(lockingFuture.isDone());
assertNull(taskManager.assignNextTask(taskExecutor));
}
@Test
public void shouldNotAssignAnyLockedTask() {
taskManager.add(Collections.singleton(task));
when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task), anyLong())).thenReturn(true);
when(tasks.task(task.id())).thenReturn(task);
when(tasks.contains(task.id())).thenReturn(true);
@@ -379,12 +433,21 @@
assertEquals(task, taskManager.assignNextTask(taskExecutor));
when(taskExecutor.currentTask()).thenReturn(new ReadOnlyTask(task));
final KafkaFuture<Void> lockFuture = taskManager.lockAllTasks();
assertFalse(lockFuture.isDone());
verify(taskExecutor).unassign();
taskManager.unassignTask(task, taskExecutor);
future.complete(task);
assertTrue(lockFuture.isDone());
}
private void ensureTaskMakesProgress() {
when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(true);
when(task.canPunctuateStreamTime()).thenReturn(true);
}
}
