From d144b7ee387308a59e52cbdabc7b66dd3b2926cc Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Tue, 24 Oct 2023 10:17:55 +0200 Subject: [PATCH] KAFKA-15326: [10/N] Integrate processing thread (#14193) - Introduce a new internal config flag to enable processing threads - If enabled, create a scheduling task manager inside the normal task manager (renamings will be added on top of this), and use it from the stream thread - All operations inside the task manager that change task state, lock the corresponding tasks if processing threads are enabled. - Adds a new abstract class AbstractPartitionGroup. We can modify the underlying implementation depending on the synchronization requirements. PartitionGroup is the unsynchronized subclass that is going to be used by the original code path. The processing thread code path uses a trivially synchronized SynchronizedPartitionGroup that uses object monitors. Further down the road, there is the opportunity to implement a weakly synchronized alternative. The details are complex, but since the implementation is essentially a queue + some other things, it should be feasible to implement this lock-free. - Refactorings in StreamThreadTest: Make all tests use the thread member variable and add tearDown in order avoid thread leaks and simplify debugging. Make the test parameterized on two internal flags: state updater enabled and processing threads enabled. Use JUnit's assume to disable all tests that do not apply. Enable some integration tests with processing threads enabled. Reviewer: Bruno Cadonna --- .../kafka/clients/producer/MockProducer.java | 27 +- .../clients/producer/MockProducerTest.java | 11 +- .../apache/kafka/streams/StreamsConfig.java | 7 + .../internals/AbstractPartitionGroup.java | 85 ++ .../internals/ActiveTaskCreator.java | 12 +- .../processor/internals/PartitionGroup.java | 49 +- .../processor/internals/StreamTask.java | 42 +- .../processor/internals/StreamThread.java | 131 ++- .../internals/SynchronizedPartitionGroup.java | 101 +++ .../processor/internals/TaskManager.java | 106 ++- .../streams/processor/internals/Tasks.java | 5 + .../processor/internals/TasksRegistry.java | 2 + .../internals/tasks/DefaultTaskExecutor.java | 11 +- .../internals/tasks/DefaultTaskManager.java | 6 +- .../internals/tasks/TaskManager.java | 2 +- .../AdjustStreamThreadCountTest.java | 2 +- .../integration/EosIntegrationTest.java | 31 +- .../SmokeTestDriverIntegrationTest.java | 13 +- .../internals/ActiveTaskCreatorTest.java | 1 + .../internals/PartitionGroupTest.java | 26 +- .../processor/internals/StreamTaskTest.java | 32 +- .../processor/internals/StreamThreadTest.java | 838 +++++++++++------- .../SynchronizedPartitionGroupTest.java | 201 +++++ .../processor/internals/TaskManagerTest.java | 124 ++- .../StreamThreadStateStoreProviderTest.java | 5 +- .../kafka/streams/TopologyTestDriver.java | 4 +- 26 files changed, 1425 insertions(+), 449 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroupTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 8a832f8d288..4c7a0123266 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -160,7 +160,8 @@ public class MockProducer implements Producer { @Override public void initTransactions() { - verifyProducerState(); + verifyNotClosed(); + verifyNotFenced(); if (this.transactionInitialized) { throw new IllegalStateException("MockProducer has already been initialized for transactions."); } @@ -176,7 +177,8 @@ public class MockProducer implements Producer { @Override public void beginTransaction() throws ProducerFencedException { - verifyProducerState(); + verifyNotClosed(); + verifyNotFenced(); verifyTransactionsInitialized(); if (this.beginTransactionException != null) { @@ -205,7 +207,8 @@ public class MockProducer implements Producer { public void sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata groupMetadata) throws ProducerFencedException { Objects.requireNonNull(groupMetadata); - verifyProducerState(); + verifyNotClosed(); + verifyNotFenced(); verifyTransactionsInitialized(); verifyTransactionInFlight(); @@ -224,7 +227,8 @@ public class MockProducer implements Producer { @Override public void commitTransaction() throws ProducerFencedException { - verifyProducerState(); + verifyNotClosed(); + verifyNotFenced(); verifyTransactionsInitialized(); verifyTransactionInFlight(); @@ -249,7 +253,8 @@ public class MockProducer implements Producer { @Override public void abortTransaction() throws ProducerFencedException { - verifyProducerState(); + verifyNotClosed(); + verifyNotFenced(); verifyTransactionsInitialized(); verifyTransactionInFlight(); @@ -265,10 +270,13 @@ public class MockProducer implements Producer { this.transactionInFlight = false; } - private synchronized void verifyProducerState() { + private synchronized void verifyNotClosed() { if (this.closed) { throw new IllegalStateException("MockProducer is already closed."); } + } + + private synchronized void verifyNotFenced() { if (this.producerFenced) { throw new ProducerFencedException("MockProducer is fenced."); } @@ -288,7 +296,7 @@ public class MockProducer implements Producer { /** * Adds the record to the list of sent records. The {@link RecordMetadata} returned will be immediately satisfied. - * + * * @see #history() */ @Override @@ -362,7 +370,7 @@ public class MockProducer implements Producer { } public synchronized void flush() { - verifyProducerState(); + verifyNotClosed(); if (this.flushException != null) { throw this.flushException; @@ -415,7 +423,8 @@ public class MockProducer implements Producer { } public synchronized void fenceProducer() { - verifyProducerState(); + verifyNotClosed(); + verifyNotFenced(); verifyTransactionsInitialized(); this.producerFenced = true; } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index 787f85cce53..9141fd51cb6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -40,6 +40,7 @@ import java.util.concurrent.Future; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -702,7 +703,15 @@ public class MockProducerTest { producer.close(); assertThrows(IllegalStateException.class, producer::flush); } - + + @Test + public void shouldNotThrowOnFlushProducerIfProducerIsFenced() { + buildMockProducer(true); + producer.initTransactions(); + producer.fenceProducer(); + assertDoesNotThrow(producer::flush); + } + @Test @SuppressWarnings("unchecked") public void shouldThrowClassCastException() { diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 320370503e9..bf52378a167 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1200,6 +1200,13 @@ public class StreamsConfig extends AbstractConfig { public static boolean getStateUpdaterEnabled(final Map configs) { return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true); } + + // Private API to enable processing threads (i.e. polling is decoupled from processing) + public static final String PROCESSING_THREADS_ENABLED = "__processing.threads.enabled__"; + + public static boolean getProcessingThreadsEnabled(final Map configs) { + return InternalConfig.getBoolean(configs, InternalConfig.PROCESSING_THREADS_ENABLED, false); + } public static boolean getBoolean(final Map configs, final String key, final boolean defaultValue) { final Object value = configs.getOrDefault(key, defaultValue); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java new file mode 100644 index 00000000000..12af39ca4db --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import java.util.Set; +import java.util.function.Function; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +abstract class AbstractPartitionGroup { + + abstract boolean readyToProcess(long wallClockTime); + + // creates queues for new partitions, removes old queues, saves cached records for previously assigned partitions + abstract void updatePartitions(Set inputPartitions, Function recordQueueCreator); + + abstract void setPartitionTime(TopicPartition partition, long partitionTime); + + /** + * Get the next record and queue + * + * @return StampedRecord + */ + abstract StampedRecord nextRecord(RecordInfo info, long wallClockTime); + + /** + * Adds raw records to this partition group + * + * @param partition the partition + * @param rawRecords the raw records + * @return the queue size for the partition + */ + abstract int addRawRecords(TopicPartition partition, Iterable> rawRecords); + + abstract long partitionTimestamp(final TopicPartition partition); + + /** + * Return the stream-time of this partition group defined as the largest timestamp seen across all partitions + */ + abstract long streamTime(); + + abstract Long headRecordOffset(final TopicPartition partition); + + abstract int numBuffered(); + + abstract int numBuffered(TopicPartition tp); + + abstract void clear(); + + abstract void updateLags(); + + abstract void close(); + + abstract Set partitions(); + + static class RecordInfo { + RecordQueue queue; + + ProcessorNode node() { + return queue.source(); + } + + TopicPartition partition() { + return queue.partition(); + } + + RecordQueue queue() { + return queue; + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 7f423c84069..fbdaeffa37f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -66,6 +66,7 @@ class ActiveTaskCreator { private final Map taskProducers; private final ProcessingMode processingMode; private final boolean stateUpdaterEnabled; + private final boolean processingThreadsEnabled; ActiveTaskCreator(final TopologyMetadata topologyMetadata, final StreamsConfig applicationConfig, @@ -78,7 +79,9 @@ class ActiveTaskCreator { final String threadId, final UUID processId, final Logger log, - final boolean stateUpdaterEnabled) { + final boolean stateUpdaterEnabled, + final boolean processingThreadsEnabled + ) { this.topologyMetadata = topologyMetadata; this.applicationConfig = applicationConfig; this.streamsMetrics = streamsMetrics; @@ -90,6 +93,7 @@ class ActiveTaskCreator { this.threadId = threadId; this.log = log; this.stateUpdaterEnabled = stateUpdaterEnabled; + this.processingThreadsEnabled = processingThreadsEnabled; createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics); processingMode = processingMode(applicationConfig); @@ -242,7 +246,8 @@ class ActiveTaskCreator { standbyTask.stateMgr, recordCollector, standbyTask.processorContext, - standbyTask.logContext + standbyTask.logContext, + processingThreadsEnabled ); log.trace("Created active task {} from recycled standby task with assigned partitions {}", task.id, inputPartitions); @@ -272,7 +277,8 @@ class ActiveTaskCreator { stateManager, recordCollector, context, - logContext + logContext, + processingThreadsEnabled ); log.trace("Created active task {} with assigned partitions {}", taskId, inputPartitions); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index b0a3f6f4f98..4852ba97932 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -56,7 +56,7 @@ import java.util.function.Function; * As a consequence of the definition, the PartitionGroup's stream-time is non-decreasing * (i.e., it increases or stays the same over time). */ -public class PartitionGroup { +class PartitionGroup extends AbstractPartitionGroup { private final Logger logger; private final Map partitionQueues; @@ -72,22 +72,6 @@ public class PartitionGroup { private final Map idlePartitionDeadlines = new HashMap<>(); private final Map fetchedLags = new HashMap<>(); - static class RecordInfo { - RecordQueue queue; - - ProcessorNode node() { - return queue.source(); - } - - TopicPartition partition() { - return queue.partition(); - } - - RecordQueue queue() { - return queue; - } - } - PartitionGroup(final LogContext logContext, final Map partitionQueues, final Function lagProvider, @@ -106,7 +90,8 @@ public class PartitionGroup { streamTime = RecordQueue.UNKNOWN; } - public boolean readyToProcess(final long wallClockTime) { + @Override + boolean readyToProcess(final long wallClockTime) { if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) { if (logger.isTraceEnabled() && !allBuffered && totalBuffered > 0) { final Set bufferedPartitions = new HashSet<>(); @@ -209,7 +194,7 @@ public class PartitionGroup { } } - // visible for testing + @Override long partitionTimestamp(final TopicPartition partition) { final RecordQueue queue = partitionQueues.get(partition); if (queue == null) { @@ -219,6 +204,7 @@ public class PartitionGroup { } // creates queues for new partitions, removes old queues, saves cached records for previously assigned partitions + @Override void updatePartitions(final Set inputPartitions, final Function recordQueueCreator) { final Set removedPartitions = new HashSet<>(); final Set newInputPartitions = new HashSet<>(inputPartitions); @@ -241,6 +227,7 @@ public class PartitionGroup { allBuffered = allBuffered && newInputPartitions.isEmpty(); } + @Override void setPartitionTime(final TopicPartition partition, final long partitionTime) { final RecordQueue queue = partitionQueues.get(partition); if (queue == null) { @@ -252,11 +239,7 @@ public class PartitionGroup { queue.setPartitionTime(partitionTime); } - /** - * Get the next record and queue - * - * @return StampedRecord - */ + @Override StampedRecord nextRecord(final RecordInfo info, final long wallClockTime) { StampedRecord record = null; @@ -290,13 +273,7 @@ public class PartitionGroup { return record; } - /** - * Adds raw records to this partition group - * - * @param partition the partition - * @param rawRecords the raw records - * @return the queue size for the partition - */ + @Override int addRawRecords(final TopicPartition partition, final Iterable> rawRecords) { final RecordQueue recordQueue = partitionQueues.get(partition); @@ -328,13 +305,12 @@ public class PartitionGroup { return Collections.unmodifiableSet(partitionQueues.keySet()); } - /** - * Return the stream-time of this partition group defined as the largest timestamp seen across all partitions - */ + @Override long streamTime() { return streamTime; } + @Override Long headRecordOffset(final TopicPartition partition) { final RecordQueue recordQueue = partitionQueues.get(partition); @@ -348,6 +324,7 @@ public class PartitionGroup { /** * @throws IllegalStateException if the record's partition does not belong to this partition group */ + @Override int numBuffered(final TopicPartition partition) { final RecordQueue recordQueue = partitionQueues.get(partition); @@ -358,6 +335,7 @@ public class PartitionGroup { return recordQueue.size(); } + @Override int numBuffered() { return totalBuffered; } @@ -367,6 +345,7 @@ public class PartitionGroup { return allBuffered; } + @Override void clear() { for (final RecordQueue queue : partitionQueues.values()) { queue.clear(); @@ -377,12 +356,14 @@ public class PartitionGroup { fetchedLags.clear(); } + @Override void close() { for (final RecordQueue queue : partitionQueues.values()) { queue.close(); } } + @Override void updateLags() { if (maxTaskIdleMs != StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) { for (final TopicPartition tp : partitionQueues.keySet()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 11d658b3d8f..163e2c0997e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -39,6 +39,7 @@ import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.AbstractPartitionGroup.RecordInfo; import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; @@ -64,7 +65,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; /** - * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing. + * A StreamTask is associated with a {@link AbstractPartitionGroup}, and is assigned to a StreamThread for processing. */ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, Task { @@ -77,9 +78,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, private final boolean eosEnabled; private final int maxBufferedSize; - private final PartitionGroup partitionGroup; + private final AbstractPartitionGroup partitionGroup; private final RecordCollector recordCollector; - private final PartitionGroup.RecordInfo recordInfo; + private final AbstractPartitionGroup.RecordInfo recordInfo; private final Map consumedOffsets; private final Map committedOffsets; private final Map highWatermark; @@ -111,7 +112,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, private boolean hasPendingTxCommit = false; private Optional timeCurrentIdlingStarted; - @SuppressWarnings({"rawtypes", "this-escape"}) + @SuppressWarnings({"rawtypes", "this-escape", "checkstyle:ParameterNumber"}) public StreamTask(final TaskId id, final Set inputPartitions, final ProcessorTopology topology, @@ -124,7 +125,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, final ProcessorStateManager stateMgr, final RecordCollector recordCollector, final InternalProcessorContext processorContext, - final LogContext logContext) { + final LogContext logContext, + final boolean processingThreadsEnabled + ) { super( id, topology, @@ -181,19 +184,30 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, recordQueueCreator = new RecordQueueCreator(this.logContext, config.timestampExtractor, config.deserializationExceptionHandler); - recordInfo = new PartitionGroup.RecordInfo(); + recordInfo = new RecordInfo(); final Sensor enforcedProcessingSensor; enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics); final long maxTaskIdleMs = config.maxTaskIdleMs; - partitionGroup = new PartitionGroup( - logContext, - createPartitionQueues(), - mainConsumer::currentLag, - TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics), - enforcedProcessingSensor, - maxTaskIdleMs - ); + if (processingThreadsEnabled) { + partitionGroup = new SynchronizedPartitionGroup(new PartitionGroup( + logContext, + createPartitionQueues(), + mainConsumer::currentLag, + TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics), + enforcedProcessingSensor, + maxTaskIdleMs + )); + } else { + partitionGroup = new PartitionGroup( + logContext, + createPartitionQueues(), + mainConsumer::currentLag, + TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics), + enforcedProcessingSensor, + maxTaskIdleMs + ); + } stateMgr.registerGlobalStateStores(topology.globalStateStores()); committedOffsets = new HashMap<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index bfce0b32608..509cb18eea6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -49,6 +49,8 @@ import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; +import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager; +import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.DefaultTaskExecutorCreator; import org.apache.kafka.streams.state.internals.ThreadCache; import java.util.Queue; @@ -331,6 +333,7 @@ public class StreamThread extends Thread { private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false); private final boolean eosEnabled; private final boolean stateUpdaterEnabled; + private final boolean processingThreadsEnabled; public static StreamThread create(final TopologyMetadata topologyMetadata, final StreamsConfig config, @@ -375,6 +378,7 @@ public class StreamThread extends Thread { final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); final boolean stateUpdaterEnabled = InternalConfig.getStateUpdaterEnabled(config.originals()); + final boolean proceessingThreadsEnabled = InternalConfig.getProcessingThreadsEnabled(config.originals()); final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator( topologyMetadata, config, @@ -387,7 +391,9 @@ public class StreamThread extends Thread { threadId, processId, log, - stateUpdaterEnabled); + stateUpdaterEnabled, + proceessingThreadsEnabled + ); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( topologyMetadata, config, @@ -398,6 +404,15 @@ public class StreamThread extends Thread { log, stateUpdaterEnabled); + final Tasks tasks = new Tasks(new LogContext(logPrefix)); + final boolean processingThreadsEnabled = + InternalConfig.getProcessingThreadsEnabled(config.originals()); + + final DefaultTaskManager schedulingTaskManager = + maybeCreateSchedulingTaskManager(processingThreadsEnabled, stateUpdaterEnabled, topologyMetadata, time, threadId, tasks); + final StateUpdater stateUpdater = + maybeCreateAndStartStateUpdater(stateUpdaterEnabled, streamsMetrics, config, changelogReader, topologyMetadata, time, clientId, threadIdx); + final TaskManager taskManager = new TaskManager( time, changelogReader, @@ -405,11 +420,12 @@ public class StreamThread extends Thread { logPrefix, activeTaskCreator, standbyTaskCreator, - new Tasks(new LogContext(logPrefix)), + tasks, topologyMetadata, adminClient, stateDirectory, - maybeCreateAndStartStateUpdater(stateUpdaterEnabled, streamsMetrics, config, changelogReader, topologyMetadata, time, clientId, threadIdx) + stateUpdater, + schedulingTaskManager ); referenceContainer.taskManager = taskManager; @@ -452,6 +468,31 @@ public class StreamThread extends Thread { return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId)); } + private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean processingThreadsEnabled, + final boolean stateUpdaterEnabled, + final TopologyMetadata topologyMetadata, + final Time time, + final String threadId, + final Tasks tasks) { + if (processingThreadsEnabled) { + if (!stateUpdaterEnabled) { + throw new IllegalStateException("Processing threads require the state updater to be enabled"); + } + + final DefaultTaskManager defaultTaskManager = new DefaultTaskManager( + time, + threadId, + tasks, + new DefaultTaskExecutorCreator(), + topologyMetadata.taskExecutionMetadata(), + 1 + ); + defaultTaskManager.startTaskExecutors(); + return defaultTaskManager; + } + return null; + } + private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateUpdaterEnabled, final StreamsMetricsImpl streamsMetrics, final StreamsConfig streamsConfig, @@ -488,7 +529,8 @@ public class StreamThread extends Thread { final Queue nonFatalExceptionsToHandle, final Runnable shutdownErrorHook, final BiConsumer streamsUncaughtExceptionHandler, - final java.util.function.Consumer cacheResizer) { + final java.util.function.Consumer cacheResizer + ) { super(threadId); this.stateLock = new Object(); this.adminClient = adminClient; @@ -558,6 +600,7 @@ public class StreamThread extends Thread { this.numIterations = 1; this.eosEnabled = eosEnabled(config); this.stateUpdaterEnabled = InternalConfig.getStateUpdaterEnabled(config.originals()); + this.processingThreadsEnabled = InternalConfig.getProcessingThreadsEnabled(config.originals()); } private static final class InternalConsumerConfig extends ConsumerConfig { @@ -620,7 +663,11 @@ public class StreamThread extends Thread { if (size != -1L) { cacheResizer.accept(size); } - runOnce(); + if (processingThreadsEnabled) { + runOnceWithProcessingThreads(); + } else { + runOnceWithoutProcessingThreads(); + } // Check for a scheduled rebalance but don't trigger it until the current rebalance is done if (!taskManager.rebalanceInProgress() && nextProbingRebalanceMs.get() < time.milliseconds()) { @@ -765,7 +812,7 @@ public class StreamThread extends Thread { * or if the task producer got fenced (EOS) */ // Visible for testing - void runOnce() { + void runOnceWithoutProcessingThreads() { final long startMs = time.milliseconds(); now = startMs; @@ -900,6 +947,78 @@ public class StreamThread extends Thread { } } + /** + * One iteration of a thread includes the following steps: + * + * 1. poll records from main consumer and add to buffer; + * 2. check the task manager for any exceptions to be handled + * 3. commit all tasks if necessary; + * + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException If the store's change log does not contain the partition + * @throws TaskMigratedException If another thread wrote to the changelog topic that is currently restored + * or if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) + */ + // Visible for testing + void runOnceWithProcessingThreads() { + final long startMs = time.milliseconds(); + now = startMs; + + final long pollLatency; + taskManager.resumePollingForPartitionsWithAvailableSpace(); + try { + pollLatency = pollPhase(); + } finally { + taskManager.updateLags(); + } + + // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests(). + // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned(). + // Should only proceed when the thread is still running after #pollRequests(), because no external state mutation + // could affect the task manager state beyond this point within #runOnce(). + if (!isRunning()) { + log.debug("Thread state is already {}, skipping the run once call after poll request", state); + return; + } + + long totalCommitLatency = 0L; + if (isRunning()) { + + checkStateUpdater(); + + taskManager.maybeThrowTaskExceptionsFromProcessingThreads(); + taskManager.signalTaskExecutors(); + + final long beforeCommitMs = now; + final int committed = maybeCommit(); + final long commitLatency = Math.max(now - beforeCommitMs, 0); + totalCommitLatency += commitLatency; + if (committed > 0) { + totalCommittedSinceLastSummary += committed; + commitSensor.record(commitLatency / (double) committed, now); + + if (log.isDebugEnabled()) { + log.debug("Committed all active tasks {} and standby tasks {} in {}ms", + taskManager.activeTaskIds(), taskManager.standbyTaskIds(), commitLatency); + } + } + } + + now = time.milliseconds(); + final long runOnceLatency = now - startMs; + pollRatioSensor.record((double) pollLatency / runOnceLatency, now); + commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now); + + final boolean logProcessingSummary = now - lastLogSummaryMs > LOG_SUMMARY_INTERVAL_MS; + if (logProcessingSummary) { + log.info("Committed {} total tasks since the last update", totalCommittedSinceLastSummary); + + totalCommittedSinceLastSummary = 0L; + lastLogSummaryMs = now; + } + } + private void initializeAndRestorePhase() { final java.util.function.Consumer> offsetResetter = partitions -> resetOffsets(partitions, null); final State stateSnapshot = state; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java new file mode 100644 index 00000000000..3e544c432a2 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import java.util.Set; +import java.util.function.Function; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +class SynchronizedPartitionGroup extends AbstractPartitionGroup { + + private final AbstractPartitionGroup wrapped; + + public SynchronizedPartitionGroup(final AbstractPartitionGroup wrapped) { + this.wrapped = wrapped; + } + + @Override + synchronized boolean readyToProcess(final long wallClockTime) { + return wrapped.readyToProcess(wallClockTime); + } + + @Override + synchronized void updatePartitions(final Set inputPartitions, final Function recordQueueCreator) { + wrapped.updatePartitions(inputPartitions, recordQueueCreator); + } + + @Override + synchronized void setPartitionTime(final TopicPartition partition, final long partitionTime) { + wrapped.setPartitionTime(partition, partitionTime); + } + + @Override + synchronized StampedRecord nextRecord(final RecordInfo info, final long wallClockTime) { + return wrapped.nextRecord(info, wallClockTime); + } + + @Override + synchronized int addRawRecords(final TopicPartition partition, final Iterable> rawRecords) { + return wrapped.addRawRecords(partition, rawRecords); + } + + @Override + synchronized long partitionTimestamp(final TopicPartition partition) { + return wrapped.partitionTimestamp(partition); + } + + @Override + synchronized long streamTime() { + return wrapped.streamTime(); + } + + @Override + synchronized Long headRecordOffset(final TopicPartition partition) { + return wrapped.headRecordOffset(partition); + } + + @Override + synchronized int numBuffered() { + return wrapped.numBuffered(); + } + + @Override + synchronized int numBuffered(final TopicPartition tp) { + return wrapped.numBuffered(tp); + } + + @Override + synchronized void clear() { + wrapped.clear(); + } + + @Override + synchronized void updateLags() { + wrapped.updateLags(); + } + + @Override + synchronized void close() { + wrapped.close(); + } + + @Override + synchronized Set partitions() { + return wrapped.partitions(); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index de91e801de7..6878ade68b2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.DeleteRecordsResult; import org.apache.kafka.clients.admin.RecordsToDelete; @@ -39,6 +40,7 @@ import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory; import org.apache.kafka.streams.processor.internals.Task.State; +import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.slf4j.Logger; @@ -97,6 +99,7 @@ public class TaskManager { private final ActiveTaskCreator activeTaskCreator; private final StandbyTaskCreator standbyTaskCreator; private final StateUpdater stateUpdater; + private final DefaultTaskManager schedulingTaskManager; TaskManager(final Time time, final ChangelogReader changelogReader, @@ -108,7 +111,9 @@ public class TaskManager { final TopologyMetadata topologyMetadata, final Admin adminClient, final StateDirectory stateDirectory, - final StateUpdater stateUpdater) { + final StateUpdater stateUpdater, + final DefaultTaskManager schedulingTaskManager + ) { this.time = time; this.processId = processId; this.logPrefix = logPrefix; @@ -124,6 +129,7 @@ public class TaskManager { this.log = logContext.logger(getClass()); this.stateUpdater = stateUpdater; + this.schedulingTaskManager = schedulingTaskManager; this.tasks = tasks; this.taskExecutor = new TaskExecutor( this.tasks, @@ -202,6 +208,11 @@ public class TaskManager { * @throws TaskMigratedException */ boolean handleCorruption(final Set corruptedTasks) { + final Set activeTasks = new HashSet<>(tasks.activeTaskIds()); + + // We need to stop all processing, since we need to commit non-corrupted tasks as well. + maybeLockTasks(activeTasks); + final Set corruptedActiveTasks = new HashSet<>(); final Set corruptedStandbyTasks = new HashSet<>(); @@ -240,6 +251,9 @@ public class TaskManager { } closeDirtyAndRevive(corruptedActiveTasks, true); + + maybeUnlockTasks(activeTasks); + return !corruptedActiveTasks.isEmpty(); } @@ -329,6 +343,13 @@ public class TaskManager { final Map> tasksToRecycle = new HashMap<>(); final Set tasksToCloseClean = new TreeSet<>(Comparator.comparing(Task::id)); + final Set tasksToLock = + tasks.allTaskIds().stream() + .filter(x -> activeTasksToCreate.containsKey(x) || standbyTasksToCreate.containsKey(x)) + .collect(Collectors.toSet()); + + maybeLockTasks(tasksToLock); + // first put aside those unrecognized tasks because of unknown named-topologies tasks.clearPendingTasksToCreate(); tasks.addPendingActiveTasksToCreate(pendingTasksToCreate(activeTasksToCreate)); @@ -346,6 +367,8 @@ public class TaskManager { final Map taskCloseExceptions = closeAndRecycleTasks(tasksToRecycle, tasksToCloseClean); + maybeUnlockTasks(tasksToLock); + maybeThrowTaskExceptions(taskCloseExceptions); createNewTasks(activeTasksToCreate, standbyTasksToCreate); @@ -964,6 +987,9 @@ public class TaskManager { final Map> consumedOffsetsPerTask = new HashMap<>(); final AtomicReference firstException = new AtomicReference<>(null); + final Set lockedTaskIds = activeRunningTaskIterable().stream().map(Task::id).collect(Collectors.toSet()); + maybeLockTasks(lockedTaskIds); + for (final Task task : activeRunningTaskIterable()) { if (remainingRevokedPartitions.containsAll(task.inputPartitions())) { // when the task input partitions are included in the revoked list, @@ -1057,6 +1083,8 @@ public class TaskManager { } } + maybeUnlockTasks(lockedTaskIds); + if (firstException.get() != null) { throw firstException.get(); } @@ -1112,6 +1140,8 @@ public class TaskManager { private void closeRunningTasksDirty() { final Set allTask = tasks.allTasks(); + final Set allTaskIds = tasks.allTaskIds(); + maybeLockTasks(allTaskIds); for (final Task task : allTask) { // Even though we've apparently dropped out of the group, we can continue safely to maintain our // standby tasks while we rejoin. @@ -1119,6 +1149,7 @@ public class TaskManager { closeTaskDirty(task, true); } } + maybeUnlockTasks(allTaskIds); } private void removeLostActiveTasksFromStateUpdater() { @@ -1136,6 +1167,9 @@ public class TaskManager { if (stateUpdater != null) { stateUpdater.signalResume(); } + if (schedulingTaskManager != null) { + schedulingTaskManager.signalTaskExecutors(); + } } /** @@ -1307,6 +1341,7 @@ public class TaskManager { void shutdown(final boolean clean) { shutdownStateUpdater(); + shutdownSchedulingTaskManager(); final AtomicReference firstException = new AtomicReference<>(null); @@ -1356,6 +1391,12 @@ public class TaskManager { } } + private void shutdownSchedulingTaskManager() { + if (schedulingTaskManager != null) { + schedulingTaskManager.shutdown(Duration.ofMillis(Long.MAX_VALUE)); + } + } + private void closeFailedTasksFromStateUpdater() { final Set tasksToCloseDirty = stateUpdater.drainExceptionsAndFailedTasks().stream() .flatMap(exAndTasks -> exAndTasks.getTasks().stream()).collect(Collectors.toSet()); @@ -1415,6 +1456,12 @@ public class TaskManager { void closeAndCleanUpTasks(final Collection activeTasks, final Collection standbyTasks, final boolean clean) { final AtomicReference firstException = new AtomicReference<>(null); + final Set ids = + activeTasks.stream() + .map(Task::id) + .collect(Collectors.toSet()); + maybeLockTasks(ids); + final Set tasksToCloseDirty = new HashSet<>(); tasksToCloseDirty.addAll(tryCloseCleanActiveTasks(activeTasks, clean, firstException)); tasksToCloseDirty.addAll(tryCloseCleanStandbyTasks(standbyTasks, clean, firstException)); @@ -1423,6 +1470,8 @@ public class TaskManager { closeTaskDirty(task, true); } + maybeUnlockTasks(ids); + final RuntimeException exception = firstException.get(); if (exception != null) { throw exception; @@ -1681,6 +1730,16 @@ public class TaskManager { } } + /** + * Wake-up any sleeping processing threads. + */ + public void signalTaskExecutors() { + if (schedulingTaskManager != null) { + // Wake up sleeping task executors after every poll, in case there is processing or punctuation to-do. + schedulingTaskManager.signalTaskExecutors(); + } + } + /** * Take records and add them to each respective task * @@ -1700,6 +1759,42 @@ public class TaskManager { } } + private void maybeLockTasks(final Set ids) { + if (schedulingTaskManager != null && !ids.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("Locking tasks {}", ids.stream().map(TaskId::toString).collect(Collectors.joining(", "))); + } + boolean locked = false; + while (!locked) { + try { + schedulingTaskManager.lockTasks(ids).get(); + locked = true; + } catch (final InterruptedException e) { + log.warn("Interrupted while waiting for tasks {} to be locked", + ids.stream().map(TaskId::toString).collect(Collectors.joining(","))); + } catch (final ExecutionException e) { + log.info("Failed to lock tasks"); + throw new RuntimeException(e); + } + } + } + } + + private void maybeUnlockTasks(final Set ids) { + if (schedulingTaskManager != null && !ids.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("Unlocking tasks {}", ids.stream().map(TaskId::toString).collect(Collectors.joining(", "))); + } + schedulingTaskManager.unlockTasks(ids); + } + } + + public void maybeThrowTaskExceptionsFromProcessingThreads() { + if (schedulingTaskManager != null) { + maybeThrowTaskExceptions(schedulingTaskManager.drainUncaughtExceptions()); + } + } + /** * @throws TaskMigratedException if committing offsets failed (non-EOS) * or if the task producer got fenced (EOS) @@ -1709,6 +1804,14 @@ public class TaskManager { */ int commit(final Collection tasksToCommit) { int committed = 0; + final Set ids = + tasksToCommit.stream() + .map(Task::id) + .collect(Collectors.toSet()); + maybeLockTasks(ids); + + // We have to throw the first uncaught exception after locking the tasks, to not attempt to commit failure records. + maybeThrowTaskExceptionsFromProcessingThreads(); final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>(); try { @@ -1719,6 +1822,7 @@ public class TaskManager { .forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException)); } + maybeUnlockTasks(ids); return committed; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java index a28fb22766d..00a9a27da3a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java @@ -349,6 +349,11 @@ class Tasks implements TasksRegistry { return tasks; } + @Override + public synchronized Collection activeTaskIds() { + return Collections.unmodifiableCollection(activeTasksPerId.keySet()); + } + @Override public synchronized Collection activeTasks() { return Collections.unmodifiableCollection(activeTasksPerId.values()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java index 100654b2934..82fbf097776 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java @@ -85,6 +85,8 @@ public interface TasksRegistry { Collection tasks(final Collection taskIds); + Collection activeTaskIds(); + Collection activeTasks(); Set allTasks(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java index 09445d16076..e13ae5ffb3c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java @@ -194,7 +194,16 @@ public class DefaultTaskExecutor implements TaskExecutor { // flush the task before giving it back to task manager, if we are not handing it back because of an error. if (!taskManager.hasUncaughtException(currentTask.id())) { - currentTask.flush(); + try { + currentTask.flush(); + } catch (final StreamsException e) { + log.error(String.format("Failed to flush stream task %s due to the following error:", currentTask.id()), e); + e.setTaskId(currentTask.id()); + taskManager.setUncaughtException(e, currentTask.id()); + } catch (final RuntimeException e) { + log.error(String.format("Failed to flush stream task %s due to the following error:", currentTask.id()), e); + taskManager.setUncaughtException(new StreamsException(e, currentTask.id()), currentTask.id()); + } } taskManager.unassignTask(currentTask, DefaultTaskExecutor.this); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java index 56749b4f5f0..ab552bfd031 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java @@ -319,9 +319,9 @@ public class DefaultTaskManager implements TaskManager { exception.getMessage()); } - public Map drainUncaughtExceptions() { - final Map returnValue = returnWithTasksLocked(() -> { - final Map result = new HashMap<>(uncaughtExceptions); + public Map drainUncaughtExceptions() { + final Map returnValue = returnWithTasksLocked(() -> { + final Map result = new HashMap<>(uncaughtExceptions); uncaughtExceptions.clear(); return result; }); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java index 6a52f71dd62..2d049a19994 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java @@ -118,7 +118,7 @@ public interface TaskManager { * * @return A map from task ID to the exception that occurred. */ - Map drainUncaughtExceptions(); + Map drainUncaughtExceptions(); /** * Can be used to check if a specific task has an uncaught exception. diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java index 47ccaa1a9c1..a74af0a25ac 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java @@ -442,7 +442,7 @@ public class AdjustStreamThreadCountTest { @Override public void init(final ProcessorContext context) { context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { - if (Thread.currentThread().getName().endsWith("StreamThread-1") && injectError.get()) { + if (Thread.currentThread().getName().contains("StreamThread-1") && injectError.get()) { injectError.set(false); throw new RuntimeException("BOOM"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 3ebcbdebef5..e8e37fca433 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; @@ -149,18 +150,24 @@ public class EosIntegrationTest { private String stateTmpDir; @SuppressWarnings("deprecation") - @Parameters(name = "{0}") - public static Collection data() { - return Arrays.asList(new String[][]{ - {StreamsConfig.AT_LEAST_ONCE}, - {StreamsConfig.EXACTLY_ONCE}, - {StreamsConfig.EXACTLY_ONCE_V2} + @Parameters(name = "{0}, processing threads = {1}") + public static Collection data() { + return Arrays.asList(new Object[][]{ + {StreamsConfig.AT_LEAST_ONCE, false}, + {StreamsConfig.EXACTLY_ONCE, false}, + {StreamsConfig.EXACTLY_ONCE_V2, false}, + {StreamsConfig.AT_LEAST_ONCE, true}, + {StreamsConfig.EXACTLY_ONCE, true}, + {StreamsConfig.EXACTLY_ONCE_V2, true} }); } - @Parameter + @Parameter(0) public String eosConfig; + @Parameter(1) + public boolean processingThreadsEnabled; + @Before public void createTopics() throws Exception { applicationId = "appId-" + TEST_NUMBER.getAndIncrement(); @@ -876,10 +883,15 @@ public class EosIntegrationTest { LOG.info(dummyHostName + " is executing the injected stall"); stallingHost.set(dummyHostName); while (doStall) { - final StreamThread thread = (StreamThread) Thread.currentThread(); - if (thread.isInterrupted() || !thread.isRunning()) { + final Thread thread = Thread.currentThread(); + if (thread.isInterrupted()) { throw new RuntimeException("Detected we've been interrupted."); } + if (!processingThreadsEnabled) { + if (!((StreamThread) thread).isRunning()) { + throw new RuntimeException("Detected we've been interrupted."); + } + } try { Thread.sleep(100); } catch (final InterruptedException e) { @@ -943,6 +955,7 @@ public class EosIntegrationTest { properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir); properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142"); + properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled); final Properties config = StreamsTestUtils.getStreamsConfig( applicationId, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index 73cc57834d3..0ed52cfbb8e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -30,6 +30,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; @@ -94,11 +95,12 @@ public class SmokeTestDriverIntegrationTest { } - private static Stream parameters() { + private static Stream parameters() { return Stream.of( - Boolean.TRUE, - Boolean.FALSE - ); + Arguments.of(false, false), + Arguments.of(true, false), + Arguments.of(true, true) + ); } // In this test, we try to keep creating new stream, and closing the old one, to maintain only 3 streams alive. @@ -107,7 +109,7 @@ public class SmokeTestDriverIntegrationTest { // (1) 10 min timeout, (2) 30 tries of polling without getting any data @ParameterizedTest @MethodSource("parameters") - public void shouldWorkWithRebalance(final boolean stateUpdaterEnabled) throws InterruptedException { + public void shouldWorkWithRebalance(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException { Exit.setExitProcedure((statusCode, message) -> { throw new AssertionError("Test called exit(). code:" + statusCode + " message:" + message); }); @@ -128,6 +130,7 @@ public class SmokeTestDriverIntegrationTest { final Properties props = new Properties(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled); + props.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled); // decrease the session timeout so that we can trigger the rebalance soon after old client left closed props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index ffdd0699c7a..f3950ae84f2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -497,6 +497,7 @@ public class ActiveTaskCreatorTest { "clientId-StreamThread-0", uuid, new LogContext().logger(ActiveTaskCreator.class), + false, false); assertThat( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index ffc8524d655..ecdf5b7fff0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.streams.processor.internals.AbstractPartitionGroup.RecordInfo; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockTimestampExtractor; @@ -133,7 +134,7 @@ public class PartitionGroupTest { private void testFirstBatch(final PartitionGroup group) { StampedRecord record; - final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); + final PartitionGroup.RecordInfo info = new RecordInfo(); assertThat(group.numBuffered(), is(0)); // add three 3 records with timestamp 1, 3, 5 to partition-1 @@ -193,7 +194,7 @@ public class PartitionGroupTest { private void testSecondBatch(final PartitionGroup group) { StampedRecord record; - final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); + final PartitionGroup.RecordInfo info = new RecordInfo(); // add 2 more records with timestamp 2, 4 to partition-1 final List> list3 = Arrays.asList( @@ -316,7 +317,7 @@ public class PartitionGroupTest { assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); StampedRecord record; - final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); + final PartitionGroup.RecordInfo info = new RecordInfo(); // get first two records from partition 1 record = group.nextRecord(info, time.milliseconds()); @@ -445,15 +446,16 @@ public class PartitionGroupTest { new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue), new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue)); group.addRawRecords(partition1, list); - group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()); - group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()); + group.nextRecord(new RecordInfo(), time.milliseconds()); + group.nextRecord(new RecordInfo(), time.milliseconds()); group.updateLags(); + group.clear(); assertThat(group.numBuffered(), equalTo(0)); assertThat(group.streamTime(), equalTo(RecordQueue.UNKNOWN)); - assertThat(group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()), equalTo(null)); + assertThat(group.nextRecord(new RecordInfo(), time.milliseconds()), equalTo(null)); assertThat(group.partitionTimestamp(partition1), equalTo(RecordQueue.UNKNOWN)); hasNoFetchedLag(group, partition1); @@ -475,7 +477,7 @@ public class PartitionGroupTest { group.addRawRecords(partition2, list2); assertEquals(list1.size() + list2.size(), group.numBuffered()); assertTrue(group.allPartitionsBufferedLocally()); - group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()); + group.nextRecord(new RecordInfo(), time.milliseconds()); // shrink list of queues group.updatePartitions(mkSet(createPartition2()), p -> { @@ -487,7 +489,7 @@ public class PartitionGroupTest { assertEquals(list2.size(), group.numBuffered()); assertEquals(1, group.streamTime()); assertThrows(IllegalStateException.class, () -> group.partitionTimestamp(partition1)); - assertThat(group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()), notNullValue()); // can access buffered records + assertThat(group.nextRecord(new RecordInfo(), time.milliseconds()), notNullValue()); // can access buffered records assertThat(group.partitionTimestamp(partition2), equalTo(2L)); } @@ -508,7 +510,7 @@ public class PartitionGroupTest { assertEquals(list1.size(), group.numBuffered()); assertTrue(group.allPartitionsBufferedLocally()); - group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()); + group.nextRecord(new RecordInfo(), time.milliseconds()); // expand list of queues group.updatePartitions(mkSet(createPartition1(), createPartition2()), p -> { @@ -521,7 +523,7 @@ public class PartitionGroupTest { assertEquals(1, group.streamTime()); assertThat(group.partitionTimestamp(partition1), equalTo(1L)); assertThat(group.partitionTimestamp(partition2), equalTo(RecordQueue.UNKNOWN)); - assertThat(group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()), notNullValue()); // can access buffered records + assertThat(group.nextRecord(new RecordInfo(), time.milliseconds()), notNullValue()); // can access buffered records } @Test @@ -540,7 +542,7 @@ public class PartitionGroupTest { group.addRawRecords(partition1, list1); assertEquals(list1.size(), group.numBuffered()); assertTrue(group.allPartitionsBufferedLocally()); - group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()); + group.nextRecord(new RecordInfo(), time.milliseconds()); // expand and shrink list of queues group.updatePartitions(mkSet(createPartition2()), p -> { @@ -553,7 +555,7 @@ public class PartitionGroupTest { assertEquals(1, group.streamTime()); assertThrows(IllegalStateException.class, () -> group.partitionTimestamp(partition1)); assertThat(group.partitionTimestamp(partition2), equalTo(RecordQueue.UNKNOWN)); - assertThat(group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()), nullValue()); // all available records removed + assertThat(group.nextRecord(new RecordInfo(), time.milliseconds()), nullValue()); // all available records removed } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index e295d29e419..1c91ad65b13 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -1841,7 +1841,9 @@ public class StreamTaskTest { stateManager, recordCollector, context, - logContext); + logContext, + false + ); task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); @@ -2504,7 +2506,9 @@ public class StreamTaskTest { stateManager, recordCollector, context, - logContext) + logContext, + false + ) ); assertThat(exception.getMessage(), equalTo("Invalid topology: " + @@ -2700,7 +2704,8 @@ public class StreamTaskTest { stateManager, recordCollector, context, - logContext + logContext, + false ); } @@ -2741,7 +2746,8 @@ public class StreamTaskTest { stateManager, recordCollector, context, - logContext + logContext, + false ); } @@ -2774,7 +2780,8 @@ public class StreamTaskTest { stateManager, recordCollector, context, - logContext + logContext, + false ); } @@ -2812,7 +2819,8 @@ public class StreamTaskTest { stateManager, recordCollector, context, - logContext + logContext, + false ); } @@ -2852,7 +2860,8 @@ public class StreamTaskTest { stateManager, recordCollector, context, - logContext + logContext, + false ); } @@ -2893,7 +2902,8 @@ public class StreamTaskTest { stateManager, recordCollector, context, - logContext + logContext, + false ); } @@ -2932,7 +2942,8 @@ public class StreamTaskTest { stateManager, recordCollector, context, - logContext + logContext, + false ); } @@ -2966,7 +2977,8 @@ public class StreamTaskTest { stateManager, recordCollector, context, - logContext + logContext, + false ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index f9c2a0a6080..e0990174168 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.KafkaMetricsContext; @@ -75,6 +76,7 @@ import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.StreamThread.State; import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -85,16 +87,20 @@ import org.apache.kafka.test.MockKeyValueStoreBuilder; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.easymock.EasyMock; +import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; import org.mockito.InOrder; -import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; import org.slf4j.Logger; import java.io.File; @@ -131,6 +137,8 @@ import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask; +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.expect; @@ -153,12 +161,26 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) +@RunWith(Parameterized.class) public class StreamThreadTest { + @Parameter(0) + public boolean stateUpdaterEnabled = true; + + @Parameter(1) + public boolean processingThreadsEnabled = true; + + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + {false, false}, {true, false}, {true, true} + }); + } + private final static String APPLICATION_ID = "stream-thread-test"; private final static UUID PROCESS_ID = UUID.fromString("87bf53a8-54f2-485f-a4b6-acdbec0a8b3d"); private final static String CLIENT_ID = APPLICATION_ID + "-" + PROCESS_ID; @@ -170,18 +192,17 @@ public class StreamThreadTest { private final MockTime mockTime = new MockTime(); private final String stateDir = TestUtils.tempDirectory().getPath(); private final MockClientSupplier clientSupplier = new MockClientSupplier(); - private final StreamsConfig config = new StreamsConfig(configProps(false)); - private final StreamsConfig eosEnabledConfig = new StreamsConfig(configProps(true)); private final ConsumedInternal consumed = new ConsumedInternal<>(); private final ChangelogReader changelogReader = new MockChangelogReader(); - private final StateDirectory stateDirectory = new StateDirectory(config, mockTime, true, false); + private StateDirectory stateDirectory = null; private final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder(); private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder); - @Mock - private Consumer mainConsumer; + private StreamThread thread = null; + + @SuppressWarnings("unchecked") + private final Consumer mainConsumer = (Consumer) Mockito.mock(Consumer.class); - private StreamsMetadataState streamsMetadataState; private final static BiConsumer HANDLER = (e, b) -> { if (e instanceof RuntimeException) { throw (RuntimeException) e; @@ -196,11 +217,15 @@ public class StreamThreadTest { public void setUp() { Thread.currentThread().setName(CLIENT_ID + "-StreamThread-" + threadIdx); internalTopologyBuilder.setApplicationId(APPLICATION_ID); - streamsMetadataState = new StreamsMetadataState( - new TopologyMetadata(internalTopologyBuilder, config), - StreamsMetadataState.UNKNOWN_HOST, - new LogContext(String.format("stream-client [%s] ", CLIENT_ID)) - ); + } + + @After + public void tearDown() { + if (thread != null) { + thread.shutdown(); + thread = null; + } + stateDirectory = null; } private final String topic1 = "topic1"; @@ -224,7 +249,9 @@ public class StreamThreadTest { mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()), mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, enableEoS ? StreamsConfig.EXACTLY_ONCE_V2 : StreamsConfig.AT_LEAST_ONCE), mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()), - mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()) + mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()), + mkEntry(InternalConfig.STATE_UPDATER_ENABLED, Boolean.toString(stateUpdaterEnabled)), + mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED, Boolean.toString(processingThreadsEnabled)) )); } @@ -240,17 +267,25 @@ public class StreamThreadTest { ); } + private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") final String clientId) { + return createStreamThread(clientId, mockTime); + } + private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") final String clientId, - final StreamsConfig config, - final boolean eosEnabled) { - return createStreamThread(clientId, config, mockTime, eosEnabled); + final Time time) { + final StreamsConfig config = new StreamsConfig(configProps(false)); + return createStreamThread(clientId, config, time); + } + + private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") final String clientId, + final StreamsConfig config) { + return createStreamThread(clientId, config, mockTime); } private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") final String clientId, final StreamsConfig config, - final Time time, - final boolean eosEnabled) { - if (eosEnabled) { + final Time time) { + if (!StreamsConfig.AT_LEAST_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) { clientSupplier.setApplicationIdForProducer(APPLICATION_ID); } @@ -266,6 +301,12 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); + stateDirectory = new StateDirectory(config, mockTime, true, false); + final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( + new TopologyMetadata(internalTopologyBuilder, config), + StreamsMetadataState.UNKNOWN_HOST, + new LogContext(String.format("stream-client [%s] ", CLIENT_ID)) + ); return StreamThread.create( topologyMetadata, config, @@ -307,7 +348,7 @@ public class StreamThreadTest { @Test public void shouldChangeStateInRebalanceListener() { - final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + thread = createStreamThread(CLIENT_ID); final StateListenerStub stateListener = new StateListenerStub(); thread.setStateListener(stateListener); @@ -332,7 +373,7 @@ public class StreamThreadTest { mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(); + runOnce(); assertEquals(thread.state(), StreamThread.State.RUNNING); Assert.assertEquals(4, stateListener.numChanges); Assert.assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, stateListener.oldState); @@ -343,7 +384,7 @@ public class StreamThreadTest { @Test public void shouldChangeStateAtStartClose() throws Exception { - final StreamThread thread = createStreamThread(CLIENT_ID, config, new MockTime(1), false); + thread = createStreamThread(CLIENT_ID, new MockTime(1)); final StateListenerStub stateListener = new StateListenerStub(); thread.setStateListener(stateListener); @@ -366,7 +407,7 @@ public class StreamThreadTest { @Test public void shouldCreateMetricsAtStartup() { - final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + thread = createStreamThread(CLIENT_ID, new MockTime(1)); final String defaultGroupName = "stream-thread-metrics"; final Map defaultTags = Collections.singletonMap( "thread-id", @@ -480,7 +521,7 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); + thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); thread.setNow(mockTime.milliseconds()); thread.maybeCommit(); mockTime.sleep(commitInterval - 10L); @@ -510,7 +551,7 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); + thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); thread.setNow(mockTime.milliseconds()); thread.maybeCommit(); mockTime.sleep(purgeInterval - 10L); @@ -544,7 +585,7 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); + thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); thread.setNow(mockTime.milliseconds()); mockTime.sleep(purgeInterval + 10L); thread.maybeCommit(); @@ -554,6 +595,8 @@ public class StreamThreadTest { @Test public void shouldNotProcessWhenPartitionRevoked() { + Assume.assumeFalse(processingThreadsEnabled); + final Properties props = configProps(false); final StreamsConfig config = new StreamsConfig(props); @@ -565,16 +608,17 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(mainConsumer, taskManager, config, topologyMetadata); + thread = buildStreamThread(mainConsumer, taskManager, config, topologyMetadata); thread.setState(State.STARTING); thread.setState(State.PARTITIONS_REVOKED); - thread.runOnce(); + thread.runOnceWithoutProcessingThreads(); Mockito.verify(taskManager, Mockito.never()).process(Mockito.anyInt(), Mockito.any()); } @Test public void shouldProcessWhenRunning() { + Assume.assumeFalse(processingThreadsEnabled); final Properties props = configProps(false); final StreamsConfig config = new StreamsConfig(props); @@ -586,20 +630,21 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(mainConsumer, taskManager, config, topologyMetadata); + thread = buildStreamThread(mainConsumer, taskManager, config, topologyMetadata); thread.updateThreadMetadata("admin"); thread.setState(State.STARTING); thread.setState(State.PARTITIONS_ASSIGNED); thread.setState(State.RUNNING); - thread.runOnce(); + runOnce(); Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any()); } @Test public void shouldProcessWhenPartitionAssigned() { + Assume.assumeTrue(stateUpdaterEnabled); + Assume.assumeFalse(processingThreadsEnabled); final Properties props = configProps(false); - props.setProperty(InternalConfig.STATE_UPDATER_ENABLED, Boolean.toString(true)); final StreamsConfig config = new StreamsConfig(props); when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty()); @@ -610,17 +655,19 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(mainConsumer, taskManager, config, topologyMetadata); + thread = buildStreamThread(mainConsumer, taskManager, config, topologyMetadata); thread.updateThreadMetadata("admin"); thread.setState(State.STARTING); thread.setState(State.PARTITIONS_ASSIGNED); - thread.runOnce(); + runOnce(); Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any()); } @Test public void shouldProcessWhenStarting() { + Assume.assumeTrue(stateUpdaterEnabled); + Assume.assumeFalse(processingThreadsEnabled); final Properties props = configProps(false); props.setProperty(InternalConfig.STATE_UPDATER_ENABLED, Boolean.toString(true)); @@ -633,10 +680,10 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(mainConsumer, taskManager, config, topologyMetadata); + thread = buildStreamThread(mainConsumer, taskManager, config, topologyMetadata); thread.updateThreadMetadata("admin"); thread.setState(State.STARTING); - thread.runOnce(); + runOnce(); Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any()); } @@ -663,7 +710,13 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = StreamThread.create( + stateDirectory = new StateDirectory(config, mockTime, true, false); + final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( + new TopologyMetadata(internalTopologyBuilder, config), + StreamsMetadataState.UNKNOWN_HOST, + new LogContext(String.format("stream-client [%s] ", CLIENT_ID)) + ); + thread = StreamThread.create( topologyMetadata, config, mockClientSupplier, @@ -725,7 +778,13 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = StreamThread.create( + stateDirectory = new StateDirectory(config, mockTime, true, false); + final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( + new TopologyMetadata(internalTopologyBuilder, config), + StreamsMetadataState.UNKNOWN_HOST, + new LogContext(String.format("stream-client [%s] ", CLIENT_ID)) + ); + thread = StreamThread.create( topologyMetadata, config, mockClientSupplier, @@ -794,7 +853,10 @@ public class StreamThreadTest { } @Test - public void shouldRespectNumIterationsInMainLoop() { + public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads() { + // With processing threads, there is no guarantee how many iterations will be performed + Assume.assumeFalse(processingThreadsEnabled); + final List> mockProcessors = new LinkedList<>(); internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); internalTopologyBuilder.addProcessor( @@ -813,13 +875,15 @@ public class StreamThreadTest { ); final Properties properties = new Properties(); + properties.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled); + properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig(APPLICATION_ID, "localhost:2171", Serdes.ByteArraySerde.class.getName(), Serdes.ByteArraySerde.class.getName(), properties)); - final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + thread = createStreamThread(CLIENT_ID, config); thread.setState(StreamThread.State.STARTING); thread.setState(StreamThread.State.PARTITIONS_REVOKED); @@ -833,48 +897,48 @@ public class StreamThreadTest { mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); - thread.runOnce(); + runOnce(); // processed one record, punctuated after the first record, and hence num.iterations is still 1 long offset = -1; addRecord(mockConsumer, ++offset, 0L); - thread.runOnce(); + runOnce(); assertThat(thread.currentNumIterations(), equalTo(1)); // processed one more record without punctuation, and bump num.iterations to 2 addRecord(mockConsumer, ++offset, 1L); - thread.runOnce(); + runOnce(); assertThat(thread.currentNumIterations(), equalTo(2)); // processed zero records, early exit and iterations stays as 2 - thread.runOnce(); + runOnce(); assertThat(thread.currentNumIterations(), equalTo(2)); // system time based punctutation without processing any record, iteration stays as 2 mockTime.sleep(11L); - thread.runOnce(); + runOnce(); assertThat(thread.currentNumIterations(), equalTo(2)); // system time based punctutation after processing a record, half iteration to 1 mockTime.sleep(11L); addRecord(mockConsumer, ++offset, 5L); - thread.runOnce(); + runOnce(); assertThat(thread.currentNumIterations(), equalTo(1)); // processed two records, bumping up iterations to 3 (1 + 2) addRecord(mockConsumer, ++offset, 5L); addRecord(mockConsumer, ++offset, 6L); - thread.runOnce(); + runOnce(); assertThat(thread.currentNumIterations(), equalTo(3)); // stream time based punctutation halves to 1 addRecord(mockConsumer, ++offset, 11L); - thread.runOnce(); + runOnce(); assertThat(thread.currentNumIterations(), equalTo(1)); @@ -882,13 +946,13 @@ public class StreamThreadTest { addRecord(mockConsumer, ++offset, 12L); addRecord(mockConsumer, ++offset, 13L); addRecord(mockConsumer, ++offset, 14L); - thread.runOnce(); + runOnce(); assertThat(thread.currentNumIterations(), equalTo(3)); mockProcessors.forEach(MockApiProcessor::requestCommit); addRecord(mockConsumer, ++offset, 15L); - thread.runOnce(); + runOnce(); // user requested commit should half iteration to 1 assertThat(thread.currentNumIterations(), equalTo(1)); @@ -897,20 +961,20 @@ public class StreamThreadTest { addRecord(mockConsumer, ++offset, 15L); addRecord(mockConsumer, ++offset, 16L); addRecord(mockConsumer, ++offset, 17L); - thread.runOnce(); + runOnce(); assertThat(thread.currentNumIterations(), equalTo(3)); // time based commit without processing, should keep the iteration as 3 mockTime.sleep(90L); - thread.runOnce(); + runOnce(); assertThat(thread.currentNumIterations(), equalTo(3)); // time based commit without processing, should half the iteration to 1 mockTime.sleep(90L); addRecord(mockConsumer, ++offset, 18L); - thread.runOnce(); + runOnce(); assertThat(thread.currentNumIterations(), equalTo(1)); } @@ -933,7 +997,7 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); + thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); thread.setNow(mockTime.milliseconds()); thread.maybeCommit(); mockTime.sleep(commitInterval - 10L); @@ -972,6 +1036,7 @@ public class StreamThreadTest { topologyMetadata, null, null, + null, null ) { @Override @@ -983,7 +1048,7 @@ public class StreamThreadTest { } }; topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); + thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); thread.setNow(mockTime.milliseconds()); thread.maybeCommit(); @@ -1026,7 +1091,7 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); + thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); thread.setNow(mockTime.milliseconds()); thread.maybeCommit(); @@ -1062,7 +1127,17 @@ public class StreamThreadTest { EasyMock.replay(consumer, consumerGroupMetadata, activeTaskCreator, standbyTaskCreator); final StateUpdater stateUpdater = Mockito.mock(StateUpdater.class); + final DefaultTaskManager schedulingTaskManager; + if (processingThreadsEnabled) { + schedulingTaskManager = Mockito.mock(DefaultTaskManager.class); + final KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.complete(null); + when(schedulingTaskManager.lockTasks(any())).thenReturn(future); + } else { + schedulingTaskManager = null; + } + final StreamsConfig config = new StreamsConfig(configProps(false)); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); @@ -1079,7 +1154,8 @@ public class StreamThreadTest { topologyMetadata, null, null, - stateUpdater + stateUpdater, + schedulingTaskManager ) { @Override int commit(final Collection tasksToCommit) { @@ -1089,7 +1165,7 @@ public class StreamThreadTest { }; taskManager.setMainConsumer(consumer); - final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); + thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); thread.updateThreadMetadata("adminClientId"); thread.setState(StreamThread.State.STARTING); @@ -1119,7 +1195,7 @@ public class StreamThreadTest { ) ); - thread.runOnce(); + runOnce(); assertThat( streamsMetrics.metrics().get( @@ -1150,7 +1226,8 @@ public class StreamThreadTest { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); internalStreamsBuilder.buildAndOptimizeTopology(); - final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + final StreamsConfig config = new StreamsConfig(configProps(false)); + thread = createStreamThread(CLIENT_ID, config); thread.setState(StreamThread.State.STARTING); thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); @@ -1190,7 +1267,7 @@ public class StreamThreadTest { final Properties props = configProps(true); props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(props), true); + thread = createStreamThread(CLIENT_ID, new StreamsConfig(props)); thread.setState(StreamThread.State.STARTING); thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); @@ -1214,7 +1291,7 @@ public class StreamThreadTest { mockConsumer.updateBeginningOffsets(beginOffsets); thread.rebalanceListener().onPartitionsAssigned(new HashSet<>(assignedPartitions)); - thread.runOnce(); + runOnce(); assertEquals(thread.readOnlyActiveTasks().size(), clientSupplier.producers.size()); assertSame(clientSupplier.consumer, thread.mainConsumer()); @@ -1226,7 +1303,7 @@ public class StreamThreadTest { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); final Properties props = configProps(true); - final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(props), true); + thread = createStreamThread(CLIENT_ID, new StreamsConfig(props)); thread.setState(StreamThread.State.STARTING); thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); @@ -1250,7 +1327,7 @@ public class StreamThreadTest { mockConsumer.updateBeginningOffsets(beginOffsets); thread.rebalanceListener().onPartitionsAssigned(new HashSet<>(assignedPartitions)); - thread.runOnce(); + runOnce(); assertThat(clientSupplier.producers.size(), is(1)); assertSame(clientSupplier.consumer, thread.mainConsumer()); @@ -1259,19 +1336,18 @@ public class StreamThreadTest { @Test public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws InterruptedException { - internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); - - final Properties props = configProps(true); - // The state updater is disabled for this test because this test relies on the fact the mainConsumer.resume() // is not called. This is not true when the state updater is enabled which leads to // java.lang.IllegalStateException: No current assignment for partition topic1-2. // Since this tests verifies an aspect that is independent from the state updater, it is OK to disable // the state updater and leave the rewriting of the test to later, when the code path for disabled state updater // is removed. - props.put(InternalConfig.STATE_UPDATER_ENABLED, false); - final StreamThread thread = - createStreamThread(CLIENT_ID, new StreamsConfig(props), new MockTime(1), true); + Assume.assumeFalse(stateUpdaterEnabled); + internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); + + final Properties props = configProps(true); + thread = + createStreamThread(CLIENT_ID, new StreamsConfig(props), new MockTime(1)); thread.taskManager().handleRebalanceStart(Collections.singleton(topic1)); @@ -1315,11 +1391,10 @@ public class StreamThreadTest { public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedException { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); - final StreamThread thread = createStreamThread( + thread = createStreamThread( CLIENT_ID, new StreamsConfig(configProps(true)), - new MockTime(1), - true + new MockTime(1) ); thread.start(); @@ -1366,11 +1441,12 @@ public class StreamThreadTest { EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); + final StreamsConfig config = new StreamsConfig(configProps(false)); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) + thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); thread.setStateListener( (t, newState, oldState) -> { @@ -1383,16 +1459,7 @@ public class StreamThreadTest { } @Test - public void shouldNotReturnDataAfterTaskMigratedWithStateUpdaterEnabled() { - shouldNotReturnDataAfterTaskMigrated(true); - } - - @Test - public void shouldNotReturnDataAfterTaskMigratedWithStateUpdaterDisabled() { - shouldNotReturnDataAfterTaskMigrated(false); - } - - private void shouldNotReturnDataAfterTaskMigrated(final boolean stateUpdaterEnabled) { + public void shouldNotReturnDataAfterTaskMigrated() { final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); expect(taskManager.allOwnedTasks()).andStubReturn(emptyMap()); @@ -1437,9 +1504,8 @@ public class StreamThreadTest { new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final Properties props = configProps(false); - props.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled); final StreamsConfig config = new StreamsConfig(props); - final StreamThread thread = new StreamThread( + thread = new StreamThread( new MockTime(1), config, null, @@ -1483,11 +1549,12 @@ public class StreamThreadTest { EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); + final StreamsConfig config = new StreamsConfig(configProps(false)); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) + thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); thread.shutdown(); verify(taskManager); @@ -1506,11 +1573,12 @@ public class StreamThreadTest { EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); + final StreamsConfig config = new StreamsConfig(configProps(false)); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) + thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); thread.shutdown(); // Execute the run method. Verification of the mock will check that shutdown was only done once @@ -1523,7 +1591,8 @@ public class StreamThreadTest { internalTopologyBuilder.addSource(null, "name", null, null, null, "topic"); internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); - final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + final StreamsConfig config = new StreamsConfig(configProps(false)); + thread = createStreamThread(CLIENT_ID, config); thread.setState(StreamThread.State.STARTING); thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); @@ -1543,7 +1612,8 @@ public class StreamThreadTest { internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source"); - final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true); + final StreamsConfig config = new StreamsConfig(configProps(true)); + thread = createStreamThread(CLIENT_ID, config); final MockConsumer consumer = clientSupplier.consumer; @@ -1566,7 +1636,7 @@ public class StreamThreadTest { mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); - thread.runOnce(); + runOnce(); assertThat(thread.readOnlyActiveTasks().size(), equalTo(1)); final MockProducer producer = clientSupplier.producers.get(0); @@ -1576,9 +1646,13 @@ public class StreamThreadTest { consumer.assign(new HashSet<>(assignedPartitions)); consumer.addRecord(new ConsumerRecord<>(topic1, 1, 0, new byte[0], new byte[0])); - mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L); - thread.runOnce(); - assertThat(producer.history().size(), equalTo(1)); + if (processingThreadsEnabled) { + assertTrue(runUntilTimeoutOrCondition(this::runOnce, () -> !producer.history().isEmpty())); + } else { + mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L); + runOnce(); + assertThat(producer.history().size(), equalTo(1)); + } mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L); TestUtils.waitForCondition( @@ -1588,11 +1662,16 @@ public class StreamThreadTest { producer.fenceProducer(); mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L); consumer.addRecord(new ConsumerRecord<>(topic1, 1, 1, new byte[0], new byte[0])); + try { - thread.runOnce(); + if (processingThreadsEnabled) { + runUntilTimeoutOrException(this::runOnce); + } else { + runOnce(); + } fail("Should have thrown TaskMigratedException"); } catch (final KafkaException expected) { - assertTrue(expected instanceof TaskMigratedException); + assertTrue(String.format("Expected TaskMigratedException but got %s", expected), expected instanceof TaskMigratedException); assertTrue("StreamsThread removed the fenced zombie task already, should wait for rebalance to close all zombies together.", thread.readOnlyActiveTasks().stream().anyMatch(task -> task.id().equals(task1))); } @@ -1601,8 +1680,9 @@ public class StreamThreadTest { } @Test - public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks() { - final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true); + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks() throws Exception { + final StreamsConfig config = new StreamsConfig(configProps(true)); + thread = createStreamThread(CLIENT_ID, config); internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); @@ -1624,36 +1704,29 @@ public class StreamThreadTest { mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); - thread.runOnce(); + runOnce(); assertThat(thread.readOnlyActiveTasks().size(), equalTo(1)); // need to process a record to enable committing addRecord(mockConsumer, 0L); - thread.runOnce(); + final MockProducer producer = clientSupplier.producers.get(0); + runOnce(); + if (processingThreadsEnabled) { + waitForCondition(() -> !producer.uncommittedRecords().isEmpty(), "Processing threads to process record"); + } - clientSupplier.producers.get(0).commitTransactionException = new ProducerFencedException("Producer is fenced"); + producer.commitTransactionException = new ProducerFencedException("Producer is fenced"); assertThrows(TaskMigratedException.class, () -> thread.rebalanceListener().onPartitionsRevoked(assignedPartitions)); - assertFalse(clientSupplier.producers.get(0).transactionCommitted()); - assertFalse(clientSupplier.producers.get(0).closed()); + assertFalse(producer.transactionCommitted()); + assertFalse(producer.closed()); assertEquals(1, thread.readOnlyActiveTasks().size()); } @Test - public void shouldReinitializeRevivedTasksInAnyStateWithStateUpdaterEnabled() throws Exception { - shouldReinitializeRevivedTasksInAnyState(true); - } - - @Test - public void shouldReinitializeRevivedTasksInAnyStateWithStateUpdaterDisabled() throws Exception { - shouldReinitializeRevivedTasksInAnyState(false); - } - - private void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterEnabled) throws Exception { - final Properties streamsConfigProps = configProps(false); - streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled); - final StreamsConfig config = new StreamsConfig(streamsConfigProps); - final StreamThread thread = createStreamThread(CLIENT_ID, config, new MockTime(1), false); + public void shouldReinitializeRevivedTasksInAnyState() throws Exception { + final StreamsConfig config = new StreamsConfig(configProps(false)); + thread = createStreamThread(CLIENT_ID, config, new MockTime(1)); final String storeName = "store"; final String storeChangelog = "stream-thread-test-store-changelog"; @@ -1713,11 +1786,11 @@ public class StreamThreadTest { // the first iteration completes the restoration - thread.runOnce(); + runOnce(); assertThat(thread.readOnlyActiveTasks().size(), equalTo(1)); // the second transits to running and unpause the input - thread.runOnce(); + runOnce(); // the third actually polls, processes the record, and throws the corruption exception if (stateUpdaterEnabled) { @@ -1731,7 +1804,12 @@ public class StreamThreadTest { } addRecord(mockConsumer, 0L); shouldThrow.set(true); - final TaskCorruptedException taskCorruptedException = assertThrows(TaskCorruptedException.class, thread::runOnce); + final TaskCorruptedException taskCorruptedException; + if (processingThreadsEnabled) { + taskCorruptedException = assertThrows(TaskCorruptedException.class, () -> runUntilTimeoutOrException(this::runOnce)); + } else { + taskCorruptedException = assertThrows(TaskCorruptedException.class, this::runOnce); + } // Now, we can handle the corruption thread.taskManager().handleCorruption(taskCorruptedException.corruptedTasks()); @@ -1746,16 +1824,20 @@ public class StreamThreadTest { } // again, complete the restoration - thread.runOnce(); + runOnce(); // transit to running and unpause - thread.runOnce(); + runOnce(); // process the record addRecord(mockConsumer, 0L); shouldThrow.set(false); assertThat(processed.get(), is(false)); - thread.runOnce(); - assertThat(processed.get(), is(true)); - thread.taskManager().shutdown(true); + if (processingThreadsEnabled) { + assertTrue(runUntilTimeoutOrCondition(this::runOnce, processed::get)); + } else { + runOnce(); + runOnce(); + assertThat(processed.get(), is(true)); + } } @Test @@ -1763,7 +1845,8 @@ public class StreamThreadTest { // only have source but no sink so that we would not get fenced in producer.send internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); - final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true); + final StreamsConfig config = new StreamsConfig(configProps(true)); + thread = createStreamThread(CLIENT_ID, config); final MockConsumer consumer = clientSupplier.consumer; @@ -1786,7 +1869,7 @@ public class StreamThreadTest { mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); - thread.runOnce(); + runOnce(); assertThat(thread.readOnlyActiveTasks().size(), equalTo(1)); final MockProducer producer = clientSupplier.producers.get(0); @@ -1794,7 +1877,11 @@ public class StreamThreadTest { mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L); consumer.addRecord(new ConsumerRecord<>(topic1, 1, 1, new byte[0], new byte[0])); try { - thread.runOnce(); + if (processingThreadsEnabled) { + runUntilTimeoutOrException(this::runOnce); + } else { + runOnce(); + } fail("Should have thrown TaskMigratedException"); } catch (final KafkaException expected) { assertTrue(expected instanceof TaskMigratedException); @@ -1811,8 +1898,9 @@ public class StreamThreadTest { } @Test - public void shouldNotCloseTaskProducerWhenSuspending() { - final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true); + public void shouldNotCloseTaskProducerWhenSuspending() throws Exception { + final StreamsConfig config = new StreamsConfig(configProps(true)); + thread = createStreamThread(CLIENT_ID, config); internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); @@ -1834,22 +1922,30 @@ public class StreamThreadTest { mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); - thread.runOnce(); + runOnce(); assertThat(thread.readOnlyActiveTasks().size(), equalTo(1)); // need to process a record to enable committing addRecord(mockConsumer, 0L); - thread.runOnce(); + final MockProducer producer = clientSupplier.producers.get(0); + + if (processingThreadsEnabled) { + assertTrue(runUntilTimeoutOrCondition(this::runOnce, () -> !producer.history().isEmpty())); + } else { + runOnce(); + } thread.rebalanceListener().onPartitionsRevoked(assignedPartitions); - assertTrue(clientSupplier.producers.get(0).transactionCommitted()); - assertFalse(clientSupplier.producers.get(0).closed()); + assertTrue(producer.transactionCommitted()); + assertTrue(producer.transactionCommitted()); + assertFalse(producer.closed()); assertEquals(1, thread.readOnlyActiveTasks().size()); } @Test public void shouldReturnActiveTaskMetadataWhileRunningState() { + final StreamsConfig config = new StreamsConfig(configProps(false)); internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); clientSupplier.setCluster(createCluster()); @@ -1864,7 +1960,13 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = StreamThread.create( + stateDirectory = new StateDirectory(config, mockTime, true, false); + final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( + new TopologyMetadata(internalTopologyBuilder, config), + StreamsMetadataState.UNKNOWN_HOST, + new LogContext(String.format("stream-client [%s] ", CLIENT_ID)) + ); + thread = StreamThread.create( topologyMetadata, config, clientSupplier, @@ -1899,7 +2001,7 @@ public class StreamThreadTest { mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); - thread.runOnce(); + runOnce(); final ThreadMetadata metadata = thread.threadMetadata(); assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState()); @@ -1918,11 +2020,12 @@ public class StreamThreadTest { @Test public void shouldReturnStandbyTaskMetadataWhileRunningState() { + final StreamsConfig config = new StreamsConfig(configProps(false)); internalStreamsBuilder.stream(Collections.singleton(topic1), consumed) .groupByKey().count(Materialized.as("count-one")); internalStreamsBuilder.buildAndOptimizeTopology(); - final StreamThread thread = createStreamThread(CLIENT_ID, config, new MockTime(1), false); + thread = createStreamThread(CLIENT_ID, config, new MockTime(1)); final MockConsumer restoreConsumer = clientSupplier.restoreConsumer; restoreConsumer.updatePartitions( STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, @@ -1952,33 +2055,32 @@ public class StreamThreadTest { thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList()); - thread.runOnce(); + runOnce(); final ThreadMetadata threadMetadata = thread.threadMetadata(); assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState()); assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadataImpl(task1, Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty()))); assertTrue(threadMetadata.activeTasks().isEmpty()); - - thread.taskManager().shutdown(true); } @SuppressWarnings("unchecked") @Test public void shouldUpdateStandbyTask() throws Exception { + // Updating standby tasks on the stream thread only happens when the state updater is disabled + Assume.assumeFalse(stateUpdaterEnabled); + final String storeName1 = "count-one"; final String storeName2 = "table-two"; final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog"; final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog"; final Properties props = configProps(false); - // Updating standby tasks on the stream thread only happens when the state updater is disabled - props.put(InternalConfig.STATE_UPDATER_ENABLED, false); final StreamsConfig config = new StreamsConfig(props); - final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + thread = createStreamThread(CLIENT_ID, config); final MockConsumer restoreConsumer = clientSupplier.restoreConsumer; - setupThread(storeName1, storeName2, changelogName1, changelogName2, thread, restoreConsumer, false); + setupThread(storeName1, storeName2, changelogName1, changelogName2, restoreConsumer, false); - thread.runOnce(); + runOnce(); final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1); final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1); @@ -1993,12 +2095,10 @@ public class StreamThreadTest { addStandbyRecordsToRestoreConsumer(restoreConsumer); - thread.runOnce(); + runOnce(); assertEquals(10L, store1.approximateNumEntries()); assertEquals(4L, store2.approximateNumEntries()); - - thread.taskManager().shutdown(true); } private void addActiveRecordsToRestoreConsumer(final MockConsumer restoreConsumer) { @@ -2034,7 +2134,6 @@ public class StreamThreadTest { final String storeName2, final String changelogName1, final String changelogName2, - final StreamThread thread, final MockConsumer restoreConsumer, final boolean addActiveTask) throws IOException { final TopicPartition activePartition = new TopicPartition(changelogName1, 2); @@ -2090,20 +2189,20 @@ public class StreamThreadTest { @SuppressWarnings("unchecked") @Test public void shouldNotUpdateStandbyTaskWhenPaused() throws Exception { + // Updating standby tasks on the stream thread only happens when the state updater is disabled + Assume.assumeFalse(stateUpdaterEnabled); + + final StreamsConfig config = new StreamsConfig(configProps(false)); final String storeName1 = "count-one"; final String storeName2 = "table-two"; final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog"; final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog"; - final Properties props = configProps(false); - // Updating standby tasks on the stream thread only happens when the state updater is disabled - props.put(InternalConfig.STATE_UPDATER_ENABLED, false); - final StreamsConfig config = new StreamsConfig(props); - final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + thread = createStreamThread(CLIENT_ID, config); final MockConsumer restoreConsumer = clientSupplier.restoreConsumer; - setupThread(storeName1, storeName2, changelogName1, changelogName2, thread, restoreConsumer, true); + setupThread(storeName1, storeName2, changelogName1, changelogName2, restoreConsumer, true); - thread.runOnce(); + runOnce(); final StreamTask activeTask1 = activeTask(thread.taskManager(), t1p2); final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1); @@ -2127,7 +2226,7 @@ public class StreamThreadTest { // Simulate pause thread.taskManager().topologyMetadata().pauseTopology(TopologyMetadata.UNNAMED_TOPOLOGY); - thread.runOnce(); + runOnce(); assertEquals(0L, activeStore.approximateNumEntries()); assertEquals(0L, store1.approximateNumEntries()); @@ -2135,49 +2234,52 @@ public class StreamThreadTest { // Simulate resume thread.taskManager().topologyMetadata().resumeTopology(TopologyMetadata.UNNAMED_TOPOLOGY); - thread.runOnce(); + runOnce(); assertEquals(10L, activeStore.approximateNumEntries()); assertEquals(0L, store1.approximateNumEntries()); assertEquals(0L, store2.approximateNumEntries()); - thread.runOnce(); + runOnce(); assertEquals(10L, activeStore.approximateNumEntries()); assertEquals(10L, store1.approximateNumEntries()); assertEquals(4L, store2.approximateNumEntries()); - - thread.taskManager().shutdown(true); } @Test public void shouldCreateStandbyTask() { - setupInternalTopologyWithoutState(); + final StreamsConfig config = new StreamsConfig(configProps(false)); + setupInternalTopologyWithoutState(config); internalTopologyBuilder.addStateStore(new MockKeyValueStoreBuilder("myStore", true), "processor1"); - assertThat(createStandbyTask(), not(empty())); + assertThat(createStandbyTask(config), not(empty())); } @Test public void shouldNotCreateStandbyTaskWithoutStateStores() { - setupInternalTopologyWithoutState(); + final StreamsConfig config = new StreamsConfig(configProps(false)); + setupInternalTopologyWithoutState(config); - assertThat(createStandbyTask(), empty()); + assertThat(createStandbyTask(config), empty()); } @Test public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled() { - setupInternalTopologyWithoutState(); + final StreamsConfig config = new StreamsConfig(configProps(false)); + setupInternalTopologyWithoutState(config); final StoreBuilder> storeBuilder = new MockKeyValueStoreBuilder("myStore", true); storeBuilder.withLoggingDisabled(); internalTopologyBuilder.addStateStore(storeBuilder, "processor1"); - assertThat(createStandbyTask(), empty()); + assertThat(createStandbyTask(config), empty()); } @Test @SuppressWarnings("deprecation") public void shouldPunctuateActiveTask() { + Assume.assumeFalse(processingThreadsEnabled); + final List punctuatedStreamTime = new ArrayList<>(); final List punctuatedWallClockTime = new ArrayList<>(); final ProcessorSupplier punctuateProcessor = @@ -2195,7 +2297,8 @@ public class StreamThreadTest { internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor); internalStreamsBuilder.buildAndOptimizeTopology(); - final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + final StreamsConfig config = new StreamsConfig(configProps(false)); + thread = createStreamThread(CLIENT_ID, config); thread.setState(StreamThread.State.STARTING); thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); @@ -2213,7 +2316,7 @@ public class StreamThreadTest { clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); - thread.runOnce(); + runOnce(); assertEquals(0, punctuatedStreamTime.size()); assertEquals(0, punctuatedWallClockTime.size()); @@ -2232,14 +2335,14 @@ public class StreamThreadTest { new RecordHeaders(), Optional.empty())); - thread.runOnce(); + runOnce(); assertEquals(1, punctuatedStreamTime.size()); assertEquals(1, punctuatedWallClockTime.size()); mockTime.sleep(100L); - thread.runOnce(); + runOnce(); // we should skip stream time punctuation, only trigger wall-clock time punctuation assertEquals(1, punctuatedStreamTime.size()); @@ -2249,6 +2352,8 @@ public class StreamThreadTest { @Test @SuppressWarnings("deprecation") public void shouldPunctuateWithTimestampPreservedInProcessorContext() { + Assume.assumeFalse(processingThreadsEnabled); + final org.apache.kafka.streams.kstream.TransformerSupplier> punctuateProcessor = () -> new org.apache.kafka.streams.kstream.Transformer>() { @Override @@ -2276,7 +2381,7 @@ public class StreamThreadTest { internalStreamsBuilder.buildAndOptimizeTopology(); final long currTime = mockTime.milliseconds(); - final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + thread = createStreamThread(CLIENT_ID); thread.setState(StreamThread.State.STARTING); thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); @@ -2294,11 +2399,11 @@ public class StreamThreadTest { clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); - thread.runOnce(); + runOnce(); assertEquals(0, peekedContextTime.size()); mockTime.sleep(100L); - thread.runOnce(); + runOnce(); assertEquals(1, peekedContextTime.size()); assertEquals(currTime + 100L, peekedContextTime.get(0).longValue()); @@ -2316,7 +2421,7 @@ public class StreamThreadTest { new RecordHeaders(), Optional.empty())); - thread.runOnce(); + runOnce(); assertEquals(2, peekedContextTime.size()); assertEquals(110L, peekedContextTime.get(1).longValue()); @@ -2324,7 +2429,8 @@ public class StreamThreadTest { @Test public void shouldAlwaysUpdateTasksMetadataAfterChangingState() { - final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + final StreamsConfig config = new StreamsConfig(configProps(false)); + thread = createStreamThread(CLIENT_ID, config); ThreadMetadata metadata = thread.threadMetadata(); assertEquals(StreamThread.State.CREATED.name(), metadata.threadState()); @@ -2337,25 +2443,13 @@ public class StreamThreadTest { } @Test - public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestoreWithStateUpdaterEnabled() throws Exception { - shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(true); - } - - @Test - public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestoreWithStateUpdaterDiabled() throws Exception { - shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(false); - } - - private void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(final boolean stateUpdaterEnabled) throws Exception { + public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws Exception { internalStreamsBuilder.stream(Collections.singleton("topic"), consumed) .groupByKey() .count(Materialized.as("count")); internalStreamsBuilder.buildAndOptimizeTopology(); - final Properties props = configProps(false); - props.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled); - final StreamsConfig config = new StreamsConfig(props); - final StreamThread thread = createStreamThread("clientId", config, new MockTime(1), false); + thread = createStreamThread("clientId", new MockTime(1)); final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); final MockConsumer mockRestoreConsumer = (MockConsumer) thread.restoreConsumer(); final MockAdminClient mockAdminClient = (MockAdminClient) thread.adminClient(); @@ -2408,71 +2502,66 @@ public class StreamThreadTest { thread.rebalanceListener().onPartitionsAssigned(topicPartitionSet); }); - try { - thread.start(); + thread.start(); - TestUtils.waitForCondition( - () -> mockRestoreConsumer.assignment().size() == 1, - "Never get the assignment"); + TestUtils.waitForCondition( + () -> mockRestoreConsumer.assignment().size() == 1, + "Never get the assignment"); - mockRestoreConsumer.addRecord(new ConsumerRecord<>( - "stream-thread-test-count-changelog", - 0, - 0L, - "K1".getBytes(), - "V1".getBytes())); + mockRestoreConsumer.addRecord(new ConsumerRecord<>( + "stream-thread-test-count-changelog", + 0, + 0L, + "K1".getBytes(), + "V1".getBytes())); - TestUtils.waitForCondition( - () -> mockRestoreConsumer.position(changelogPartition) == 1L, - "Never restore first record"); + TestUtils.waitForCondition( + () -> mockRestoreConsumer.position(changelogPartition) == 1L, + "Never restore first record"); - mockRestoreConsumer.setPollException(new InvalidOffsetException("Try Again!") { - @Override - public Set partitions() { - return changelogPartitionSet; - } - }); + mockRestoreConsumer.setPollException(new InvalidOffsetException("Try Again!") { + @Override + public Set partitions() { + return changelogPartitionSet; + } + }); + + // after handling the exception and reviving the task, with the state updater the changelog topic is + // registered again with the changelog reader + TestUtils.waitForCondition( + () -> mockRestoreConsumer.assignment().size() == 1, + "Never get the assignment"); + + // after handling the exception and reviving the task, the position + // should be reset to the beginning. + TestUtils.waitForCondition( + () -> mockRestoreConsumer.position(changelogPartition) == 0L, + "Never restore first record"); - // after handling the exception and reviving the task, with the state updater the changelog topic is - // registered again with the changelog reader + mockRestoreConsumer.addRecord(new ConsumerRecord<>( + "stream-thread-test-count-changelog", + 0, + 0L, + "K1".getBytes(), + "V1".getBytes())); + mockRestoreConsumer.addRecord(new ConsumerRecord<>( + "stream-thread-test-count-changelog", + 0, + 1L, + "K2".getBytes(), + "V2".getBytes())); + + if (stateUpdaterEnabled) { TestUtils.waitForCondition( - () -> mockRestoreConsumer.assignment().size() == 1, + () -> mockRestoreConsumer.assignment().size() == 0, "Never get the assignment"); - - // after handling the exception and reviving the task, the position - // should be reset to the beginning. + } else { TestUtils.waitForCondition( - () -> mockRestoreConsumer.position(changelogPartition) == 0L, - "Never restore first record"); - - mockRestoreConsumer.addRecord(new ConsumerRecord<>( - "stream-thread-test-count-changelog", - 0, - 0L, - "K1".getBytes(), - "V1".getBytes())); - mockRestoreConsumer.addRecord(new ConsumerRecord<>( - "stream-thread-test-count-changelog", - 0, - 1L, - "K2".getBytes(), - "V2".getBytes())); - - if (stateUpdaterEnabled) { - TestUtils.waitForCondition( - () -> mockRestoreConsumer.assignment().size() == 0, - "Never get the assignment"); - } else { - TestUtils.waitForCondition( - () -> { - mockRestoreConsumer.assign(changelogPartitionSet); - return mockRestoreConsumer.position(changelogPartition) == 2L; - }, - "Never finished restore"); - } - } finally { - thread.shutdown(); - thread.join(10000); + () -> { + mockRestoreConsumer.assign(changelogPartitionSet); + return mockRestoreConsumer.position(changelogPartition) == 2L; + }, + "Never finished restore"); } } @@ -2480,13 +2569,13 @@ public class StreamThreadTest { public void shouldLogAndRecordSkippedMetricForDeserializationException() { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); - final Properties config = configProps(false); - config.setProperty( + final Properties properties = configProps(false); + properties.setProperty( StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName() ); - config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); - final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(config), false); + properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); + thread = createStreamThread(CLIENT_ID, new StreamsConfig(properties)); thread.setState(StreamThread.State.STARTING); thread.setState(StreamThread.State.PARTITIONS_REVOKED); @@ -2501,7 +2590,7 @@ public class StreamThreadTest { mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); - thread.runOnce(); + runOnce(); long offset = -1; mockConsumer.addRecord(new ConsumerRecord<>( @@ -2530,7 +2619,7 @@ public class StreamThreadTest { Optional.empty())); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RecordDeserializer.class)) { - thread.runOnce(); + runOnce(); final List strings = appender.getMessages(); assertTrue(strings.contains("stream-thread [" + Thread.currentThread().getName() + "] task [0_1]" + @@ -2542,6 +2631,7 @@ public class StreamThreadTest { @Test public void shouldThrowTaskMigratedExceptionHandlingTaskLost() { + final StreamsConfig config = new StreamsConfig(configProps(false)); final Set assignedPartitions = Collections.singleton(t1p1); final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); @@ -2561,7 +2651,7 @@ public class StreamThreadTest { new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) + thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); consumer.schedulePollTask(() -> { @@ -2570,11 +2660,12 @@ public class StreamThreadTest { }); thread.setState(StreamThread.State.STARTING); - assertThrows(TaskMigratedException.class, thread::runOnce); + assertThrows(TaskMigratedException.class, this::runOnce); } @Test public void shouldThrowTaskMigratedExceptionHandlingRevocation() { + final StreamsConfig config = new StreamsConfig(configProps(false)); final Set assignedPartitions = Collections.singleton(t1p1); final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); @@ -2594,7 +2685,7 @@ public class StreamThreadTest { new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) + thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); consumer.schedulePollTask(() -> { @@ -2603,12 +2694,13 @@ public class StreamThreadTest { }); thread.setState(StreamThread.State.STARTING); - assertThrows(TaskMigratedException.class, thread::runOnce); + assertThrows(TaskMigratedException.class, this::runOnce); } @Test @SuppressWarnings("unchecked") public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() { + final StreamsConfig config = new StreamsConfig(configProps(false)); final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); final Consumer consumer = mock(Consumer.class); @@ -2640,13 +2732,13 @@ public class StreamThreadTest { new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = new StreamThread( + thread = new StreamThread( mockTime, config, null, consumer, consumer, - null, + changelogReader, null, taskManager, streamsMetrics, @@ -2661,7 +2753,12 @@ public class StreamThreadTest { null ) { @Override - void runOnce() { + void runOnceWithProcessingThreads() { + setState(State.PENDING_SHUTDOWN); + throw new TaskCorruptedException(corruptedTasks); + } + @Override + void runOnceWithoutProcessingThreads() { setState(State.PENDING_SHUTDOWN); throw new TaskCorruptedException(corruptedTasks); } @@ -2675,6 +2772,7 @@ public class StreamThreadTest { @Test @SuppressWarnings("unchecked") public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler() { + final StreamsConfig config = new StreamsConfig(configProps(false)); final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); final Consumer consumer = mock(Consumer.class); @@ -2707,13 +2805,13 @@ public class StreamThreadTest { new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = new StreamThread( + thread = new StreamThread( mockTime, config, null, consumer, consumer, - null, + changelogReader, null, taskManager, streamsMetrics, @@ -2728,7 +2826,12 @@ public class StreamThreadTest { null ) { @Override - void runOnce() { + void runOnceWithProcessingThreads() { + setState(State.PENDING_SHUTDOWN); + throw new TaskCorruptedException(corruptedTasks); + } + @Override + void runOnceWithoutProcessingThreads() { setState(State.PENDING_SHUTDOWN); throw new TaskCorruptedException(corruptedTasks); } @@ -2746,6 +2849,7 @@ public class StreamThreadTest { @Test @SuppressWarnings("unchecked") public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath() { + final StreamsConfig config = new StreamsConfig(configProps(false)); final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); final Consumer consumer = mock(Consumer.class); @@ -2782,13 +2886,13 @@ public class StreamThreadTest { new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = new StreamThread( + thread = new StreamThread( mockTime, config, null, consumer, consumer, - null, + changelogReader, null, taskManager, streamsMetrics, @@ -2803,7 +2907,12 @@ public class StreamThreadTest { null ) { @Override - void runOnce() { + void runOnceWithProcessingThreads() { + setState(State.PENDING_SHUTDOWN); + throw new TaskCorruptedException(corruptedTasks); + } + @Override + void runOnceWithoutProcessingThreads() { setState(State.PENDING_SHUTDOWN); throw new TaskCorruptedException(corruptedTasks); } @@ -2818,6 +2927,8 @@ public class StreamThreadTest { @Test @SuppressWarnings("unchecked") public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask() { + final StreamsConfig config = new StreamsConfig(configProps(true)); + final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); final Consumer consumer = mock(Consumer.class); @@ -2852,13 +2963,13 @@ public class StreamThreadTest { new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = new StreamThread( + thread = new StreamThread( mockTime, - eosEnabledConfig, + config, null, consumer, consumer, - null, + changelogReader, null, taskManager, streamsMetrics, @@ -2873,7 +2984,12 @@ public class StreamThreadTest { null ) { @Override - void runOnce() { + void runOnceWithProcessingThreads() { + setState(State.PENDING_SHUTDOWN); + throw new TaskCorruptedException(corruptedTasks); + } + @Override + void runOnceWithoutProcessingThreads() { setState(State.PENDING_SHUTDOWN); throw new TaskCorruptedException(corruptedTasks); } @@ -2889,6 +3005,8 @@ public class StreamThreadTest { @Test @SuppressWarnings("unchecked") public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask() { + final StreamsConfig config = new StreamsConfig(configProps(true)); + final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); final Consumer consumer = mock(Consumer.class); @@ -2920,13 +3038,13 @@ public class StreamThreadTest { new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = new StreamThread( + thread = new StreamThread( mockTime, - eosEnabledConfig, + config, null, consumer, consumer, - null, + changelogReader, null, taskManager, streamsMetrics, @@ -2941,7 +3059,12 @@ public class StreamThreadTest { null ) { @Override - void runOnce() { + void runOnceWithProcessingThreads() { + setState(State.PENDING_SHUTDOWN); + throw new TaskCorruptedException(corruptedTasks); + } + @Override + void runOnceWithoutProcessingThreads() { setState(State.PENDING_SHUTDOWN); throw new TaskCorruptedException(corruptedTasks); } @@ -2956,6 +3079,7 @@ public class StreamThreadTest { @Test public void shouldNotCommitNonRunningNonRestoringTasks() { + final StreamsConfig config = new StreamsConfig(configProps(false)); final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); final Consumer consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); @@ -2990,7 +3114,7 @@ public class StreamThreadTest { new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); + thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); EasyMock.replay(task1, task2, task3, taskManager); @@ -3004,12 +3128,13 @@ public class StreamThreadTest { public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps() { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); - final Properties config = configProps(false); - config.setProperty( + final Properties properties = configProps(false); + properties.setProperty( StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class.getName() ); - final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(config), false); + final StreamsConfig config = new StreamsConfig(properties); + thread = createStreamThread(CLIENT_ID, config); thread.setState(StreamThread.State.STARTING); thread.setState(StreamThread.State.PARTITIONS_REVOKED); @@ -3026,7 +3151,7 @@ public class StreamThreadTest { mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); - thread.runOnce(); + runOnce(); final MetricName skippedTotalMetric = metrics.metricName( "skipped-records-total", @@ -3043,17 +3168,17 @@ public class StreamThreadTest { long offset = -1; addRecord(mockConsumer, ++offset); addRecord(mockConsumer, ++offset); - thread.runOnce(); + runOnce(); addRecord(mockConsumer, ++offset); addRecord(mockConsumer, ++offset); addRecord(mockConsumer, ++offset); addRecord(mockConsumer, ++offset); - thread.runOnce(); + runOnce(); addRecord(mockConsumer, ++offset, 1L); addRecord(mockConsumer, ++offset, 1L); - thread.runOnce(); + runOnce(); final List strings = appender.getMessages(); @@ -3093,6 +3218,7 @@ public class StreamThreadTest { @Test public void shouldTransmitTaskManagerMetrics() { + final StreamsConfig config = new StreamsConfig(configProps(false)); final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); @@ -3104,7 +3230,7 @@ public class StreamThreadTest { final Metric testMetric = new KafkaMetric( new Object(), testMetricName, - (Measurable) (config, now) -> 0, + (Measurable) (c, now) -> 0, null, new MockTime()); final Map dummyProducerMetrics = singletonMap(testMetricName, testMetric); @@ -3116,13 +3242,14 @@ public class StreamThreadTest { new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); + thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); assertThat(dummyProducerMetrics, is(thread.producerMetrics())); } @Test public void shouldConstructAdminMetrics() { + final StreamsConfig config = new StreamsConfig(configProps(false)); final Node broker1 = new Node(0, "dummyHost-1", 1234); final Node broker2 = new Node(1, "dummyHost-2", 1234); final List cluster = Arrays.asList(broker1, broker2); @@ -3141,13 +3268,13 @@ public class StreamThreadTest { new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = new StreamThread( + thread = new StreamThread( mockTime, config, adminClient, consumer, consumer, - null, + changelogReader, null, taskManager, streamsMetrics, @@ -3165,7 +3292,7 @@ public class StreamThreadTest { final Metric testMetric = new KafkaMetric( new Object(), testMetricName, - (Measurable) (config, now) -> 0, + (Measurable) (c, now) -> 0, null, new MockTime()); @@ -3187,6 +3314,7 @@ public class StreamThreadTest { } public void runAndVerifyFailedStreamThreadRecording(final boolean shouldFail) { + final StreamsConfig config = new StreamsConfig(configProps(false)); final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); @@ -3198,13 +3326,13 @@ public class StreamThreadTest { new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); - final StreamThread thread = new StreamThread( + thread = new StreamThread( mockTime, config, null, consumer, consumer, - null, + changelogReader, null, taskManager, streamsMetrics, @@ -3219,7 +3347,14 @@ public class StreamThreadTest { null ) { @Override - void runOnce() { + void runOnceWithProcessingThreads() { + setState(StreamThread.State.PENDING_SHUTDOWN); + if (shouldFail) { + throw new StreamsException(Thread.currentThread().getName()); + } + } + @Override + void runOnceWithoutProcessingThreads() { setState(StreamThread.State.PENDING_SHUTDOWN); if (shouldFail) { throw new StreamsException(Thread.currentThread().getName()); @@ -3237,90 +3372,97 @@ public class StreamThreadTest { @Test public void shouldCheckStateUpdater() { - final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig(); - streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, true); - final StreamThread streamThread = setUpThread(streamsConfigProps); - final TaskManager taskManager = streamThread.taskManager(); - streamThread.setState(State.STARTING); + Assume.assumeTrue(stateUpdaterEnabled); + final Properties streamsConfigProps = configProps(false); + thread = setUpThread(streamsConfigProps); + final TaskManager taskManager = thread.taskManager(); + thread.setState(State.STARTING); - streamThread.runOnce(); + runOnce(); Mockito.verify(taskManager).checkStateUpdater(Mockito.anyLong(), Mockito.any()); - Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any()); + + if (!processingThreadsEnabled) { + Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any()); + } } @Test public void shouldCheckStateUpdaterInBetweenProcessCalls() { - final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig(); - streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, true); - final StreamThread streamThread = setUpThread(streamsConfigProps); - final TaskManager taskManager = streamThread.taskManager(); - streamThread.setState(State.STARTING); + Assume.assumeTrue(stateUpdaterEnabled); + Assume.assumeFalse(processingThreadsEnabled); + + final Properties streamsConfigProps = configProps(false); + thread = setUpThread(streamsConfigProps); + final TaskManager taskManager = thread.taskManager(); + thread.setState(State.STARTING); // non-zero return of process will cause a second call to process when(taskManager.process(Mockito.anyInt(), Mockito.any())).thenReturn(1).thenReturn(0); - streamThread.runOnce(); + runOnce(); Mockito.verify(taskManager, times(2)).checkStateUpdater(Mockito.anyLong(), Mockito.any()); } @Test public void shouldUpdateLagsAfterPolling() { - final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig(); - final StreamThread streamThread = setUpThread(streamsConfigProps); - streamThread.setState(State.STARTING); - streamThread.setState(State.PARTITIONS_ASSIGNED); + final Properties streamsConfigProps = configProps(false); + thread = setUpThread(streamsConfigProps); + thread.setState(State.STARTING); + thread.setState(State.PARTITIONS_ASSIGNED); - streamThread.runOnce(); + runOnce(); - final InOrder inOrder = Mockito.inOrder(mainConsumer, streamThread.taskManager()); + final InOrder inOrder = Mockito.inOrder(mainConsumer, thread.taskManager()); inOrder.verify(mainConsumer).poll(Mockito.any()); - inOrder.verify(streamThread.taskManager()).updateLags(); + inOrder.verify(thread.taskManager()).updateLags(); } @Test public void shouldResumePollingForPartitionsWithAvailableSpaceBeforePolling() { - final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig(); - final StreamThread streamThread = setUpThread(streamsConfigProps); - streamThread.setState(State.STARTING); - streamThread.setState(State.PARTITIONS_ASSIGNED); + final Properties streamsConfigProps = configProps(false); + thread = setUpThread(streamsConfigProps); + thread.setState(State.STARTING); + thread.setState(State.PARTITIONS_ASSIGNED); - streamThread.runOnce(); + runOnce(); - final InOrder inOrder = Mockito.inOrder(streamThread.taskManager(), mainConsumer); - inOrder.verify(streamThread.taskManager()).resumePollingForPartitionsWithAvailableSpace(); + final InOrder inOrder = Mockito.inOrder(thread.taskManager(), mainConsumer); + inOrder.verify(thread.taskManager()).resumePollingForPartitionsWithAvailableSpace(); inOrder.verify(mainConsumer).poll(Mockito.any()); } @Test public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater() { - final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig(); - streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, true); + Assume.assumeTrue(stateUpdaterEnabled); + final Properties streamsConfigProps = configProps(false); + final StreamsConfig config = new StreamsConfig(streamsConfigProps); final Duration pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); - final StreamThread streamThread = setUpThread(streamsConfigProps); - streamThread.setState(State.STARTING); - streamThread.setState(State.PARTITIONS_ASSIGNED); + thread = setUpThread(streamsConfigProps); + thread.setState(State.STARTING); + thread.setState(State.PARTITIONS_ASSIGNED); - streamThread.runOnce(); + runOnce(); Mockito.verify(mainConsumer).poll(pollTime); } @Test public void shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater() { - final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig(); - streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, false); - final StreamThread streamThread = setUpThread(streamsConfigProps); - streamThread.setState(State.STARTING); - streamThread.setState(State.PARTITIONS_ASSIGNED); + Assume.assumeFalse(stateUpdaterEnabled); + final Properties streamsConfigProps = configProps(false); + thread = setUpThread(streamsConfigProps); + thread.setState(State.STARTING); + thread.setState(State.PARTITIONS_ASSIGNED); - streamThread.runOnce(); + runOnce(); Mockito.verify(mainConsumer).poll(Duration.ZERO); } private StreamThread setUpThread(final Properties streamsConfigProps) { + final StreamsConfig config = new StreamsConfig(streamsConfigProps); final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); when(mainConsumer.poll(Mockito.any(Duration.class))).thenReturn(new ConsumerRecords<>(Collections.emptyMap())); @@ -3381,7 +3523,8 @@ public class StreamThreadTest { return taskManager; } - private void setupInternalTopologyWithoutState() { + private void setupInternalTopologyWithoutState(final StreamsConfig config) { + stateDirectory = new StateDirectory(config, mockTime, true, false); internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); internalTopologyBuilder.addProcessor( "processor1", @@ -3392,7 +3535,7 @@ public class StreamThreadTest { } // TODO: change return type to `StandbyTask` - private Collection createStandbyTask() { + private Collection createStandbyTask(final StreamsConfig config) { final LogContext logContext = new LogContext("test"); final Logger log = logContext.logger(StreamThreadTest.class); final StreamsMetricsImpl streamsMetrics = @@ -3478,4 +3621,33 @@ public class StreamThreadTest { null ); } + + private void runOnce() { + if (processingThreadsEnabled) { + thread.runOnceWithProcessingThreads(); + } else { + thread.runOnceWithoutProcessingThreads(); + } + } + + private void runUntilTimeoutOrException(final Runnable action) { + final long expectedEnd = System.currentTimeMillis() + DEFAULT_MAX_WAIT_MS; + while (System.currentTimeMillis() < expectedEnd) { + action.run(); + mockTime.sleep(10); + } + } + + private boolean runUntilTimeoutOrCondition(final Runnable action, final TestCondition testCondition) throws Exception { + final long expectedEnd = System.currentTimeMillis() + DEFAULT_MAX_WAIT_MS; + while (System.currentTimeMillis() < expectedEnd) { + action.run(); + if (testCondition.conditionMet()) { + return true; + } + mockTime.sleep(10); + } + return false; + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroupTest.java new file mode 100644 index 00000000000..ed9f909b3ca --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroupTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.internals.AbstractPartitionGroup.RecordInfo; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import java.util.Collections; +import java.util.Set; +import java.util.function.Function; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; + +public class SynchronizedPartitionGroupTest { + + @Mock + private AbstractPartitionGroup wrapped; + + private SynchronizedPartitionGroup synchronizedPartitionGroup; + + private AutoCloseable closeable; + + @BeforeEach + public void setUp() { + closeable = MockitoAnnotations.openMocks(this); + synchronizedPartitionGroup = new SynchronizedPartitionGroup(wrapped); + } + + @AfterEach + public void tearDown() throws Exception { + closeable.close(); + } + + @Test + public void testReadyToProcess() { + final long wallClockTime = 0L; + when(wrapped.readyToProcess(wallClockTime)).thenReturn(true); + + synchronizedPartitionGroup.readyToProcess(wallClockTime); + + verify(wrapped, times(1)).readyToProcess(wallClockTime); + } + + @Test + public void testUpdatePartitions() { + final Set inputPartitions = Collections.singleton(new TopicPartition("topic", 0)); + @SuppressWarnings("unchecked") final Function recordQueueCreator = (Function) mock(Function.class); + + synchronizedPartitionGroup.updatePartitions(inputPartitions, recordQueueCreator); + + verify(wrapped, times(1)).updatePartitions(inputPartitions, recordQueueCreator); + } + + @Test + public void testSetPartitionTime() { + final TopicPartition partition = new TopicPartition("topic", 0); + final long partitionTime = 12345678L; + + synchronizedPartitionGroup.setPartitionTime(partition, partitionTime); + + verify(wrapped, times(1)).setPartitionTime(partition, partitionTime); + } + + @Test + public void testNextRecord() { + final RecordInfo info = mock(RecordInfo.class); + final long wallClockTime = 12345678L; + final StampedRecord stampedRecord = mock(StampedRecord.class); + when(wrapped.nextRecord(info, wallClockTime)).thenReturn(stampedRecord); + + final StampedRecord result = synchronizedPartitionGroup.nextRecord(info, wallClockTime); + + assertEquals(stampedRecord, result); + verify(wrapped, times(1)).nextRecord(info, wallClockTime); + } + + @Test + public void testAddRawRecords() { + final TopicPartition partition = new TopicPartition("topic", 0); + @SuppressWarnings("unchecked") final Iterable> rawRecords = (Iterable>) mock(Iterable.class); + when(wrapped.addRawRecords(partition, rawRecords)).thenReturn(1); + + final int result = synchronizedPartitionGroup.addRawRecords(partition, rawRecords); + + assertEquals(1, result); + verify(wrapped, times(1)).addRawRecords(partition, rawRecords); + } + + @Test + public void testPartitionTimestamp() { + final TopicPartition partition = new TopicPartition("topic", 0); + final long timestamp = 12345678L; + when(wrapped.partitionTimestamp(partition)).thenReturn(timestamp); + + final long result = synchronizedPartitionGroup.partitionTimestamp(partition); + + assertEquals(timestamp, result); + verify(wrapped, times(1)).partitionTimestamp(partition); + } + + @Test + public void testStreamTime() { + final long streamTime = 12345678L; + when(wrapped.streamTime()).thenReturn(streamTime); + + final long result = synchronizedPartitionGroup.streamTime(); + + assertEquals(streamTime, result); + verify(wrapped, times(1)).streamTime(); + } + + @Test + public void testHeadRecordOffset() { + final TopicPartition partition = new TopicPartition("topic", 0); + final Long recordOffset = 0L; + when(wrapped.headRecordOffset(partition)).thenReturn(recordOffset); + + final Long result = synchronizedPartitionGroup.headRecordOffset(partition); + + assertEquals(recordOffset, result); + verify(wrapped, times(1)).headRecordOffset(partition); + } + + @Test + public void testNumBuffered() { + final int numBuffered = 1; + when(wrapped.numBuffered()).thenReturn(numBuffered); + + final int result = synchronizedPartitionGroup.numBuffered(); + + assertEquals(numBuffered, result); + verify(wrapped, times(1)).numBuffered(); + } + + @Test + public void testNumBufferedWithTopicPartition() { + final TopicPartition partition = new TopicPartition("topic", 0); + final int numBuffered = 1; + when(wrapped.numBuffered(partition)).thenReturn(numBuffered); + + final int result = synchronizedPartitionGroup.numBuffered(partition); + + assertEquals(numBuffered, result); + verify(wrapped, times(1)).numBuffered(partition); + } + + @Test + public void testClear() { + synchronizedPartitionGroup.clear(); + + verify(wrapped, times(1)).clear(); + } + + @Test + public void testUpdateLags() { + synchronizedPartitionGroup.updateLags(); + + verify(wrapped, times(1)).updateLags(); + } + + @Test + public void testClose() { + synchronizedPartitionGroup.close(); + + verify(wrapped, times(1)).close(); + } + + @Test + public void testPartitions() { + final Set partitions = Collections.singleton(new TopicPartition("topic", 0)); + when(wrapped.partitions()).thenReturn(partitions); + + final Set result = synchronizedPartitionGroup.partitions(); + + assertEquals(partitions, result); + verify(wrapped, times(1)).partitions(); + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index add26097a65..1ec91b43164 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; @@ -48,6 +49,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory; import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTasks; import org.apache.kafka.streams.processor.internals.Task.State; +import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager; import org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; @@ -199,6 +201,7 @@ public class TaskManagerTest { @org.mockito.Mock private Admin adminClient; final StateUpdater stateUpdater = Mockito.mock(StateUpdater.class); + final DefaultTaskManager schedulingTaskManager = Mockito.mock(DefaultTaskManager.class); private TaskManager taskManager; private TopologyMetadata topologyMetadata; @@ -212,16 +215,21 @@ public class TaskManagerTest { @Before public void setUp() { - taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, false); + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false); } private TaskManager setUpTaskManager(final ProcessingMode processingMode, final boolean stateUpdaterEnabled) { - return setUpTaskManager(processingMode, null, stateUpdaterEnabled); + return setUpTaskManager(processingMode, null, stateUpdaterEnabled, false); + } + + private TaskManager setUpTaskManager(final ProcessingMode processingMode, final TasksRegistry tasks, final boolean stateUpdaterEnabled) { + return setUpTaskManager(processingMode, tasks, stateUpdaterEnabled, false); } private TaskManager setUpTaskManager(final ProcessingMode processingMode, final TasksRegistry tasks, - final boolean stateUpdaterEnabled) { + final boolean stateUpdaterEnabled, + final boolean processingThreadsEnabled) { topologyMetadata = new TopologyMetadata(topologyBuilder, new DummyStreamsConfig(processingMode)); final TaskManager taskManager = new TaskManager( time, @@ -234,7 +242,8 @@ public class TaskManagerTest { topologyMetadata, adminClient, stateDirectory, - stateUpdaterEnabled ? stateUpdater : null + stateUpdaterEnabled ? stateUpdater : null, + processingThreadsEnabled ? schedulingTaskManager : null ); taskManager.setMainConsumer(consumer); return taskManager; @@ -287,6 +296,103 @@ public class TaskManagerTest { Mockito.verify(standbyTask).resume(); } + @Test + public void shouldLockAllTasksOnCorruptionWithProcessingThreads() { + final StreamTask activeTask1 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions).build(); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); + when(tasks.activeTaskIds()).thenReturn(mkSet(taskId00, taskId01)); + when(tasks.task(taskId00)).thenReturn(activeTask1); + final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); + when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture); + expect(consumer.assignment()).andReturn(emptySet()).anyTimes(); + replay(consumer); + + taskManager.handleCorruption(mkSet(taskId00)); + + verify(consumer); + Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01)); + Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01)); + } + + @Test + public void shouldLockCommitableTasksOnCorruptionWithProcessingThreads() { + final StreamTask activeTask1 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions).build(); + final StreamTask activeTask2 = statefulTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId01Partitions).build(); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); + final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); + when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture); + + taskManager.commit(mkSet(activeTask1, activeTask2)); + + Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01)); + Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01)); + } + + @Test + public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() { + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); + when(tasks.allTaskIds()).thenReturn(mkSet(taskId00, taskId01)); + final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); + when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture); + + taskManager.handleAssignment( + mkMap(mkEntry(taskId00, taskId00Partitions)), + mkMap(mkEntry(taskId01, taskId01Partitions)) + ); + + Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01)); + Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01)); + } + + @Test + public void shouldLockAffectedTasksOnHandleRevocation() { + final StreamTask activeTask1 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions).build(); + final StreamTask activeTask2 = statefulTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId01Partitions).build(); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); + when(tasks.allTasks()).thenReturn(mkSet(activeTask1, activeTask2)); + final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); + when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture); + + taskManager.handleRevocation(taskId01Partitions); + + Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01)); + Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01)); + } + + @Test + public void shouldLockTasksOnClose() { + final StreamTask activeTask1 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions).build(); + final StreamTask activeTask2 = statefulTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId01Partitions).build(); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); + when(tasks.allTasks()).thenReturn(mkSet(activeTask1, activeTask2)); + final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); + when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture); + + taskManager.closeAndCleanUpTasks(mkSet(activeTask1), mkSet(), false); + + Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00)); + Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00)); + } + @Test public void shouldResumePollingForPartitionsWithAvailableSpaceForAllActiveTasks() { final StreamTask activeTask1 = statefulTask(taskId00, taskId00ChangelogPartitions) @@ -3553,6 +3659,16 @@ public class TaskManagerTest { Mockito.verify(failedStatefulTask).closeDirty(); } + @Test + public void shouldShutdownSchedulingTaskManager() { + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); + + taskManager.shutdown(true); + + Mockito.verify(schedulingTaskManager).shutdown(Duration.ofMillis(Long.MAX_VALUE)); + } + @Test public void shouldShutDownStateUpdaterAndAddRestoredTasksToTaskRegistry() { final TasksRegistry tasks = mock(TasksRegistry.class); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 92a2c069195..8b0584c380d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -460,7 +460,10 @@ public class StreamThreadStateStoreProviderTest { new MockTime(), stateManager, recordCollector, - context, logContext); + context, + logContext, + false + ); } private void mockThread(final boolean initialized) { diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index f1ac8249d23..20abaa54072 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -522,7 +522,9 @@ public class TopologyTestDriver implements Closeable { stateManager, recordCollector, context, - logContext); + logContext, + false + ); task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); task.processorContext().setRecordContext(null);