diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java index 767a5a9e0ff..de0ea393887 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java @@ -16,26 +16,22 @@ */ package org.apache.kafka.streams.integration; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -import static org.junit.Assert.fail; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -46,7 +42,6 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -76,9 +71,7 @@ public class OptimizedKTableIntegrationTest { @Rule public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS); - private final Map kafkaStreamsStates = new HashMap<>(); - private final Lock kafkaStreamsStatesLock = new ReentrantLock(); - private final Condition kafkaStreamsStateUpdate = kafkaStreamsStatesLock.newCondition(); + private final List streamsToCleanup = new ArrayList<>(); private final MockTime mockTime = cluster.time; @Before @@ -88,7 +81,7 @@ public class OptimizedKTableIntegrationTest { @After public void after() { - for (final KafkaStreams kafkaStreams : kafkaStreamsStates.keySet()) { + for (final KafkaStreams kafkaStreams : streamsToCleanup) { kafkaStreams.close(); } } @@ -116,9 +109,8 @@ public class OptimizedKTableIntegrationTest { final AtomicLong restoreStartOffset = new AtomicLong(-1); kafkaStreamsList.forEach(kafkaStreams -> { kafkaStreams.setGlobalStateRestoreListener(createTrackingRestoreListener(restoreStartOffset, new AtomicLong())); - kafkaStreams.start(); }); - waitForKafkaStreamssToEnterRunningState(kafkaStreamsList, 60, TimeUnit.SECONDS); + startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60)); // Assert that all messages in the first batch were processed in a timely manner assertThat(semaphore.tryAcquire(numMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); @@ -150,9 +142,8 @@ public class OptimizedKTableIntegrationTest { final AtomicLong restoreEndOffset = new AtomicLong(-1L); kafkaStreamsList.forEach(kafkaStreams -> { kafkaStreams.setGlobalStateRestoreListener(createTrackingRestoreListener(restoreStartOffset, restoreEndOffset)); - kafkaStreams.start(); }); - waitForKafkaStreamssToEnterRunningState(kafkaStreamsList, 60, TimeUnit.SECONDS); + startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60)); produceValueRange(key, 0, batch1NumMessages); @@ -226,49 +217,10 @@ public class OptimizedKTableIntegrationTest { mockTime); } - private void waitForKafkaStreamssToEnterRunningState(final Collection kafkaStreamss, - final long time, - final TimeUnit timeUnit) throws InterruptedException { - - final long expectedEnd = System.currentTimeMillis() + timeUnit.toMillis(time); - - kafkaStreamsStatesLock.lock(); - try { - while (!kafkaStreamss.stream().allMatch(kafkaStreams -> kafkaStreamsStates.get(kafkaStreams) == State.RUNNING)) { - if (expectedEnd <= System.currentTimeMillis()) { - fail("one or more kafkaStreamss did not enter RUNNING in a timely manner"); - } - final long millisRemaining = Math.max(1, expectedEnd - System.currentTimeMillis()); - kafkaStreamsStateUpdate.await(millisRemaining, TimeUnit.MILLISECONDS); - } - } finally { - kafkaStreamsStatesLock.unlock(); - } - } - private KafkaStreams createKafkaStreams(final StreamsBuilder builder, final Properties config) { - final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(config), config); - kafkaStreamsStatesLock.lock(); - try { - kafkaStreamsStates.put(kafkaStreams, kafkaStreams.state()); - } finally { - kafkaStreamsStatesLock.unlock(); - } - - kafkaStreams.setStateListener((newState, oldState) -> { - kafkaStreamsStatesLock.lock(); - try { - kafkaStreamsStates.put(kafkaStreams, newState); - if (newState == State.RUNNING) { - if (kafkaStreamsStates.values().stream().allMatch(state -> state == State.RUNNING)) { - kafkaStreamsStateUpdate.signalAll(); - } - } - } finally { - kafkaStreamsStatesLock.unlock(); - } - }); - return kafkaStreams; + final KafkaStreams streams = new KafkaStreams(builder.build(config), config); + streamsToCleanup.add(streams); + return streams; } private StateRestoreListener createTrackingRestoreListener(final AtomicLong restoreStartOffset, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java new file mode 100644 index 00000000000..825894280e6 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration.utils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KafkaStreams.StateListener; + +/** + * A {@link StateListener} that holds zero or more listeners internally and invokes all of them + * when a state transition occurs (i.e. {@link #onChange(State, State)} is called). If any listener + * throws {@link RuntimeException} or {@link Error} this immediately stops execution of listeners + * and causes the thrown exception to be raised. + */ +public class CompositeStateListener implements StateListener { + private final List listeners; + + public CompositeStateListener(final StateListener... listeners) { + this(Arrays.asList(listeners)); + } + + public CompositeStateListener(final Collection stateListeners) { + this.listeners = Collections.unmodifiableList(new ArrayList<>(stateListeners)); + } + + @Override + public void onChange(final State newState, final State oldState) { + for (final StateListener listener : listeners) { + listener.onChange(newState, oldState); + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 6c67a9df131..4921c4f39da 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -16,6 +16,12 @@ */ package org.apache.kafka.streams.integration.utils; +import java.lang.reflect.Field; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import kafka.api.Request; import kafka.server.KafkaServer; import kafka.server.MetadataCache; @@ -35,6 +41,8 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KafkaStreams.StateListener; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; @@ -70,6 +78,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.fail; /** * Utility functions to make integration testing more convenient. @@ -731,6 +740,86 @@ public class IntegrationTestUtils { }); } + /** + * Starts the given {@link KafkaStreams} instances and waits for all of them to reach the + * {@link State#RUNNING} state at the same time. Note that states may change between the time + * that this method returns and the calling function executes its next statement. + * + * @param streamsList the list of streams instances to run. + * @param timeout the time to wait for the streams to all be in @{link State#RUNNING} state. + */ + public static void startApplicationAndWaitUntilRunning(final List streamsList, + final Duration timeout) throws InterruptedException { + final Lock stateLock = new ReentrantLock(); + final Condition stateUpdate = stateLock.newCondition(); + final Map stateMap = new HashMap<>(); + for (final KafkaStreams streams : streamsList) { + stateMap.put(streams, streams.state()); + final StateListener prevStateListener = getStateListener(streams); + final StateListener newStateListener = (newState, oldState) -> { + stateLock.lock(); + try { + stateMap.put(streams, newState); + if (newState == State.RUNNING) { + if (stateMap.values().stream().allMatch(state -> state == State.RUNNING)) { + stateUpdate.signalAll(); + } + } + } finally { + stateLock.unlock(); + } + }; + + streams.setStateListener(prevStateListener != null + ? new CompositeStateListener(prevStateListener, newStateListener) + : newStateListener); + } + + for (final KafkaStreams streams : streamsList) { + streams.start(); + } + + final long expectedEnd = System.currentTimeMillis() + timeout.toMillis(); + stateLock.lock(); + try { + // We use while true here because we want to run this test at least once, even if the + // timeout has expired + while (true) { + final Map nonRunningStreams = new HashMap<>(); + for (final Entry entry : stateMap.entrySet()) { + if (entry.getValue() != State.RUNNING) { + nonRunningStreams.put(entry.getKey(), entry.getValue()); + } + } + + if (nonRunningStreams.isEmpty()) { + return; + } + + final long millisRemaining = expectedEnd - System.currentTimeMillis(); + if (millisRemaining <= 0) { + fail("Application did not reach a RUNNING state for all streams instances. Non-running instances: " + + nonRunningStreams); + } + + stateUpdate.await(millisRemaining, TimeUnit.MILLISECONDS); + } + } finally { + stateLock.unlock(); + } + } + + private static StateListener getStateListener(final KafkaStreams streams) { + try { + final Field field = streams.getClass().getDeclaredField("stateListener"); + field.setAccessible(true); + return (StateListener) field.get(streams); + } catch (final IllegalAccessException | NoSuchFieldException e) { + throw new RuntimeException("Failed to get StateListener through reflection", e); + } + } + + public static void verifyKeyValueTimestamps(final Properties consumerConfig, final String topic, final List> expected) {