diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 4f15deadbc7..315a6bbaa92 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -191,7 +191,7 @@ public class KafkaStreams implements AutoCloseable { * the instance will be in the ERROR state. The user will need to close it. */ public enum State { - CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3, 5); + CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3); private final Set validTransitions = new HashSet<>(); @@ -857,7 +857,6 @@ public class KafkaStreams implements AutoCloseable { // notify all the threads to stop; avoid deadlocks by stopping any // further state reports from the thread since we're shutting down for (final StreamThread thread : threads) { - thread.setStateListener(null); thread.shutdown(); } @@ -872,7 +871,6 @@ public class KafkaStreams implements AutoCloseable { } if (globalStreamThread != null) { - globalStreamThread.setStateListener(null); globalStreamThread.shutdown(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 71df0f963da..1bd09e468f2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -269,7 +269,6 @@ public class StreamThread extends Thread { if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) { log.error("Received error code {} - shutdown", streamThread.assignmentErrorCode.get()); streamThread.shutdown(); - streamThread.setStateListener(null); return; } final long start = time.milliseconds(); diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 6b8b5b529d1..3e55f2922cd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -140,14 +140,14 @@ public class KafkaStreamsTest { } @Test - public void testStateCloseAfterCreate() { + public void stateShouldTransitToNotRunningIfCloseRightAfterCreated() { globalStreams.close(); Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); } @Test - public void testStateOneThreadDeadButRebalanceFinish() throws InterruptedException { + public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws InterruptedException { final StateListenerStub stateListener = new StateListenerStub(); globalStreams.setStateListener(stateListener); @@ -171,7 +171,7 @@ public class KafkaStreamsTest { Assert.assertEquals(3, stateListener.numChanges); Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); - for (final StreamThread thread: globalStreams.threads) { + for (final StreamThread thread : globalStreams.threads) { thread.stateListener().onChange( thread, StreamThread.State.PARTITIONS_ASSIGNED, @@ -194,7 +194,7 @@ public class KafkaStreamsTest { Assert.assertEquals(3, stateListener.numChanges); Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); - for (final StreamThread thread: globalStreams.threads) { + for (final StreamThread thread : globalStreams.threads) { if (thread != globalStreams.threads[NUM_THREADS - 1]) { thread.stateListener().onChange( thread, @@ -214,6 +214,70 @@ public class KafkaStreamsTest { Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); } + @Test + public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException { + final StateListenerStub stateListener = new StateListenerStub(); + globalStreams.setStateListener(stateListener); + + Assert.assertEquals(0, stateListener.numChanges); + Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state()); + + globalStreams.start(); + + TestUtils.waitForCondition( + () -> stateListener.numChanges == 2, + "Streams never started."); + Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state()); + + for (final StreamThread thread : globalStreams.threads) { + thread.stateListener().onChange( + thread, + StreamThread.State.PARTITIONS_REVOKED, + StreamThread.State.RUNNING); + } + + Assert.assertEquals(3, stateListener.numChanges); + Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); + + globalStreams.threads[NUM_THREADS - 1].stateListener().onChange( + globalStreams.threads[NUM_THREADS - 1], + StreamThread.State.PENDING_SHUTDOWN, + StreamThread.State.PARTITIONS_REVOKED); + + globalStreams.threads[NUM_THREADS - 1].stateListener().onChange( + globalStreams.threads[NUM_THREADS - 1], + StreamThread.State.DEAD, + StreamThread.State.PENDING_SHUTDOWN); + + Assert.assertEquals(3, stateListener.numChanges); + Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); + + for (final StreamThread thread : globalStreams.threads) { + if (thread != globalStreams.threads[NUM_THREADS - 1]) { + thread.stateListener().onChange( + thread, + StreamThread.State.PENDING_SHUTDOWN, + StreamThread.State.PARTITIONS_REVOKED); + + thread.stateListener().onChange( + thread, + StreamThread.State.DEAD, + StreamThread.State.PENDING_SHUTDOWN); + } + } + + Assert.assertEquals(4, stateListener.numChanges); + Assert.assertEquals(KafkaStreams.State.ERROR, globalStreams.state()); + + globalStreams.close(); + + // the state should not stuck with ERROR, but transit to NOT_RUNNING in the end + TestUtils.waitForCondition( + () -> stateListener.numChanges == 6, + "Streams never closed."); + Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); + } + @Test public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception { builder.globalTable("anyTopic");