Browse Source

KAFKA-8062: Do not remore StateListener when shutting down stream thread (#6468)

In a previous commit #6091, we've fixed a couple of edge cases and hence do not need to remove state listener anymore (before that we removed the state listener intentionally to avoid some race conditions, which has been gone for now).

Reviewers: Matthias J. Sax <mjsax@apache.org>,   Bill Bejeck <bbejeck@gmail.com>
pull/6479/head
Guozhang Wang 6 years ago committed by Bill Bejeck
parent
commit
6d649f503a
  1. 4
      streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
  2. 1
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
  3. 72
      streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java

4
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

@ -191,7 +191,7 @@ public class KafkaStreams implements AutoCloseable { @@ -191,7 +191,7 @@ public class KafkaStreams implements AutoCloseable {
* the instance will be in the ERROR state. The user will need to close it.
*/
public enum State {
CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3, 5);
CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3);
private final Set<Integer> validTransitions = new HashSet<>();
@ -857,7 +857,6 @@ public class KafkaStreams implements AutoCloseable { @@ -857,7 +857,6 @@ public class KafkaStreams implements AutoCloseable {
// notify all the threads to stop; avoid deadlocks by stopping any
// further state reports from the thread since we're shutting down
for (final StreamThread thread : threads) {
thread.setStateListener(null);
thread.shutdown();
}
@ -872,7 +871,6 @@ public class KafkaStreams implements AutoCloseable { @@ -872,7 +871,6 @@ public class KafkaStreams implements AutoCloseable {
}
if (globalStreamThread != null) {
globalStreamThread.setStateListener(null);
globalStreamThread.shutdown();
}

1
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@ -269,7 +269,6 @@ public class StreamThread extends Thread { @@ -269,7 +269,6 @@ public class StreamThread extends Thread {
if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
log.error("Received error code {} - shutdown", streamThread.assignmentErrorCode.get());
streamThread.shutdown();
streamThread.setStateListener(null);
return;
}
final long start = time.milliseconds();

72
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java

@ -140,14 +140,14 @@ public class KafkaStreamsTest { @@ -140,14 +140,14 @@ public class KafkaStreamsTest {
}
@Test
public void testStateCloseAfterCreate() {
public void stateShouldTransitToNotRunningIfCloseRightAfterCreated() {
globalStreams.close();
Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state());
}
@Test
public void testStateOneThreadDeadButRebalanceFinish() throws InterruptedException {
public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws InterruptedException {
final StateListenerStub stateListener = new StateListenerStub();
globalStreams.setStateListener(stateListener);
@ -171,7 +171,7 @@ public class KafkaStreamsTest { @@ -171,7 +171,7 @@ public class KafkaStreamsTest {
Assert.assertEquals(3, stateListener.numChanges);
Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());
for (final StreamThread thread: globalStreams.threads) {
for (final StreamThread thread : globalStreams.threads) {
thread.stateListener().onChange(
thread,
StreamThread.State.PARTITIONS_ASSIGNED,
@ -194,7 +194,7 @@ public class KafkaStreamsTest { @@ -194,7 +194,7 @@ public class KafkaStreamsTest {
Assert.assertEquals(3, stateListener.numChanges);
Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());
for (final StreamThread thread: globalStreams.threads) {
for (final StreamThread thread : globalStreams.threads) {
if (thread != globalStreams.threads[NUM_THREADS - 1]) {
thread.stateListener().onChange(
thread,
@ -214,6 +214,70 @@ public class KafkaStreamsTest { @@ -214,6 +214,70 @@ public class KafkaStreamsTest {
Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state());
}
@Test
public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException {
final StateListenerStub stateListener = new StateListenerStub();
globalStreams.setStateListener(stateListener);
Assert.assertEquals(0, stateListener.numChanges);
Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state());
globalStreams.start();
TestUtils.waitForCondition(
() -> stateListener.numChanges == 2,
"Streams never started.");
Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state());
for (final StreamThread thread : globalStreams.threads) {
thread.stateListener().onChange(
thread,
StreamThread.State.PARTITIONS_REVOKED,
StreamThread.State.RUNNING);
}
Assert.assertEquals(3, stateListener.numChanges);
Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());
globalStreams.threads[NUM_THREADS - 1].stateListener().onChange(
globalStreams.threads[NUM_THREADS - 1],
StreamThread.State.PENDING_SHUTDOWN,
StreamThread.State.PARTITIONS_REVOKED);
globalStreams.threads[NUM_THREADS - 1].stateListener().onChange(
globalStreams.threads[NUM_THREADS - 1],
StreamThread.State.DEAD,
StreamThread.State.PENDING_SHUTDOWN);
Assert.assertEquals(3, stateListener.numChanges);
Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());
for (final StreamThread thread : globalStreams.threads) {
if (thread != globalStreams.threads[NUM_THREADS - 1]) {
thread.stateListener().onChange(
thread,
StreamThread.State.PENDING_SHUTDOWN,
StreamThread.State.PARTITIONS_REVOKED);
thread.stateListener().onChange(
thread,
StreamThread.State.DEAD,
StreamThread.State.PENDING_SHUTDOWN);
}
}
Assert.assertEquals(4, stateListener.numChanges);
Assert.assertEquals(KafkaStreams.State.ERROR, globalStreams.state());
globalStreams.close();
// the state should not stuck with ERROR, but transit to NOT_RUNNING in the end
TestUtils.waitForCondition(
() -> stateListener.numChanges == 6,
"Streams never closed.");
Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state());
}
@Test
public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
builder.globalTable("anyTopic");

Loading…
Cancel
Save