
MINOR: Fix deadlock between StreamThread and KafkaStreams

This may be a reason why we see Jenkins jobs time out at times.
I can reproduce it locally.

With current trunk there is a possibility to run into this:

```sh
"kafka-streams-close-thread" #585 daemon prio=5 os_prio=0 tid=0x00007f66d052d800 nid=0x7e02 waiting for monitor entry [0x00007f66ae2e5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.streams.processor.internals.StreamThread.close(StreamThread.java:345)
	- waiting to lock <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.KafkaStreams$1.run(KafkaStreams.java:474)
	at java.lang.Thread.run(Thread.java:745)

"appId-bd262a91-5155-4a35-bc46-c6432552c2c5-StreamThread-97" #583 prio=5 os_prio=0 tid=0x00007f66d052f000 nid=0x7e01 waiting for monitor entry [0x00007f66ae4e6000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.streams.KafkaStreams.setState(KafkaStreams.java:219)
	- waiting to lock <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
	at org.apache.kafka.streams.KafkaStreams.access$100(KafkaStreams.java:117)
	at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:259)
	- locked <0x000000077d42f138> (a org.apache.kafka.streams.KafkaStreams$StreamStateListener)
	at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:168)
	- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.processor.internals.StreamThread.setStateWhenNotInPendingShutdown(StreamThread.java:176)
	- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$1600(StreamThread.java:70)
	at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:1321)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:406)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:349)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:531)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:669)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)

```

In a nutshell: `KafkaStreams` and `StreamThread` end up waiting
for each other, because another concurrent `close` (e.g. from a
test) comes along and also takes the lock on `KafkaStreams`:

```sh
"main" #1 prio=5 os_prio=0 tid=0x00007f66d000c800 nid=0x78bb in Object.wait() [0x00007f66d7a15000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1249)
	- locked <0x000000077d45a590> (a java.lang.Thread)
	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:503)
	- locked <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:447)
	at org.apache.kafka.streams.KafkaStreamsTest.testCannotStartOnceClosed(KafkaStreamsTest.java:115)
```

=> causing a deadlock.
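
For anyone who wants to see the shape of the cycle without Kafka, here is a minimal sketch of the three traces above. It is a toy model, not the Streams code: `DeadlockCycleSketch`, `streamsMonitor`, `threadMonitor` and the thread names are made up for illustration, and the `join` is bounded (which the real `close` was not) purely so the sketch terminates.

```java
import java.util.concurrent.CountDownLatch;

/** Toy model of the wait cycle; none of these names come from Kafka. */
final class DeadlockCycleSketch {

    /**
     * Returns true if the closer thread is still blocked when the bounded join expires.
     * joinTimeoutMs must be positive (0 would mean "wait forever" for Thread.join).
     */
    static boolean closerGetsStuck(final long joinTimeoutMs) {
        final Object streamsMonitor = new Object(); // stands in for the KafkaStreams monitor
        final Object threadMonitor = new Object();  // stands in for the StreamThread monitor
        final CountDownLatch workerHoldsThreadMonitor = new CountDownLatch(1);

        // Plays the StreamThread: holds its own monitor, then needs the outer one.
        final Thread worker = new Thread(() -> {
            synchronized (threadMonitor) {
                workerHoldsThreadMonitor.countDown();
                synchronized (streamsMonitor) {
                    // the state change would be reported to the outer object here
                }
            }
        }, "worker");

        // Plays the close thread: needs the worker's monitor to shut it down.
        final Thread closer = new Thread(() -> {
            synchronized (threadMonitor) {
                // the worker shutdown would happen here
            }
        }, "closer");

        try {
            final boolean stuck;
            synchronized (streamsMonitor) {       // plays the synchronized close() on "main"
                worker.start();
                workerHoldsThreadMonitor.await(); // worker now owns threadMonitor
                closer.start();
                closer.join(joinTimeoutMs);       // bounded only so this sketch terminates
                stuck = closer.isAlive();
            }
            // Leaving the block releases streamsMonitor, so the cycle unwinds.
            worker.join();
            closer.join();
            return stuck;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the sketch", e);
        }
    }

    public static void main(final String[] args) {
        System.out.println("closer stuck while close() held the lock: " + closerGetsStuck(200));
    }
}
```

With an unbounded `join` in place of the bounded one, none of the three threads could ever make progress, which matches the hang in the dumps.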

Fixed this by using softer locking on the state change, which
guarantees atomic changes to the state but does not lock on the
whole object (I at least could not find any method other than
`setState` that would require more than atomically locked access).
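
A rough sketch of the idea, assuming nothing beyond what the diff shows (a private `stateLock` object guarding `setState`). The class `StateLockSketch`, its reduced `State` enum and the helper `setStateWhileInstanceLocked` are hypothetical and only exist to show that a state change no longer waits for the instance monitor:

```java
/** Minimal sketch of the dedicated-lock idea; class and method names are hypothetical. */
final class StateLockSketch {
    enum State { CREATED, RUNNING }

    private final Object stateLock = new Object();
    private volatile State state = State.CREATED;

    /** Guarded by stateLock only, so it never waits for the instance monitor. */
    void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
        }
    }

    State state() {
        return state;
    }

    /**
     * Holds the instance monitor (as a synchronized close() would) while another
     * thread calls setState, and reports whether that call got through in time.
     */
    synchronized boolean setStateWhileInstanceLocked(final long timeoutMs) {
        final Thread setter = new Thread(() -> setState(State.RUNNING), "state-setter");
        setter.start();
        try {
            setter.join(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !setter.isAlive() && state == State.RUNNING;
    }

    public static void main(final String[] args) {
        final StateLockSketch sketch = new StateLockSketch();
        System.out.println("setState completed: " + sketch.setStateWhileInstanceLocked(5_000));
    }
}
```

Had `setState` been declared `synchronized` on the instance instead, the setter thread in this sketch would stay blocked until the timeout expired.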

Also qualified the state listeners with their outer class to make
the whole code flow around this more readable (having two
interfaces that share the same name for both the interface and its
method, and then using them between their two outer classes, is
crazy hard to read imo :)).
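
To illustrate the naming point, a small sketch with two hypothetical classes (`Engine` and `Worker`, wrapped in `ListenerNamingSketch`; none of this is the Kafka code) that each declare a nested `StateListener` with an `onChange` method. Writing the outer class at every use makes it clear which of the two is meant:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of two classes that each declare a nested StateListener. */
final class ListenerNamingSketch {

    static final class Worker {
        interface StateListener {
            void onChange(Worker worker, String newState, String oldState);
        }

        private Worker.StateListener stateListener;
        private String state = "NOT_RUNNING";

        void setStateListener(final Worker.StateListener listener) {
            this.stateListener = listener;
        }

        void setState(final String newState) {
            final String oldState = state;
            state = newState;
            if (stateListener != null) {
                stateListener.onChange(this, newState, oldState);
            }
        }
    }

    static final class Engine {
        interface StateListener {
            void onChange(String newState, String oldState);
        }

        private Engine.StateListener stateListener;
        private String state = "CREATED";

        void setStateListener(final Engine.StateListener listener) {
            this.stateListener = listener;
        }

        /** The qualified return type says which of the two interfaces is implemented. */
        Worker.StateListener workerListener() {
            return (worker, newState, oldState) -> {
                final String previous = state;
                state = newState;
                if (stateListener != null) {
                    stateListener.onChange(newState, previous);
                }
            };
        }
    }

    /** Wires a worker to an engine and records what the engine's listener sees. */
    static List<String> demo() {
        final List<String> seen = new ArrayList<>();
        final Engine engine = new Engine();
        engine.setStateListener((newState, oldState) -> seen.add(oldState + "->" + newState));
        final Worker worker = new Worker();
        worker.setStateListener(engine.workerListener());
        worker.setState("RUNNING");
        return seen;
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```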

Easy to reproduce yourself by running
`org.apache.kafka.streams.KafkaStreamsTest` in a loop for a bit
(save yourself some time by running 2-4 in parallel :)). Eventually
it will hang on one of the tests (for me this takes less than 1 min
with 4 parallel runs).

Author: Armin Braun <me@obrown.io>
Author: Armin <me@obrown.io>

Reviewers: Eno Thereska <eno@confluent.io>, Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2791 from original-brownbear/fix-streams-deadlock
Armin Braun 8 years ago committed by Ismael Juma
parent
commit
3364f12bc2
  1. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (2 changes)
  2. streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java (18 changes)
  3. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (8 changes)
  4. streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java (2 changes)

2
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@@ -769,7 +769,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
         // this will keep track of the first encountered exception
-        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
+        AtomicReference<Throwable> firstException = new AtomicReference<>();
         boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
         if (timeout > 0) {
             if (invokedFromCallback) {

18
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

@@ -189,8 +189,12 @@ public class KafkaStreams {
             return validTransitions.contains(newState.ordinal());
         }
     }
+    private final Object stateLock = new Object();
     private volatile State state = State.CREATED;
-    private StateListener stateListener = null;
+    private KafkaStreams.StateListener stateListener = null;
     /**
@@ -208,27 +212,27 @@ public class KafkaStreams {
     }
     /**
-     * An app can set a single {@link StateListener} so that the app is notified when state changes.
+     * An app can set a single {@link KafkaStreams.StateListener} so that the app is notified when state changes.
      * @param listener a new state listener
      */
-    public void setStateListener(final StateListener listener) {
+    public void setStateListener(final KafkaStreams.StateListener listener) {
         stateListener = listener;
     }
-    private synchronized void setState(final State newState) {
+    private void setState(final State newState) {
+        synchronized (stateLock) {
             final State oldState = state;
             if (!state.isValidTransition(newState)) {
                 log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
             } else {
                 log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
             }
             state = newState;
             if (stateListener != null) {
                 stateListener.onChange(state, oldState);
             }
+        }
     }
     /**
      * Return the current {@link State} of this {@code KafkaStreams} instance.
@@ -248,7 +252,7 @@ public class KafkaStreams {
         return Collections.unmodifiableMap(metrics.metrics());
     }
-    private class StreamStateListener implements StreamThread.StateListener {
+    private final class StreamStateListener implements StreamThread.StateListener {
         @Override
         public synchronized void onChange(final StreamThread thread,
                                           final StreamThread.State newState,

8
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@@ -124,7 +124,7 @@ public class StreamThread extends Thread {
     }
     private volatile State state = State.NOT_RUNNING;
-    private StateListener stateListener = null;
+    private StreamThread.StateListener stateListener = null;
     /**
      * Listen to state change events
@@ -141,10 +141,10 @@ public class StreamThread extends Thread {
     }
     /**
-     * Set the {@link StateListener} to be notified when state changes. Note this API is internal to
+     * Set the {@link StreamThread.StateListener} to be notified when state changes. Note this API is internal to
      * Kafka Streams and is not intended to be used by an external application.
      */
-    public void setStateListener(final StateListener listener) {
+    public void setStateListener(final StreamThread.StateListener listener) {
         this.stateListener = listener;
     }
@@ -463,7 +463,7 @@ public class StreamThread extends Thread {
                 action.apply(task);
             } catch (RuntimeException t) {
                 log.error("{} Failed while executing {} {} due to {}: ",
-                        StreamThread.this.logPrefix,
+                        logPrefix,
                         task.getClass().getSimpleName(),
                         task.id(),
                         exceptionMessage,

2
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java

@@ -282,7 +282,7 @@ public class KafkaStreamsTest {
         try {
             streams.cleanUp();
         } catch (final IllegalStateException e) {
-            Assert.assertEquals("Cannot clean up while running.", e.getMessage());
+            assertEquals("Cannot clean up while running.", e.getMessage());
             throw e;
         } finally {
             streams.close();
