Browse Source

KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (#7021)

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <bruno@confluent.io>, John Roesler <john@confluent.io>
pull/5970/merge
Boyang Chen 5 years ago committed by Matthias J. Sax
parent
commit
3e48bdbc33
  1. 34
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
  2. 81
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

34
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@ -190,14 +190,20 @@ public class StreamThread extends Thread { @@ -190,14 +190,20 @@ public class StreamThread extends Thread {
oldState = state;
if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
log.debug("Ignoring request to transit from PENDING_SHUTDOWN to {}: " +
"only DEAD state is a valid next state", newState);
// when the state is already in PENDING_SHUTDOWN, all other transitions will be
// refused but we do not throw exception here
return null;
} else if (state == State.DEAD) {
log.debug("Ignoring request to transit from DEAD to {}: " +
"no valid next state after DEAD", newState);
// when the state is already in NOT_RUNNING, all its transitions
// will be refused but we do not throw exception here
return null;
} else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) {
log.debug("Ignoring request to transit from PARTITIONS_REVOKED to PARTITIONS_REVOKED: " +
"self transition is not allowed");
// when the state is already in PARTITIONS_REVOKED, its transition to itself will be
// refused but we do not throw exception here
return null;
@ -268,17 +274,23 @@ public class StreamThread extends Thread { @@ -268,17 +274,23 @@ public class StreamThread extends Thread {
final long start = time.milliseconds();
try {
if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
return;
}
if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.NONE.code()) {
log.debug(
"Skipping task creation in rebalance because we are already in {} state.",
streamThread.state()
);
} else if (streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) {
log.debug(
"Encountered assignment error during partition assignment: {}. Skipping task initialization",
streamThread.assignmentErrorCode
);
} else {
log.debug("Creating tasks based on assignment.");
taskManager.createTasks(assignment);
}
} catch (final Throwable t) {
log.error(
"Error caught during partition assignment, " +
"will abort the current process and re-throw at the end of rebalance: {}",
t
);
"will abort the current process and re-throw at the end of rebalance", t);
streamThread.setRebalanceException(t);
} finally {
log.info("partition assignment took {} ms.\n" +
@ -809,7 +821,6 @@ public class StreamThread extends Thread { @@ -809,7 +821,6 @@ public class StreamThread extends Thread {
// Visible for testing
void runOnce() {
final ConsumerRecords<byte[], byte[]> records;
now = time.milliseconds();
if (state == State.PARTITIONS_ASSIGNED) {
@ -830,6 +841,15 @@ public class StreamThread extends Thread { @@ -830,6 +841,15 @@ public class StreamThread extends Thread {
throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
}
// Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests().
// The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned().
// Should only proceed when the thread is still running after #pollRequests(), because no external state mutation
// could affect the task manager state beyond this point within #runOnce().
if (!isRunning()) {
log.debug("State already transits to {}, skipping the run once call after poll request", state);
return;
}
final long pollLatency = advanceNowAndComputeLatency();
if (records != null && !records.isEmpty()) {

81
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.MockAdminClient; @@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@ -78,6 +79,7 @@ import org.slf4j.Logger; @@ -78,6 +79,7 @@ import org.slf4j.Logger;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -701,6 +703,85 @@ public class StreamThreadTest { @@ -701,6 +703,85 @@ public class StreamThreadTest {
EasyMock.verify(taskManager);
}
@Test
public void shouldNotThrowWhenPendingShutdownInRunOnce() {
mockRunOnce(true);
}
@Test
public void shouldNotThrowWithoutPendingShutdownInRunOnce() {
// A reference test to verify that without intermediate shutdown the runOnce should pass
// without any exception.
mockRunOnce(false);
}
private void mockRunOnce(final boolean shutdownOnPoll) {
final Collection<TopicPartition> assignedPartitions = Collections.singletonList(t1p1);
class MockStreamThreadConsumer<K, V> extends MockConsumer<K, V> {
private StreamThread streamThread;
private MockStreamThreadConsumer(final OffsetResetStrategy offsetResetStrategy) {
super(offsetResetStrategy);
}
@Override
public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
assertNotNull(streamThread);
if (shutdownOnPoll) {
streamThread.shutdown();
}
streamThread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
return super.poll(timeout);
}
private void setStreamThread(final StreamThread streamThread) {
this.streamThread = streamThread;
}
}
final MockStreamThreadConsumer<byte[], byte[]> mockStreamThreadConsumer =
new MockStreamThreadConsumer<>(OffsetResetStrategy.EARLIEST);
final TaskManager taskManager = new TaskManager(new MockChangelogReader(),
processId,
"log-prefix",
mockStreamThreadConsumer,
streamsMetadataState,
null,
null,
null,
new AssignedStreamsTasks(new LogContext()),
new AssignedStandbyTasks(new LogContext()));
taskManager.setConsumer(mockStreamThreadConsumer);
taskManager.setAssignmentMetadata(Collections.emptyMap(), Collections.emptyMap());
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamThread thread = new StreamThread(
mockTime,
config,
null,
mockStreamThreadConsumer,
mockStreamThreadConsumer,
null,
taskManager,
streamsMetrics,
internalTopologyBuilder,
clientId,
new LogContext(""),
new AtomicInteger()
).updateThreadMetadata(getSharedAdminClientId(clientId));
mockStreamThreadConsumer.setStreamThread(thread);
mockStreamThreadConsumer.assign(assignedPartitions);
mockStreamThreadConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
addRecord(mockStreamThreadConsumer, 1L, 0L);
thread.setState(StreamThread.State.STARTING);
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
thread.runOnce();
}
@Test
public void shouldOnlyShutdownOnce() {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);

Loading…
Cancel
Save