Browse Source

MINOR: Add ability to wait for all instances in an application to be RUNNING (#7500)

Reviewers: Matthias J. Sax <matthias@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <guozhang@confluent.io>
pull/7546/head
Chris Pettitt 5 years ago committed by Matthias J. Sax
parent
commit
7a87a30f1f
  1. 68
      streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
  2. 50
      streams/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java
  3. 89
      streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java

68
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java

@ -16,26 +16,22 @@ @@ -16,26 +16,22 @@
*/
package org.apache.kafka.streams.integration;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.fail;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@ -46,7 +42,6 @@ import org.apache.kafka.common.serialization.Serdes; @@ -46,7 +42,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@ -76,9 +71,7 @@ public class OptimizedKTableIntegrationTest { @@ -76,9 +71,7 @@ public class OptimizedKTableIntegrationTest {
@Rule
public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
private final Map<KafkaStreams, State> kafkaStreamsStates = new HashMap<>();
private final Lock kafkaStreamsStatesLock = new ReentrantLock();
private final Condition kafkaStreamsStateUpdate = kafkaStreamsStatesLock.newCondition();
private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
private final MockTime mockTime = cluster.time;
@Before
@ -88,7 +81,7 @@ public class OptimizedKTableIntegrationTest { @@ -88,7 +81,7 @@ public class OptimizedKTableIntegrationTest {
@After
public void after() {
for (final KafkaStreams kafkaStreams : kafkaStreamsStates.keySet()) {
for (final KafkaStreams kafkaStreams : streamsToCleanup) {
kafkaStreams.close();
}
}
@ -116,9 +109,8 @@ public class OptimizedKTableIntegrationTest { @@ -116,9 +109,8 @@ public class OptimizedKTableIntegrationTest {
final AtomicLong restoreStartOffset = new AtomicLong(-1);
kafkaStreamsList.forEach(kafkaStreams -> {
kafkaStreams.setGlobalStateRestoreListener(createTrackingRestoreListener(restoreStartOffset, new AtomicLong()));
kafkaStreams.start();
});
waitForKafkaStreamssToEnterRunningState(kafkaStreamsList, 60, TimeUnit.SECONDS);
startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
// Assert that all messages in the first batch were processed in a timely manner
assertThat(semaphore.tryAcquire(numMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
@ -150,9 +142,8 @@ public class OptimizedKTableIntegrationTest { @@ -150,9 +142,8 @@ public class OptimizedKTableIntegrationTest {
final AtomicLong restoreEndOffset = new AtomicLong(-1L);
kafkaStreamsList.forEach(kafkaStreams -> {
kafkaStreams.setGlobalStateRestoreListener(createTrackingRestoreListener(restoreStartOffset, restoreEndOffset));
kafkaStreams.start();
});
waitForKafkaStreamssToEnterRunningState(kafkaStreamsList, 60, TimeUnit.SECONDS);
startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
produceValueRange(key, 0, batch1NumMessages);
@ -226,49 +217,10 @@ public class OptimizedKTableIntegrationTest { @@ -226,49 +217,10 @@ public class OptimizedKTableIntegrationTest {
mockTime);
}
private void waitForKafkaStreamssToEnterRunningState(final Collection<KafkaStreams> kafkaStreamss,
final long time,
final TimeUnit timeUnit) throws InterruptedException {
final long expectedEnd = System.currentTimeMillis() + timeUnit.toMillis(time);
kafkaStreamsStatesLock.lock();
try {
while (!kafkaStreamss.stream().allMatch(kafkaStreams -> kafkaStreamsStates.get(kafkaStreams) == State.RUNNING)) {
if (expectedEnd <= System.currentTimeMillis()) {
fail("one or more kafkaStreamss did not enter RUNNING in a timely manner");
}
final long millisRemaining = Math.max(1, expectedEnd - System.currentTimeMillis());
kafkaStreamsStateUpdate.await(millisRemaining, TimeUnit.MILLISECONDS);
}
} finally {
kafkaStreamsStatesLock.unlock();
}
}
private KafkaStreams createKafkaStreams(final StreamsBuilder builder, final Properties config) {
final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(config), config);
kafkaStreamsStatesLock.lock();
try {
kafkaStreamsStates.put(kafkaStreams, kafkaStreams.state());
} finally {
kafkaStreamsStatesLock.unlock();
}
kafkaStreams.setStateListener((newState, oldState) -> {
kafkaStreamsStatesLock.lock();
try {
kafkaStreamsStates.put(kafkaStreams, newState);
if (newState == State.RUNNING) {
if (kafkaStreamsStates.values().stream().allMatch(state -> state == State.RUNNING)) {
kafkaStreamsStateUpdate.signalAll();
}
}
} finally {
kafkaStreamsStatesLock.unlock();
}
});
return kafkaStreams;
final KafkaStreams streams = new KafkaStreams(builder.build(config), config);
streamsToCleanup.add(streams);
return streams;
}
private StateRestoreListener createTrackingRestoreListener(final AtomicLong restoreStartOffset,

50
streams/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java

@ -0,0 +1,50 @@ @@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration.utils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KafkaStreams.StateListener;
/**
* A {@link StateListener} that holds zero or more listeners internally and invokes all of them
* when a state transition occurs (i.e. {@link #onChange(State, State)} is called). If any listener
* throws {@link RuntimeException} or {@link Error} this immediately stops execution of listeners
* and causes the thrown exception to be raised.
*/
public class CompositeStateListener implements StateListener {
private final List<StateListener> listeners;
public CompositeStateListener(final StateListener... listeners) {
this(Arrays.asList(listeners));
}
public CompositeStateListener(final Collection<StateListener> stateListeners) {
this.listeners = Collections.unmodifiableList(new ArrayList<>(stateListeners));
}
@Override
public void onChange(final State newState, final State oldState) {
for (final StateListener listener : listeners) {
listener.onChange(newState, oldState);
}
}
}

89
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java

@ -16,6 +16,12 @@ @@ -16,6 +16,12 @@
*/
package org.apache.kafka.streams.integration.utils;
import java.lang.reflect.Field;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import kafka.api.Request;
import kafka.server.KafkaServer;
import kafka.server.MetadataCache;
@ -35,6 +41,8 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP @@ -35,6 +41,8 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KafkaStreams.StateListener;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
@ -70,6 +78,7 @@ import static org.hamcrest.MatcherAssert.assertThat; @@ -70,6 +78,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
/**
* Utility functions to make integration testing more convenient.
@ -731,6 +740,86 @@ public class IntegrationTestUtils { @@ -731,6 +740,86 @@ public class IntegrationTestUtils {
});
}
/**
* Starts the given {@link KafkaStreams} instances and waits for all of them to reach the
* {@link State#RUNNING} state at the same time. Note that states may change between the time
* that this method returns and the calling function executes its next statement.
*
* @param streamsList the list of streams instances to run.
* @param timeout the time to wait for the streams to all be in @{link State#RUNNING} state.
*/
public static void startApplicationAndWaitUntilRunning(final List<KafkaStreams> streamsList,
final Duration timeout) throws InterruptedException {
final Lock stateLock = new ReentrantLock();
final Condition stateUpdate = stateLock.newCondition();
final Map<KafkaStreams, State> stateMap = new HashMap<>();
for (final KafkaStreams streams : streamsList) {
stateMap.put(streams, streams.state());
final StateListener prevStateListener = getStateListener(streams);
final StateListener newStateListener = (newState, oldState) -> {
stateLock.lock();
try {
stateMap.put(streams, newState);
if (newState == State.RUNNING) {
if (stateMap.values().stream().allMatch(state -> state == State.RUNNING)) {
stateUpdate.signalAll();
}
}
} finally {
stateLock.unlock();
}
};
streams.setStateListener(prevStateListener != null
? new CompositeStateListener(prevStateListener, newStateListener)
: newStateListener);
}
for (final KafkaStreams streams : streamsList) {
streams.start();
}
final long expectedEnd = System.currentTimeMillis() + timeout.toMillis();
stateLock.lock();
try {
// We use while true here because we want to run this test at least once, even if the
// timeout has expired
while (true) {
final Map<KafkaStreams, State> nonRunningStreams = new HashMap<>();
for (final Entry<KafkaStreams, State> entry : stateMap.entrySet()) {
if (entry.getValue() != State.RUNNING) {
nonRunningStreams.put(entry.getKey(), entry.getValue());
}
}
if (nonRunningStreams.isEmpty()) {
return;
}
final long millisRemaining = expectedEnd - System.currentTimeMillis();
if (millisRemaining <= 0) {
fail("Application did not reach a RUNNING state for all streams instances. Non-running instances: " +
nonRunningStreams);
}
stateUpdate.await(millisRemaining, TimeUnit.MILLISECONDS);
}
} finally {
stateLock.unlock();
}
}
private static StateListener getStateListener(final KafkaStreams streams) {
try {
final Field field = streams.getClass().getDeclaredField("stateListener");
field.setAccessible(true);
return (StateListener) field.get(streams);
} catch (final IllegalAccessException | NoSuchFieldException e) {
throw new RuntimeException("Failed to get StateListener through reflection", e);
}
}
public static <K, V> void verifyKeyValueTimestamps(final Properties consumerConfig,
final String topic,
final List<KeyValueTimestamp<K, V>> expected) {

Loading…
Cancel
Save