
KAFKA-12195 Fix synchronization issue happening in KafkaStreams (#9887)

Reviewers: Guozhang Wang <wangguoz@gmail.com>
pull/9929/head
Chia-Ping Tsai authored 4 years ago; committed by GitHub
parent commit 6752f28254
2 changed files:
  1. streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java (88 lines changed)
  2. streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java (38 lines changed)

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java (88 lines changed)

@@ -92,6 +92,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@@ -374,9 +375,7 @@ public class KafkaStreams implements AutoCloseable {
synchronized (stateLock) {
if (state == State.CREATED) {
oldHandler = true;
for (final StreamThread thread : threads) {
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
}
processStreamThread(thread -> thread.setUncaughtExceptionHandler(uncaughtExceptionHandler));
if (globalStreamThread != null) {
globalStreamThread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
@@ -409,9 +408,7 @@ public class KafkaStreams implements AutoCloseable {
if (state == State.CREATED) {
this.streamsUncaughtExceptionHandler = handler;
Objects.requireNonNull(streamsUncaughtExceptionHandler);
for (final StreamThread thread : threads) {
thread.setStreamsUncaughtExceptionHandler(handler);
}
processStreamThread(thread -> thread.setStreamsUncaughtExceptionHandler(handler));
if (globalStreamThread != null) {
globalStreamThread.setUncaughtExceptionHandler(handler);
}
@@ -480,15 +477,14 @@ public class KafkaStreams implements AutoCloseable {
"but the uncaught exception was an Error, which means this runtime is no " +
"longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
}
if (Thread.currentThread().equals(globalStreamThread) && threads.stream().noneMatch(StreamThread::isRunning)) {
if (Thread.currentThread().equals(globalStreamThread) && countStreamThread(StreamThread::isRunning) == 0) {
log.error("Exception in global thread caused the application to attempt to shutdown." +
" This action will succeed only if there is at least one StreamThread running on this client." +
" Currently there are no running threads so will now close the client.");
close(Duration.ZERO);
} else {
for (final StreamThread streamThread : threads) {
streamThread.sendShutdownRequest(AssignorError.SHUTDOWN_REQUESTED);
}
processStreamThread(thread -> thread.sendShutdownRequest(AssignorError.SHUTDOWN_REQUESTED));
log.error("Encountered the following exception during processing " +
"and sent shutdown request for the entire application.", throwable);
}
@@ -523,14 +519,14 @@ public class KafkaStreams implements AutoCloseable {
public Map<MetricName, ? extends Metric> metrics() {
final Map<MetricName, Metric> result = new LinkedHashMap<>();
// producer and consumer clients are per-thread
for (final StreamThread thread : threads) {
processStreamThread(thread -> {
result.putAll(thread.producerMetrics());
result.putAll(thread.consumerMetrics());
// admin client is shared, so we can actually move it
// to result.putAll(adminClient.metrics()).
// we did it intentionally just for flexibility.
result.putAll(thread.adminClientMetrics());
}
});
// global thread's consumer client
if (globalStreamThread != null) {
result.putAll(globalStreamThread.consumerMetrics());
@@ -899,7 +895,7 @@ public class KafkaStreams implements AutoCloseable {
}
ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
Math.toIntExact(countStreamThread(thread -> thread.state().isAlive())));
final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
@@ -951,12 +947,15 @@ public class KafkaStreams implements AutoCloseable {
if (isRunningOrRebalancing()) {
final int threadIdx;
final long cacheSizePerThread;
final StreamThread streamThread;
synchronized (changeThreadCount) {
threadIdx = getNextThreadIndex();
cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
resizeThreadCache(cacheSizePerThread);
// The thread must be created while holding the lock in order to avoid a duplicate thread index.
// If the index were duplicated, the thread metadata could be duplicated too.
streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
}
final StreamThread streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
synchronized (stateLock) {
if (isRunningOrRebalancing()) {
@@ -989,8 +988,10 @@ public class KafkaStreams implements AutoCloseable {
public Optional<String> removeStreamThread() {
if (isRunningOrRebalancing()) {
synchronized (changeThreadCount) {
for (final StreamThread streamThread : threads) {
if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) {
// make a copy of threads to avoid holding the lock while iterating
for (final StreamThread streamThread : new ArrayList<>(threads)) {
if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName())
|| threads.size() == 1)) {
streamThread.shutdown();
if (!streamThread.getName().equals(Thread.currentThread().getName())) {
streamThread.waitOnThreadState(StreamThread.State.DEAD);
@@ -1011,9 +1012,7 @@ public class KafkaStreams implements AutoCloseable {
private int getNextThreadIndex() {
final HashSet<String> names = new HashSet<>();
for (final StreamThread streamThread: threads) {
names.add(streamThread.getName());
}
processStreamThread(thread -> names.add(thread.getName()));
final String baseName = clientId + "-StreamThread-";
for (int i = 1; i <= threads.size(); i++) {
final String name = baseName + i;
@@ -1032,9 +1031,7 @@ public class KafkaStreams implements AutoCloseable {
}
private void resizeThreadCache(final long cacheSizePerThread) {
for (final StreamThread streamThread: threads) {
streamThread.resizeCache(cacheSizePerThread);
}
processStreamThread(thread -> thread.resizeCache(cacheSizePerThread));
if (globalStreamThread != null) {
globalStreamThread.resize(cacheSizePerThread);
}
@@ -1103,9 +1100,7 @@ public class KafkaStreams implements AutoCloseable {
globalStreamThread.start();
}
for (final StreamThread thread : threads) {
thread.start();
}
processStreamThread(StreamThread::start);
final Long cleanupDelay = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
stateDirCleaner.scheduleAtFixedRate(() -> {
@@ -1178,11 +1173,9 @@ public class KafkaStreams implements AutoCloseable {
return new Thread(() -> {
// notify all the threads to stop; avoid deadlocks by stopping any
// further state reports from the thread since we're shutting down
for (final StreamThread thread : threads) {
thread.shutdown();
}
processStreamThread(StreamThread::shutdown);
for (final StreamThread thread : threads) {
processStreamThread(thread -> {
try {
if (!thread.isRunning()) {
thread.join();
@@ -1190,7 +1183,7 @@ public class KafkaStreams implements AutoCloseable {
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
});
if (globalStreamThread != null) {
globalStreamThread.shutdown();
@@ -1469,6 +1462,31 @@ public class KafkaStreams implements AutoCloseable {
return queryableStoreProvider.getStore(storeQueryParameters);
}
/**
* Handle each stream thread in a snapshot of threads.
* Note: iteration over a SynchronizedList is not thread-safe, so it must be manually synchronized. However, we may
* need to acquire other locks while looping over the threads, which could cause a deadlock. Hence, we iterate over
* a copy to avoid holding the threads lock while looping.
* @param consumer handler
*/
private void processStreamThread(final Consumer<StreamThread> consumer) {
final List<StreamThread> copy = new ArrayList<>(threads);
for (final StreamThread thread : copy) consumer.accept(thread);
}
/**
* Count the stream threads in a snapshot of threads that match the given predicate.
* Note: iteration over a SynchronizedList is not thread-safe, so it must be manually synchronized. However, we may
* need to acquire other locks while looping over the threads, which could cause a deadlock. Hence, we iterate over
* a copy to avoid holding the threads lock while looping.
* @param predicate predicate
* @return number of matched threads
*/
private long countStreamThread(final Predicate<StreamThread> predicate) {
final List<StreamThread> copy = new ArrayList<>(threads);
return copy.stream().filter(predicate).count();
}
/**
* Returns runtime information about the local threads of this {@link KafkaStreams} instance.
*
@@ -1477,13 +1495,13 @@ public class KafkaStreams implements AutoCloseable {
public Set<ThreadMetadata> localThreadsMetadata() {
validateIsRunningOrRebalancing();
final Set<ThreadMetadata> threadMetadata = new HashSet<>();
for (final StreamThread thread : threads) {
processStreamThread(thread -> {
synchronized (thread.getStateLock()) {
if (thread.state() != StreamThread.State.DEAD) {
threadMetadata.add(thread.threadMetadata());
}
}
}
});
return threadMetadata;
}
@@ -1504,13 +1522,13 @@ public class KafkaStreams implements AutoCloseable {
final Map<TopicPartition, Long> allChangelogPositions = new HashMap<>();
// Obtain the current positions, of all the active-restoring and standby tasks
for (final StreamThread streamThread : threads) {
for (final Task task : streamThread.allTasks().values()) {
processStreamThread(thread -> {
for (final Task task : thread.allTasks().values()) {
allPartitions.addAll(task.changelogPartitions());
// Note that not all changelog partitions will have positions, since some may not have started
allChangelogPositions.putAll(task.changelogOffsets());
}
}
});
log.debug("Current changelog positions: {}", allChangelogPositions);
final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets;
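
The helpers added at the bottom of this file capture the core of the fix: take a snapshot copy of the synchronized threads list and iterate the copy, so the list's monitor is not held while the per-thread action runs (the action may need other locks and could otherwise deadlock). Below is a minimal standalone sketch of that pattern; the class, field, and method names are illustrative, not the actual KafkaStreams members.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class SnapshotIterationSketch {
    // Stand-in for KafkaStreams#threads: a synchronized list mutated by addStreamThread/removeStreamThread.
    private final List<String> threads = Collections.synchronizedList(new ArrayList<>());

    // Iterate over a copy: the list's monitor is held only while the copy is taken,
    // so the action may acquire other locks without risking deadlock on the list.
    void processThread(final Consumer<String> action) {
        final List<String> copy = new ArrayList<>(threads);
        for (final String thread : copy) {
            action.accept(thread);
        }
    }

    // Same idea for counting threads that match a predicate against a snapshot.
    long countThread(final Predicate<String> predicate) {
        return new ArrayList<>(threads).stream().filter(predicate).count();
    }
}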

streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java (38 lines changed)

@@ -40,7 +40,10 @@ import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -52,6 +55,9 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@Category(IntegrationTest.class)
public class AdjustStreamThreadCountTest {
@@ -280,4 +286,36 @@ public class AdjustStreamThreadCountTest {
waitForTransitionFromRebalancingToRunning();
}
}
@Test
public void testConcurrentlyAccessThreads() throws InterruptedException {
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
addStreamStateChangeListener(kafkaStreams);
startStreamsAndWaitForRunning(kafkaStreams);
final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
final int threadCount = 5;
final int loop = 3;
final AtomicReference<Throwable> lastException = new AtomicReference<>();
final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
for (int threadIndex = 0; threadIndex < threadCount; ++threadIndex) {
executor.execute(() -> {
try {
for (int i = 0; i < loop + 1; i++) {
if (!kafkaStreams.addStreamThread().isPresent())
throw new RuntimeException("failed to create stream thread");
kafkaStreams.localThreadsMetadata();
if (!kafkaStreams.removeStreamThread().isPresent())
throw new RuntimeException("failed to delete a stream thread");
}
} catch (final Exception e) {
lastException.set(e);
}
});
}
executor.shutdown();
assertTrue(executor.awaitTermination(60, TimeUnit.SECONDS));
assertNull(lastException.get());
assertEquals(oldThreadCount, kafkaStreams.localThreadsMetadata().size());
}
}
}
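
The other half of the change, back in KafkaStreams.java, moves createAndAddStreamThread inside the synchronized (changeThreadCount) block so that picking the next thread index and registering the new thread happen atomically; previously two concurrent addStreamThread calls could observe the same index before either thread was added. Below is a hedged sketch of that check-then-act hazard and its fix, with illustrative class and method names rather than the actual Kafka code.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class AddThreadSketch {
    private final Object changeThreadCount = new Object();
    private final Set<String> names = ConcurrentHashMap.newKeySet();

    // Racy variant: two callers can compute the same index before either registers a name.
    String addThreadRacy() {
        final int idx;
        synchronized (changeThreadCount) {
            idx = nextIndex();
        }
        final String name = "StreamThread-" + idx; // registered outside the lock
        names.add(name);
        return name;
    }

    // Fixed variant, mirroring the commit: compute the index and register the thread under one lock.
    String addThreadSafe() {
        synchronized (changeThreadCount) {
            final String name = "StreamThread-" + nextIndex();
            names.add(name);
            return name;
        }
    }

    // Smallest unused index, analogous to getNextThreadIndex() scanning existing thread names.
    private int nextIndex() {
        int i = 1;
        while (names.contains("StreamThread-" + i)) {
            i++;
        }
        return i;
    }
}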
