
MINOR: fix flaky QueryableStateIntegrationTest (#6458)

Reviewer: Bill Bejeck <bill@confluent.io>
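
Summary of the change (derived from the diff below): the windowed word-count
store that createCountStream() sets up is now written to a dedicated windowed
output topic and verified; concurrentAccesses() waits until a full iteration
of input words has been processed in both output topics before it starts
querying; verifyGreaterOrEqual() drops its failIfKeyNotFound flag and always
fails when an expected key is missing; and the incrementInteration() typo is
fixed, along with general line reflowing.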
Branch: 0.11.0
Author: Matthias J. Sax (committed by GitHub)
Commit: 54b26c0aa4

Changed file (506 lines changed):
streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java

@@ -38,10 +38,12 @@ import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -58,7 +60,6 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -85,8 +86,7 @@ public class QueryableStateIntegrationTest {
     private static final int NUM_BROKERS = 1;

     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-        new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private static final int STREAM_THREE_PARTITIONS = 4;
     private final MockTime mockTime = CLUSTER.time;
     private String streamOne = "stream-one";
@@ -95,6 +95,7 @@ public class QueryableStateIntegrationTest {
     private String streamConcurrent = "stream-concurrent";
     private String outputTopic = "output";
     private String outputTopicConcurrent = "output-concurrent";
+    private String outputTopicConcurrentWindowed = "output-concurrent-windowed";
     private String outputTopicThree = "output-three";
     // sufficiently large window size such that everything falls into 1 window
     private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS);
@@ -102,28 +103,30 @@ public class QueryableStateIntegrationTest {
     private static final int NUM_REPLICAS = NUM_BROKERS;
     private Properties streamsConfiguration;
     private List<String> inputValues;
+    private int numberOfWordsPerIteration = 0;
     private Set<String> inputValuesKeys;
     private KafkaStreams kafkaStreams;
     private Comparator<KeyValue<String, String>> stringComparator;
     private Comparator<KeyValue<String, Long>> stringLongComparator;

     private static int testNo = 0;

-    private void createTopics() throws InterruptedException {
+    private void createTopics() throws Exception {
         streamOne = streamOne + "-" + testNo;
         streamConcurrent = streamConcurrent + "-" + testNo;
         streamThree = streamThree + "-" + testNo;
         outputTopic = outputTopic + "-" + testNo;
         outputTopicConcurrent = outputTopicConcurrent + "-" + testNo;
+        outputTopicConcurrentWindowed = outputTopicConcurrentWindowed + "-" + testNo;
         outputTopicThree = outputTopicThree + "-" + testNo;
         streamTwo = streamTwo + "-" + testNo;
         CLUSTER.createTopics(streamOne, streamConcurrent);
         CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS);
         CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1);
-        CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicThree);
+        CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicConcurrentWindowed, outputTopicThree);
     }

     @Before
-    public void before() throws IOException, InterruptedException {
+    public void before() throws Exception {
         testNo++;
         createTopics();
         streamsConfiguration = new Properties();
@@ -139,7 +142,6 @@ public class QueryableStateIntegrationTest {
         // override this to make the rebalances happen quickly
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-
         stringComparator = new Comparator<KeyValue<String, String>>() {

             @Override
@@ -176,24 +178,29 @@ public class QueryableStateIntegrationTest {
         for (final String sentence : inputValues) {
             final String[] words = sentence.split("\\W+");
             for (final String word : words) {
+                numberOfWordsPerIteration += words.length;
                 inputValuesKeys.add(word);
             }
         }
     }

     @After
-    public void shutdown() throws IOException {
+    public void shutdown() throws Exception {
         if (kafkaStreams != null) {
             kafkaStreams.close(30, TimeUnit.SECONDS);
         }
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }

     /**
      * Creates a typical word count topology
      */
-    private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) {
+    private KafkaStreams createCountStream(final String inputTopic,
+                                           final String outputTopic,
+                                           final String windowOutputTopic,
+                                           final String storeName,
+                                           final String windowStoreName,
+                                           final Properties streamsConfiguration) {
         final KStreamBuilder builder = new KStreamBuilder();
         final Serde<String> stringSerde = Serdes.String();
         final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic);
@@ -208,10 +215,20 @@ public class QueryableStateIntegrationTest {
             .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper());

         // Create a State Store for the all time word count
-        groupedByWord.count("word-count-store-" + inputTopic).to(Serdes.String(), Serdes.Long(), outputTopic);
+        groupedByWord
+            .count(storeName + "-" + inputTopic)
+            .to(Serdes.String(), Serdes.Long(), outputTopic);

         // Create a Windowed State Store that contains the word count for every 1 minute
-        groupedByWord.count(TimeWindows.of(WINDOW_SIZE), "windowed-word-count-store-" + inputTopic);
+        groupedByWord
+            .count(TimeWindows.of(WINDOW_SIZE), windowStoreName + "-" + inputTopic)
+            .toStream(new KeyValueMapper<Windowed<String>, Long, String>() {
+                @Override
+                public String apply(final Windowed<String> key, final Long value) {
+                    return key.key();
+                }
+            })
+            .to(Serdes.String(), Serdes.Long(), windowOutputTopic);

         return new KafkaStreams(builder, streamsConfiguration);
     }
@@ -221,17 +238,21 @@ public class QueryableStateIntegrationTest {
         private boolean closed = false;
         private final KafkaStreamsTest.StateListenerStub stateListener = new KafkaStreamsTest.StateListenerStub();

-        StreamRunnable(final String inputTopic, final String outputTopic, final int queryPort) {
+        StreamRunnable(final String inputTopic,
+                       final String outputTopic,
+                       final String outputTopicWindowed,
+                       final String storeName,
+                       final String windowStoreName,
+                       final int queryPort) {
             final Properties props = (Properties) streamsConfiguration.clone();
             props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + queryPort);
-            myStream = createCountStream(inputTopic, outputTopic, props);
+            myStream = createCountStream(inputTopic, outputTopic, outputTopicWindowed, storeName, windowStoreName, props);
             myStream.setStateListener(stateListener);
         }

         @Override
         public void run() {
             myStream.start();
         }

         public void close() {
@@ -254,69 +275,80 @@ public class QueryableStateIntegrationTest {
         }
     }

-    private void verifyAllKVKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
+    private void verifyAllKVKeys(final StreamRunnable[] streamRunnables,
+                                 final KafkaStreams streams,
                                  final KafkaStreamsTest.StateListenerStub stateListenerStub,
-                                 final Set<String> keys, final String storeName) throws Exception {
+                                 final Set<String> keys,
+                                 final String storeName) throws Exception {
         for (final String key : keys) {
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    try {
-                        final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
-                        if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
-                            return false;
-                        }
-                        final int index = metadata.hostInfo().port();
-                        final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
-                        final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-                        return store != null && store.get(key) != null;
-                    } catch (final IllegalStateException e) {
-                        // Kafka Streams instance may have closed but rebalance hasn't happened
-                        return false;
-                    } catch (final InvalidStateStoreException e) {
-                        // there must have been at least one rebalance state
-                        assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
-                        return false;
-                    }
-                }
-            }, 120000, "waiting for metadata, store and value to be non null");
+            TestUtils.waitForCondition(
+                new TestCondition() {
+                    @Override
+                    public boolean conditionMet() {
+                        try {
+                            final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+                            if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+                                return false;
+                            }
+                            final int index = metadata.hostInfo().port();
+                            final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+                            final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+                            return store != null && store.get(key) != null;
+                        } catch (final IllegalStateException e) {
+                            // Kafka Streams instance may have closed but rebalance hasn't happened
+                            return false;
+                        } catch (final InvalidStateStoreException e) {
+                            // there must have been at least one rebalance state
+                            assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
+                            return false;
+                        }
+                    }
+                },
+                120000,
+                "waiting for metadata, store and value to be non null"
+            );
         }
     }

-    private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
+    private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables,
+                                       final KafkaStreams streams,
                                        final KafkaStreamsTest.StateListenerStub stateListenerStub,
-                                       final Set<String> keys, final String storeName,
-                                       final Long from, final Long to) throws Exception {
+                                       final Set<String> keys,
+                                       final String storeName,
+                                       final Long from,
+                                       final Long to) throws Exception {
         for (final String key : keys) {
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    try {
-                        final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
-                        if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
-                            return false;
-                        }
-                        final int index = metadata.hostInfo().port();
-                        final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
-                        final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
-                        return store != null && store.fetch(key, from, to) != null;
-                    } catch (final IllegalStateException e) {
-                        // Kafka Streams instance may have closed but rebalance hasn't happened
-                        return false;
-                    } catch (final InvalidStateStoreException e) {
-                        // there must have been at least one rebalance state
-                        assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
-                        return false;
-                    }
-                }
-            }, 120000, "waiting for metadata, store and value to be non null");
+            TestUtils.waitForCondition(
+                new TestCondition() {
+                    @Override
+                    public boolean conditionMet() {
+                        try {
+                            final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+                            if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+                                return false;
+                            }
+                            final int index = metadata.hostInfo().port();
+                            final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+                            final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+                            return store != null && store.fetch(key, from, to) != null;
+                        } catch (final IllegalStateException e) {
+                            // Kafka Streams instance may have closed but rebalance hasn't happened
+                            return false;
+                        } catch (final InvalidStateStoreException e) {
+                            // there must have been at least one rebalance state
+                            assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
+                            return false;
+                        }
+                    }
+                },
+                120000,
+                "waiting for metadata, store and value to be non null"
+            );
         }
     }

     @Test
     public void queryOnRebalance() throws Exception {
         final int numThreads = STREAM_TWO_PARTITIONS;
@@ -326,10 +358,17 @@ public class QueryableStateIntegrationTest {
         final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1);
         producerRunnable.run();

         // create stream threads
+        final String storeName = "word-count-store";
+        final String windowStoreName = "windowed-word-count-store";
         for (int i = 0; i < numThreads; i++) {
-            streamRunnables[i] = new StreamRunnable(streamThree, outputTopicThree, i);
+            streamRunnables[i] = new StreamRunnable(
+                streamThree,
+                outputTopicThree,
+                outputTopicConcurrentWindowed,
+                storeName,
+                windowStoreName,
+                i);
             streamThreads[i] = new Thread(streamRunnables[i]);
             streamThreads[i].start();
         }
@@ -338,11 +377,21 @@ public class QueryableStateIntegrationTest {
             waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1);

             for (int i = 0; i < numThreads; i++) {
-                verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
-                    "word-count-store-" + streamThree);
-                verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
-                    "windowed-word-count-store-" + streamThree, 0L, WINDOW_SIZE);
-                assertEquals(streamRunnables[i].getStream().state(), KafkaStreams.State.RUNNING);
+                verifyAllKVKeys(
+                    streamRunnables,
+                    streamRunnables[i].getStream(),
+                    streamRunnables[i].getStateListener(),
+                    inputValuesKeys,
+                    storeName + "-" + streamThree);
+                verifyAllWindowedKeys(
+                    streamRunnables,
+                    streamRunnables[i].getStream(),
+                    streamRunnables[i].getStateListener(),
+                    inputValuesKeys,
+                    windowStoreName + "-" + streamThree,
+                    0L,
+                    WINDOW_SIZE);
+                assertEquals(KafkaStreams.State.RUNNING, streamRunnables[i].getStream().state());
             }

             // kill N-1 threads
@@ -353,11 +402,21 @@ public class QueryableStateIntegrationTest {
             }

             // query from the remaining thread
-            verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
-                "word-count-store-" + streamThree);
-            verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
-                "windowed-word-count-store-" + streamThree, 0L, WINDOW_SIZE);
-            assertEquals(streamRunnables[0].getStream().state(), KafkaStreams.State.RUNNING);
+            verifyAllKVKeys(
+                streamRunnables,
+                streamRunnables[0].getStream(),
+                streamRunnables[0].getStateListener(),
+                inputValuesKeys,
+                storeName + "-" + streamThree);
+            verifyAllWindowedKeys(
+                streamRunnables,
+                streamRunnables[0].getStream(),
+                streamRunnables[0].getStateListener(),
+                inputValuesKeys,
+                windowStoreName + "-" + streamThree,
+                0L,
+                WINDOW_SIZE);
+            assertEquals(KafkaStreams.State.RUNNING, streamRunnables[0].getStream().state());
         } finally {
             for (int i = 0; i < numThreads; i++) {
                 if (!streamRunnables[i].isClosed()) {
@@ -371,35 +430,43 @@ public class QueryableStateIntegrationTest {
     @Test
     public void concurrentAccesses() throws Exception {
         final int numIterations = 500000;
+        final String storeName = "word-count-store";
+        final String windowStoreName = "windowed-word-count-store";

         final ProducerRunnable producerRunnable = new ProducerRunnable(streamConcurrent, inputValues, numIterations);
         final Thread producerThread = new Thread(producerRunnable);
-        kafkaStreams = createCountStream(streamConcurrent, outputTopicConcurrent, streamsConfiguration);
+        kafkaStreams = createCountStream(
+            streamConcurrent,
+            outputTopicConcurrent,
+            outputTopicConcurrentWindowed,
+            storeName,
+            windowStoreName,
+            streamsConfiguration);

         kafkaStreams.start();
         producerThread.start();

         try {
-            waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, 1);
+            waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, numberOfWordsPerIteration);
+            waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, numberOfWordsPerIteration);

-            final ReadOnlyKeyValueStore<String, Long>
-                keyValueStore = kafkaStreams.store("word-count-store-" + streamConcurrent, QueryableStoreTypes.<String, Long>keyValueStore());
+            final ReadOnlyKeyValueStore<String, Long> keyValueStore =
+                kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.<String, Long>keyValueStore());

             final ReadOnlyWindowStore<String, Long> windowStore =
-                kafkaStreams.store("windowed-word-count-store-" + streamConcurrent, QueryableStoreTypes.<String, Long>windowStore());
+                kafkaStreams.store(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.<String, Long>windowStore());

             final Map<String, Long> expectedWindowState = new HashMap<>();
             final Map<String, Long> expectedCount = new HashMap<>();
             while (producerRunnable.getCurrIteration() < numIterations) {
-                verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
-                    expectedCount, windowStore, keyValueStore, false);
+                verifyGreaterOrEqual(
+                    inputValuesKeys.toArray(new String[inputValuesKeys.size()]),
+                    expectedWindowState,
+                    expectedCount,
+                    windowStore,
+                    keyValueStore);
             }
-            // finally check if all keys are there
-            verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
-                expectedCount, windowStore, keyValueStore, true);
         } finally {
             producerRunnable.shutdown();
             producerThread.interrupt();
@@ -423,16 +490,16 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         final KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
-        final Set<KeyValue<String, Long>> batch1 = new HashSet<>();
-        batch1.addAll(Arrays.asList(
-            new KeyValue<>(keys[0], 1L),
-            new KeyValue<>(keys[1], 1L),
-            new KeyValue<>(keys[2], 3L),
-            new KeyValue<>(keys[3], 5L),
-            new KeyValue<>(keys[4], 2L)));
-        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>();
-        expectedBatch1.addAll(Arrays.asList(
-            new KeyValue<>(keys[4], 2L)));
+        final Set<KeyValue<String, Long>> batch1 = new HashSet<>(
+            Arrays.asList(
+                new KeyValue<>(keys[0], 1L),
+                new KeyValue<>(keys[1], 1L),
+                new KeyValue<>(keys[2], 3L),
+                new KeyValue<>(keys[3], 5L),
+                new KeyValue<>(keys[4], 2L))
+        );
+        final Set<KeyValue<String, Long>> expectedBatch1 =
+            new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));

         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
@@ -489,13 +556,14 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         final KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
-        final Set<KeyValue<String, String>> batch1 = new HashSet<>();
-        batch1.addAll(Arrays.asList(
-            new KeyValue<>(keys[0], "1"),
-            new KeyValue<>(keys[1], "1"),
-            new KeyValue<>(keys[2], "3"),
-            new KeyValue<>(keys[3], "5"),
-            new KeyValue<>(keys[4], "2")));
+        final Set<KeyValue<String, String>> batch1 = new HashSet<>(
+            Arrays.asList(
+                new KeyValue<>(keys[0], "1"),
+                new KeyValue<>(keys[1], "1"),
+                new KeyValue<>(keys[2], "3"),
+                new KeyValue<>(keys[3], "5"),
+                new KeyValue<>(keys[4], "2"))
+        );

         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
@@ -508,12 +576,15 @@ public class QueryableStateIntegrationTest {
             mockTime);

         final KTable<String, String> t1 = builder.table(streamOne);
-        final KTable<String, Long> t2 = t1.mapValues(new ValueMapper<String, Long>() {
-            @Override
-            public Long apply(final String value) {
-                return Long.valueOf(value);
-            }
-        }, Serdes.Long(), "queryMapValues");
+        final KTable<String, Long> t2 = t1.mapValues(
+            new ValueMapper<String, Long>() {
+                @Override
+                public Long apply(final String value) {
+                    return Long.valueOf(value);
+                }
+            },
+            Serdes.Long(),
+            "queryMapValues");
         t2.to(Serdes.String(), Serdes.Long(), outputTopic);

         kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
@@ -521,9 +592,8 @@ public class QueryableStateIntegrationTest {
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);

-        final ReadOnlyKeyValueStore<String, Long>
-            myMapStore = kafkaStreams.store("queryMapValues",
-            QueryableStoreTypes.<String, Long>keyValueStore());
+        final ReadOnlyKeyValueStore<String, Long> myMapStore =
+            kafkaStreams.store("queryMapValues", QueryableStoreTypes.<String, Long>keyValueStore());
         for (final KeyValue<String, String> batchEntry : batch1) {
             assertEquals(myMapStore.get(batchEntry.key), Long.valueOf(batchEntry.value));
         }
@@ -535,16 +605,16 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         final KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
-        final Set<KeyValue<String, String>> batch1 = new HashSet<>();
-        batch1.addAll(Arrays.asList(
-            new KeyValue<>(keys[0], "1"),
-            new KeyValue<>(keys[1], "1"),
-            new KeyValue<>(keys[2], "3"),
-            new KeyValue<>(keys[3], "5"),
-            new KeyValue<>(keys[4], "2")));
-        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>();
-        expectedBatch1.addAll(Arrays.asList(
-            new KeyValue<>(keys[4], 2L)));
+        final Set<KeyValue<String, String>> batch1 = new HashSet<>(
+            Arrays.asList(
+                new KeyValue<>(keys[0], "1"),
+                new KeyValue<>(keys[1], "1"),
+                new KeyValue<>(keys[2], "3"),
+                new KeyValue<>(keys[3], "5"),
+                new KeyValue<>(keys[4], "2"))
+        );
+        final Set<KeyValue<String, Long>> expectedBatch1 =
+            new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));

         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
@@ -564,12 +634,15 @@ public class QueryableStateIntegrationTest {
         };

         final KTable<String, String> t1 = builder.table(streamOne);
         final KTable<String, String> t2 = t1.filter(filterPredicate, "queryFilter");
-        final KTable<String, Long> t3 = t2.mapValues(new ValueMapper<String, Long>() {
-            @Override
-            public Long apply(final String value) {
-                return Long.valueOf(value);
-            }
-        }, Serdes.Long(), "queryMapValues");
+        final KTable<String, Long> t3 = t2.mapValues(
+            new ValueMapper<String, Long>() {
+                @Override
+                public Long apply(final String value) {
+                    return Long.valueOf(value);
+                }
+            },
+            Serdes.Long(),
+            "queryMapValues");
         t3.to(Serdes.String(), Serdes.Long(), outputTopic);

         kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
@@ -584,14 +657,15 @@ public class QueryableStateIntegrationTest {
             assertEquals(myMapStore.get(expectedEntry.key), expectedEntry.value);
         }
         for (final KeyValue<String, String> batchEntry : batch1) {
-            final KeyValue<String, Long> batchEntryMapValue = new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value));
+            final KeyValue<String, Long> batchEntryMapValue =
+                new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value));
             if (!expectedBatch1.contains(batchEntryMapValue)) {
                 assertNull(myMapStore.get(batchEntry.key));
             }
         }
     }

-    private void verifyCanQueryState(final int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
+    private void verifyCanQueryState(final int cacheSizeBytes) throws Exception {
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
         final KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
@@ -604,7 +678,6 @@ public class QueryableStateIntegrationTest {
             new KeyValue<>(keys[3], "go"),
             new KeyValue<>(keys[4], "kafka")));
-
         final Set<KeyValue<String, Long>> expectedCount = new TreeSet<>(stringLongComparator);
         for (final String key : keys) {
             expectedCount.add(new KeyValue<>(key, 1L));
@@ -623,19 +696,24 @@ public class QueryableStateIntegrationTest {
         final KStream<String, String> s1 = builder.stream(streamOne);

         // Non Windowed
-        s1.groupByKey().count("my-count").to(Serdes.String(), Serdes.Long(), outputTopic);
-        s1.groupByKey().count(TimeWindows.of(WINDOW_SIZE), "windowed-count");
+        final String storeName = "my-count";
+        s1.groupByKey()
+            .count(storeName)
+            .to(Serdes.String(), Serdes.Long(), outputTopic);
+        final String windowStoreName = "windowed-count";
+        s1.groupByKey()
+            .count(TimeWindows.of(WINDOW_SIZE), windowStoreName);

         kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
         kafkaStreams.start();

         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);

         final ReadOnlyKeyValueStore<String, Long>
-            myCount = kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
+            myCount = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());

         final ReadOnlyWindowStore<String, Long> windowStore =
-            kafkaStreams.store("windowed-count", QueryableStoreTypes.<String, Long>windowStore());
+            kafkaStreams.store(windowStoreName, QueryableStoreTypes.<String, Long>windowStore());
         verifyCanGetByKey(keys,
             expectedCount,
             expectedCount,
@@ -667,16 +745,24 @@ public class QueryableStateIntegrationTest {
             mockTime);

         final int maxWaitMs = 30000;
-        TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
-        final ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+        TestUtils.waitForCondition(
+            new WaitForStore(storeName),
+            maxWaitMs,
+            "waiting for store " + storeName);

-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return new Long(8).equals(store.get("hello"));
-            }
-        }, maxWaitMs, "wait for count to be 8");
+        final ReadOnlyKeyValueStore<String, Long> store =
+            kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return new Long(8).equals(store.get("hello"));
+                }
+            },
+            maxWaitMs,
+            "wait for count to be 8");

         // close stream
         kafkaStreams.close();
@@ -686,17 +772,20 @@ public class QueryableStateIntegrationTest {
         kafkaStreams.start();

         // make sure we never get any value other than 8 for hello
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                try {
-                    assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello"));
-                    return true;
-                } catch (final InvalidStateStoreException ise) {
-                    return false;
-                }
-            }
-        }, maxWaitMs, "waiting for store " + storeName);
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    try {
+                        assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello"));
+                        return true;
+                    } catch (final InvalidStateStoreException ise) {
+                        return false;
+                    }
+                }
+            },
+            maxWaitMs,
+            "waiting for store " + storeName);
     }
@@ -706,6 +795,7 @@ public class QueryableStateIntegrationTest {
         WaitForStore(final String storeName) {
             this.storeName = storeName;
         }
+
         @Override
         public boolean conditionMet() {
             try {
@@ -761,20 +851,28 @@ public class QueryableStateIntegrationTest {
             mockTime);

         final int maxWaitMs = 30000;
-        TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
-        final ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore());
+        TestUtils.waitForCondition(
+            new WaitForStore(storeName),
+            maxWaitMs,
+            "waiting for store " + storeName);

-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return "12".equals(store.get("a")) && "34".equals(store.get("b"));
-            }
-        }, maxWaitMs, "wait for agg to be '123'");
+        final ReadOnlyKeyValueStore<String, String> store =
+            kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore());
+
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return "12".equals(store.get("a")) && "34".equals(store.get("b"));
+                }
+            },
+            maxWaitMs,
+            "wait for agg to be '123'");

         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
-            Arrays.asList(KeyValue.pair("a", "5")),
+            Collections.singleton(KeyValue.pair("a", "5")),
             TestUtils.producerConfig(
                 CLUSTER.bootstrapServers(),
                 StringSerializer.class,
@@ -782,33 +880,36 @@ public class QueryableStateIntegrationTest {
                 new Properties()),
             mockTime);

-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return failed.get();
-            }
-        }, 30000, "wait for thread to fail");
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return failed.get();
+                }
+            },
+            30000,
+            "wait for thread to fail");

         TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
         final ReadOnlyKeyValueStore<String, String> store2 = kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore());

-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return "125".equals(store2.get("a")) && "34".equals(store2.get("b"));
-            }
-        }, maxWaitMs, "wait for agg to be '123'");
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return "125".equals(store2.get("a")) && "34".equals(store2.get("b"));
+                }
+            },
+            maxWaitMs,
+            "wait for agg to be '123'");
     }

     private void verifyRangeAndAll(final Set<KeyValue<String, Long>> expectedCount,
                                    final ReadOnlyKeyValueStore<String, Long> myCount) {
         final Set<KeyValue<String, Long>> countRangeResults = new TreeSet<>(stringLongComparator);
         final Set<KeyValue<String, Long>> countAllResults = new TreeSet<>(stringLongComparator);
-        final Set<KeyValue<String, Long>>
-            expectedRangeResults =
-            new TreeSet<>(stringLongComparator);
+        final Set<KeyValue<String, Long>> expectedRangeResults = new TreeSet<>(stringLongComparator);
         expectedRangeResults.addAll(Arrays.asList(
             new KeyValue<>("hello", 1L),
@@ -837,8 +938,7 @@ public class QueryableStateIntegrationTest {
                                    final Set<KeyValue<String, Long>> expectedWindowState,
                                    final Set<KeyValue<String, Long>> expectedCount,
                                    final ReadOnlyWindowStore<String, Long> windowStore,
-                                   final ReadOnlyKeyValueStore<String, Long> myCount)
-        throws InterruptedException {
+                                   final ReadOnlyKeyValueStore<String, Long> myCount) throws Exception {
         final Set<KeyValue<String, Long>> windowState = new TreeSet<>(stringLongComparator);
         final Set<KeyValue<String, Long>> countState = new TreeSet<>(stringLongComparator);
@@ -868,31 +968,26 @@ public class QueryableStateIntegrationTest {
      * @param expectedCount     Expected count
      * @param windowStore       Window Store
      * @param keyValueStore     Key-value store
-     * @param failIfKeyNotFound if true, tests fails if an expected key is not found in store. If false,
-     *                          the method merely inserts the new found key into the list of
-     *                          expected keys.
      */
     private void verifyGreaterOrEqual(final String[] keys,
                                       final Map<String, Long> expectedWindowedCount,
                                       final Map<String, Long> expectedCount,
                                       final ReadOnlyWindowStore<String, Long> windowStore,
-                                      final ReadOnlyKeyValueStore<String, Long> keyValueStore,
-                                      final boolean failIfKeyNotFound)
-        throws InterruptedException {
+                                      final ReadOnlyKeyValueStore<String, Long> keyValueStore) {
         final Map<String, Long> windowState = new HashMap<>();
         final Map<String, Long> countState = new HashMap<>();

         for (final String key : keys) {
             final Map<String, Long> map = fetchMap(windowStore, key);
-            if (map.equals(Collections.<String, Long>emptyMap()) && failIfKeyNotFound) {
-                fail("Key not found " + key);
+            if (map.equals(Collections.<String, Long>emptyMap())) {
+                fail("Key in windowed-store not found " + key);
             }
             windowState.putAll(map);
             final Long value = keyValueStore.get(key);
             if (value != null) {
                 countState.put(key, value);
-            } else if (failIfKeyNotFound) {
-                fail("Key not found " + key);
+            } else {
+                fail("Key in key-value-store not found " + key);
             }
         }
@@ -916,15 +1011,14 @@ public class QueryableStateIntegrationTest {
     }

-    private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs) throws InterruptedException {
+    private void waitUntilAtLeastNumRecordProcessed(final String topic,
+                                                    final int numRecs) throws Exception {
         final Properties config = new Properties();
         config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer");
         config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-            StringDeserializer.class.getName());
-        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-            LongDeserializer.class.getName());
+        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
         IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
             config,
             topic,
private Set<KeyValue<String, Long>> fetch(final ReadOnlyWindowStore<String, Long> store,
final String key) {
final WindowStoreIterator<Long> fetch = store.fetch(key, 0, System.currentTimeMillis());
final WindowStoreIterator<Long> fetch =
store.fetch(key, 0, System.currentTimeMillis());
if (fetch.hasNext()) {
final KeyValue<Long, Long> next = fetch.next();
return Collections.singleton(KeyValue.pair(key, next.value));
@@ -945,8 +1039,8 @@ public class QueryableStateIntegrationTest {
     private Map<String, Long> fetchMap(final ReadOnlyWindowStore<String, Long> store,
                                        final String key) {
-        final WindowStoreIterator<Long> fetch = store.fetch(key, 0, System.currentTimeMillis());
+        final WindowStoreIterator<Long> fetch =
+            store.fetch(key, 0, System.currentTimeMillis());
         if (fetch.hasNext()) {
             final KeyValue<Long, Long> next = fetch.next();
             return Collections.singletonMap(key, next.value);
@@ -954,7 +1048,6 @@ public class QueryableStateIntegrationTest {
         return Collections.emptyMap();
     }

-
     /**
      * A class that periodically produces records in a separate thread
      */
@@ -965,13 +1058,15 @@ public class QueryableStateIntegrationTest {
         private int currIteration = 0;
         boolean shutdown = false;

-        ProducerRunnable(final String topic, final List<String> inputValues, final int numIterations) {
+        ProducerRunnable(final String topic,
+                         final List<String> inputValues,
+                         final int numIterations) {
             this.topic = topic;
             this.inputValues = inputValues;
             this.numIterations = numIterations;
         }

-        private synchronized void incrementInteration() {
+        private synchronized void incrementIteration() {
            currIteration++;
        }
@@ -993,17 +1088,16 @@ public class QueryableStateIntegrationTest {
             producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
             try (final KafkaProducer<String, String> producer =
-                new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
-
+                     new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
                 while (getCurrIteration() < numIterations && !shutdown) {
                     for (final String value : inputValues) {
                         producer.send(new ProducerRecord<String, String>(topic, value));
                     }
-                    incrementInteration();
+                    incrementIteration();
                 }
             }
         }
     }
 }
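
For readers unfamiliar with the interactive-query pattern this test exercises,
here is a minimal sketch (not part of the commit) of querying a windowed store
with the 0.11-era API used in the diff; the class name, store name, and the
running KafkaStreams instance are illustrative assumptions:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public final class WindowStoreQuerySketch {

    // Sketch only: returns the count for `key` in the last window found in
    // [from, to], or null if no window in that range contains the key yet.
    static Long latestWindowedCount(final KafkaStreams streams,
                                    final String storeName,
                                    final String key,
                                    final long from,
                                    final long to) {
        // Look up the window store by the name it was materialized under,
        // e.g. "windowed-word-count-store-<input-topic>" in the test above.
        final ReadOnlyWindowStore<String, Long> store =
            streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());

        Long latest = null;
        // fetch() iterates over the windows that contain `key` in the range.
        final WindowStoreIterator<Long> iter = store.fetch(key, from, to);
        while (iter.hasNext()) {
            // entry.key is the window start timestamp, entry.value the count.
            final KeyValue<Long, Long> entry = iter.next();
            latest = entry.value;
        }
        iter.close();
        return latest;
    }
}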
