diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index f9a4c24e1fa..ea1e29d0bcb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -190,14 +190,20 @@ public class StreamThread extends Thread { oldState = state; if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) { + log.debug("Ignoring request to transit from PENDING_SHUTDOWN to {}: " + + "only DEAD state is a valid next state", newState); // when the state is already in PENDING_SHUTDOWN, all other transitions will be // refused but we do not throw exception here return null; } else if (state == State.DEAD) { + log.debug("Ignoring request to transit from DEAD to {}: " + + "no valid next state after DEAD", newState); // when the state is already in NOT_RUNNING, all its transitions // will be refused but we do not throw exception here return null; } else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) { + log.debug("Ignoring request to transit from PARTITIONS_REVOKED to PARTITIONS_REVOKED: " + + "self transition is not allowed"); // when the state is already in PARTITIONS_REVOKED, its transition to itself will be // refused but we do not throw exception here return null; @@ -268,17 +274,23 @@ public class StreamThread extends Thread { final long start = time.milliseconds(); try { if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) { - return; - } - if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.NONE.code()) { + log.debug( + "Skipping task creation in rebalance because we are already in {} state.", + streamThread.state() + ); + } else if (streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) { + log.debug( + "Encountered assignment error during partition assignment: {}. Skipping task initialization", + streamThread.assignmentErrorCode + ); + } else { + log.debug("Creating tasks based on assignment."); taskManager.createTasks(assignment); } } catch (final Throwable t) { log.error( "Error caught during partition assignment, " + - "will abort the current process and re-throw at the end of rebalance: {}", - t - ); + "will abort the current process and re-throw at the end of rebalance", t); streamThread.setRebalanceException(t); } finally { log.info("partition assignment took {} ms.\n" + @@ -809,7 +821,6 @@ public class StreamThread extends Thread { // Visible for testing void runOnce() { final ConsumerRecords records; - now = time.milliseconds(); if (state == State.PARTITIONS_ASSIGNED) { @@ -830,6 +841,15 @@ public class StreamThread extends Thread { throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration"); } + // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests(). + // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned(). + // Should only proceed when the thread is still running after #pollRequests(), because no external state mutation + // could affect the task manager state beyond this point within #runOnce(). + if (!isRunning()) { + log.debug("State already transits to {}, skipping the run once call after poll request", state); + return; + } + final long pollLatency = advanceNowAndComputeLatency(); if (records != null && !records.isEmpty()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 7be1a715e3f..aff5d6c7486 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -78,6 +79,7 @@ import org.slf4j.Logger; import java.io.File; import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -701,6 +703,85 @@ public class StreamThreadTest { EasyMock.verify(taskManager); } + @Test + public void shouldNotThrowWhenPendingShutdownInRunOnce() { + mockRunOnce(true); + } + + @Test + public void shouldNotThrowWithoutPendingShutdownInRunOnce() { + // A reference test to verify that without intermediate shutdown the runOnce should pass + // without any exception. + mockRunOnce(false); + } + + private void mockRunOnce(final boolean shutdownOnPoll) { + final Collection assignedPartitions = Collections.singletonList(t1p1); + class MockStreamThreadConsumer extends MockConsumer { + + private StreamThread streamThread; + + private MockStreamThreadConsumer(final OffsetResetStrategy offsetResetStrategy) { + super(offsetResetStrategy); + } + + @Override + public synchronized ConsumerRecords poll(final Duration timeout) { + assertNotNull(streamThread); + if (shutdownOnPoll) { + streamThread.shutdown(); + } + streamThread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + return super.poll(timeout); + } + + private void setStreamThread(final StreamThread streamThread) { + this.streamThread = streamThread; + } + } + + final MockStreamThreadConsumer mockStreamThreadConsumer = + new MockStreamThreadConsumer<>(OffsetResetStrategy.EARLIEST); + + final TaskManager taskManager = new TaskManager(new MockChangelogReader(), + processId, + "log-prefix", + mockStreamThreadConsumer, + streamsMetadataState, + null, + null, + null, + new AssignedStreamsTasks(new LogContext()), + new AssignedStandbyTasks(new LogContext())); + taskManager.setConsumer(mockStreamThreadConsumer); + taskManager.setAssignmentMetadata(Collections.emptyMap(), Collections.emptyMap()); + + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId); + final StreamThread thread = new StreamThread( + mockTime, + config, + null, + mockStreamThreadConsumer, + mockStreamThreadConsumer, + null, + taskManager, + streamsMetrics, + internalTopologyBuilder, + clientId, + new LogContext(""), + new AtomicInteger() + ).updateThreadMetadata(getSharedAdminClientId(clientId)); + + mockStreamThreadConsumer.setStreamThread(thread); + mockStreamThreadConsumer.assign(assignedPartitions); + mockStreamThreadConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); + + addRecord(mockStreamThreadConsumer, 1L, 0L); + thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.PARTITIONS_REVOKED); + thread.runOnce(); + } + @Test public void shouldOnlyShutdownOnce() { final Consumer consumer = EasyMock.createNiceMock(Consumer.class);