From 6d649f503a964ed9612e79ef2d9e55e26240fbc3 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Tue, 19 Mar 2019 07:12:49 -0700 Subject: [PATCH] KAFKA-8062: Do not remove StateListener when shutting down stream thread (#6468) In a previous commit #6091, we've fixed a couple of edge cases and hence do not need to remove state listener anymore (before that, we removed the state listener intentionally to avoid some race conditions, which are now gone). Reviewers: Matthias J. Sax , Bill Bejeck --- .../apache/kafka/streams/KafkaStreams.java | 4 +- .../processor/internals/StreamThread.java | 1 - .../kafka/streams/KafkaStreamsTest.java | 72 +++++++++++++++++-- 3 files changed, 69 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 4f15deadbc7..315a6bbaa92 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -191,7 +191,7 @@ public class KafkaStreams implements AutoCloseable { * the instance will be in the ERROR state. The user will need to close it. 
*/ public enum State { - CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3, 5); + CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3); private final Set validTransitions = new HashSet<>(); @@ -857,7 +857,6 @@ public class KafkaStreams implements AutoCloseable { // notify all the threads to stop; avoid deadlocks by stopping any // further state reports from the thread since we're shutting down for (final StreamThread thread : threads) { - thread.setStateListener(null); thread.shutdown(); } @@ -872,7 +871,6 @@ public class KafkaStreams implements AutoCloseable { } if (globalStreamThread != null) { - globalStreamThread.setStateListener(null); globalStreamThread.shutdown(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 71df0f963da..1bd09e468f2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -269,7 +269,6 @@ public class StreamThread extends Thread { if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) { log.error("Received error code {} - shutdown", streamThread.assignmentErrorCode.get()); streamThread.shutdown(); - streamThread.setStateListener(null); return; } final long start = time.milliseconds(); diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 6b8b5b529d1..3e55f2922cd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -140,14 +140,14 @@ public class KafkaStreamsTest { } @Test - public void 
testStateCloseAfterCreate() { + public void stateShouldTransitToNotRunningIfCloseRightAfterCreated() { globalStreams.close(); Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); } @Test - public void testStateOneThreadDeadButRebalanceFinish() throws InterruptedException { + public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws InterruptedException { final StateListenerStub stateListener = new StateListenerStub(); globalStreams.setStateListener(stateListener); @@ -171,7 +171,7 @@ public class KafkaStreamsTest { Assert.assertEquals(3, stateListener.numChanges); Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); - for (final StreamThread thread: globalStreams.threads) { + for (final StreamThread thread : globalStreams.threads) { thread.stateListener().onChange( thread, StreamThread.State.PARTITIONS_ASSIGNED, @@ -194,7 +194,7 @@ public class KafkaStreamsTest { Assert.assertEquals(3, stateListener.numChanges); Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); - for (final StreamThread thread: globalStreams.threads) { + for (final StreamThread thread : globalStreams.threads) { if (thread != globalStreams.threads[NUM_THREADS - 1]) { thread.stateListener().onChange( thread, @@ -214,6 +214,70 @@ public class KafkaStreamsTest { Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); } + @Test + public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException { + final StateListenerStub stateListener = new StateListenerStub(); + globalStreams.setStateListener(stateListener); + + Assert.assertEquals(0, stateListener.numChanges); + Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state()); + + globalStreams.start(); + + TestUtils.waitForCondition( + () -> stateListener.numChanges == 2, + "Streams never started."); + Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state()); + + for (final StreamThread thread : 
globalStreams.threads) { + thread.stateListener().onChange( + thread, + StreamThread.State.PARTITIONS_REVOKED, + StreamThread.State.RUNNING); + } + + Assert.assertEquals(3, stateListener.numChanges); + Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); + + globalStreams.threads[NUM_THREADS - 1].stateListener().onChange( + globalStreams.threads[NUM_THREADS - 1], + StreamThread.State.PENDING_SHUTDOWN, + StreamThread.State.PARTITIONS_REVOKED); + + globalStreams.threads[NUM_THREADS - 1].stateListener().onChange( + globalStreams.threads[NUM_THREADS - 1], + StreamThread.State.DEAD, + StreamThread.State.PENDING_SHUTDOWN); + + Assert.assertEquals(3, stateListener.numChanges); + Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); + + for (final StreamThread thread : globalStreams.threads) { + if (thread != globalStreams.threads[NUM_THREADS - 1]) { + thread.stateListener().onChange( + thread, + StreamThread.State.PENDING_SHUTDOWN, + StreamThread.State.PARTITIONS_REVOKED); + + thread.stateListener().onChange( + thread, + StreamThread.State.DEAD, + StreamThread.State.PENDING_SHUTDOWN); + } + } + + Assert.assertEquals(4, stateListener.numChanges); + Assert.assertEquals(KafkaStreams.State.ERROR, globalStreams.state()); + + globalStreams.close(); + + // the state should not stuck with ERROR, but transit to NOT_RUNNING in the end + TestUtils.waitForCondition( + () -> stateListener.numChanges == 6, + "Streams never closed."); + Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); + } + @Test public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception { builder.globalTable("anyTopic");