From 3364f12bc240e3fefa6a467519ef608fa768917c Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 3 Apr 2017 12:50:51 +0100 Subject: [PATCH] MINOR: Fix deadlock between StreamThread and KafkaStreams This may be a reason why we see Jenkins jobs time out at times. I can reproduce it locally. With current trunk there is a possibility to run into this: ```sh "kafka-streams-close-thread" #585 daemon prio=5 os_prio=0 tid=0x00007f66d052d800 nid=0x7e02 waiting for monitor entry [0x00007f66ae2e5000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.kafka.streams.processor.internals.StreamThread.close(StreamThread.java:345) - waiting to lock <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread) at org.apache.kafka.streams.KafkaStreams$1.run(KafkaStreams.java:474) at java.lang.Thread.run(Thread.java:745) "appId-bd262a91-5155-4a35-bc46-c6432552c2c5-StreamThread-97" #583 prio=5 os_prio=0 tid=0x00007f66d052f000 nid=0x7e01 waiting for monitor entry [0x00007f66ae4e6000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.kafka.streams.KafkaStreams.setState(KafkaStreams.java:219) - waiting to lock <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams) at org.apache.kafka.streams.KafkaStreams.access$100(KafkaStreams.java:117) at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:259) - locked <0x000000077d42f138> (a org.apache.kafka.streams.KafkaStreams$StreamStateListener) at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:168) - locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread) at org.apache.kafka.streams.processor.internals.StreamThread.setStateWhenNotInPendingShutdown(StreamThread.java:176) - locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread) at org.apache.kafka.streams.processor.internals.StreamThread.access$1600(StreamThread.java:70) at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:1321) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:406) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:349) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:531) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:669) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326) ``` In a nutshell: `KafkaStreams` and `StreamThread` are both waiting for each other since another intermittent `close` (eg. from a test) comes along also trying to lock on `KafkaStreams` : ```sh "main" #1 prio=5 os_prio=0 tid=0x00007f66d000c800 nid=0x78bb in Object.wait() [0x00007f66d7a15000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Thread.join(Thread.java:1249) - locked <0x000000077d45a590> (a java.lang.Thread) at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:503) - locked <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams) at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:447) at org.apache.kafka.streams.KafkaStreamsTest.testCannotStartOnceClosed(KafkaStreamsTest.java:115) ``` => causing a deadlock. Fixed this by softer locking on the state change, that guarantees atomic changes to the state but does not lock on the whole object (I at least could not find another method that would require more than atomicly-locked access except for `setState`). Also qualified the state listeners with their outer-class to make the whole code-flow around this more readable (having two interfaces with the same naming for interface and method and then using them between their two outer classes is crazy hard to read imo :)). Easy to reproduced yourself by running `org.apache.kafka.streams.KafkaStreamsTest` in a loop for a bit (save yourself some time by running 2-4 in parallel :)). Eventually it will lock on one of the tests (for me this takes less than 1 min with 4 parallel runs). Author: Armin Braun Author: Armin Reviewers: Eno Thereska , Damian Guy , Ismael Juma Closes #2791 from original-brownbear/fix-streams-deadlock --- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../apache/kafka/streams/KafkaStreams.java | 36 ++++++++++--------- .../processor/internals/StreamThread.java | 8 ++--- .../kafka/streams/KafkaStreamsTest.java | 2 +- 4 files changed, 26 insertions(+), 22 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 9342791b616..8fe99ddec28 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -769,7 +769,7 @@ public class KafkaProducer implements Producer { log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeUnit.toMillis(timeout)); // this will keep track of the first encountered exception - AtomicReference firstException = new AtomicReference(); + AtomicReference firstException = new AtomicReference<>(); boolean invokedFromCallback = Thread.currentThread() == this.ioThread; if (timeout > 0) { if (invokedFromCallback) { diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 2c116d92382..6ddf2a18400 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -189,8 +189,12 @@ public class KafkaStreams { return validTransitions.contains(newState.ordinal()); } } + + private final Object stateLock = new Object(); + private volatile State state = State.CREATED; - private StateListener stateListener = null; + + private KafkaStreams.StateListener stateListener = null; /** @@ -208,25 +212,25 @@ public class KafkaStreams { } /** - * An app can set a single {@link StateListener} so that the app is notified when state changes. + * An app can set a single {@link KafkaStreams.StateListener} so that the app is notified when state changes. * @param listener a new state listener */ - public void setStateListener(final StateListener listener) { + public void setStateListener(final KafkaStreams.StateListener listener) { stateListener = listener; } - private synchronized void setState(final State newState) { - final State oldState = state; - if (!state.isValidTransition(newState)) { - log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState); - } else { - log.info("{} State transition from {} to {}.", logPrefix, oldState, newState); - } - - state = newState; - - if (stateListener != null) { - stateListener.onChange(state, oldState); + private void setState(final State newState) { + synchronized (stateLock) { + final State oldState = state; + if (!state.isValidTransition(newState)) { + log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState); + } else { + log.info("{} State transition from {} to {}.", logPrefix, oldState, newState); + } + state = newState; + if (stateListener != null) { + stateListener.onChange(state, oldState); + } } } @@ -248,7 +252,7 @@ public class KafkaStreams { return Collections.unmodifiableMap(metrics.metrics()); } - private class StreamStateListener implements StreamThread.StateListener { + private final class StreamStateListener implements StreamThread.StateListener { @Override public synchronized void onChange(final StreamThread thread, final StreamThread.State newState, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 46704b9014a..9791a0a3bea 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -124,7 +124,7 @@ public class StreamThread extends Thread { } private volatile State state = State.NOT_RUNNING; - private StateListener stateListener = null; + private StreamThread.StateListener stateListener = null; /** * Listen to state change events @@ -141,10 +141,10 @@ public class StreamThread extends Thread { } /** - * Set the {@link StateListener} to be notified when state changes. Note this API is internal to + * Set the {@link StreamThread.StateListener} to be notified when state changes. Note this API is internal to * Kafka Streams and is not intended to be used by an external application. */ - public void setStateListener(final StateListener listener) { + public void setStateListener(final StreamThread.StateListener listener) { this.stateListener = listener; } @@ -463,7 +463,7 @@ public class StreamThread extends Thread { action.apply(task); } catch (RuntimeException t) { log.error("{} Failed while executing {} {} due to {}: ", - StreamThread.this.logPrefix, + logPrefix, task.getClass().getSimpleName(), task.id(), exceptionMessage, diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index eebbde95943..efa484e9db2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -282,7 +282,7 @@ public class KafkaStreamsTest { try { streams.cleanUp(); } catch (final IllegalStateException e) { - Assert.assertEquals("Cannot clean up while running.", e.getMessage()); + assertEquals("Cannot clean up while running.", e.getMessage()); throw e; } finally { streams.close();