From 933a813950b1ae06e19ead23734fa6bdf1f32e3f Mon Sep 17 00:00:00 2001
From: John Roesler <vvcephei@apache.org>
Date: Thu, 29 Oct 2020 11:57:31 -0500
Subject: [PATCH] KAFKA-10638: Fix QueryableStateIntegrationTest (#9521)

This test has been observed to fail flakily. Apparently, in the failed
runs, Streams had entered a rebalance before some of the assertions were
made. We recently made IQ a little stricter about returning errors
instead of null responses in such cases:

KAFKA-10598: Improve IQ name and type checks (#9408)

As a result, we now see failures instead of silently executing an
invalid test (i.e., the test was asserting that the return value was
null, but it was null for the wrong reason). Now, if the test discovers
that Streams is no longer running, it repeats the verification until it
actually gets a valid positive or negative result.
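For reference, the verify-and-retry idiom this patch introduces boils
down to the following minimal sketch. It mirrors the new
until(TestCondition) helper added to StoreQueryIntegrationTest below,
but the RetrySketch/Condition names and the 30-second timeout are
illustrative stand-ins, not part of the patch:

    public final class RetrySketch {

        @FunctionalInterface
        interface Condition {
            // Return true once a valid positive or negative result was observed,
            // false if the check should be retried (e.g., Streams was rebalancing).
            boolean conditionMet() throws Exception;
        }

        static void until(final Condition condition) {
            boolean success = false;
            final long deadline = System.currentTimeMillis() + 30_000L; // illustrative timeout
            while (!success && System.currentTimeMillis() < deadline) {
                try {
                    success = condition.conditionMet();
                    Thread.sleep(500L); // back off between attempts
                } catch (final RuntimeException e) {
                    throw e; // rethrow unchecked exceptions unwrapped
                } catch (final Exception e) {
                    throw new RuntimeException(e); // wrap checked exceptions, e.g. InterruptedException
                }
            }
        }

        public static void main(final String[] args) {
            until(() -> {
                // Run the assertions here; return false to retry while Streams
                // is mid-rebalance, true once a valid result has been observed.
                return true;
            });
        }
    }

Note that JUnit assertion failures are java.lang.AssertionErrors, which
escape both catch clauses and fail the test immediately; a retry only
happens when the condition itself decides to return false.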
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
---
 .../streams/state/QueryableStoreType.java     |   2 +-
 .../StoreQueryIntegrationTest.java            | 265 +++++++++++-------
 2 files changed, 165 insertions(+), 102 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
index 79c335a219f..97715539e1f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
@@ -44,7 +44,7 @@ public interface QueryableStoreType<T> {
      * @param storeProvider provides access to all the underlying StateStore instances
      * @param storeName     The name of the Store
      * @return a read-only interface over a {@code StateStore}
-     * (cf. {@link org.apache.kafka.streams.state.QueryableStoreTypes.KeyValueStoreType})
+     * (cf. {@link QueryableStoreTypes.KeyValueStoreType})
      */
     T create(final StateStoreProvider storeProvider, final String storeName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index 9cc519463e0..712ae917b18 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -28,15 +28,16 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -44,6 +45,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -55,19 +58,25 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 @Category({IntegrationTest.class})
 public class StoreQueryIntegrationTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StoreQueryIntegrationTest.class);
+
     private static final int NUM_BROKERS = 1;
     private static int port = 0;
     private static final String INPUT_TOPIC_NAME = "input-topic";
@@ -102,10 +111,10 @@ public class StoreQueryIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME,
                 Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
-                        .withCachingDisabled())
-                .toStream()
-                .peek((k, v) -> semaphore.release());
+            Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                .withCachingDisabled())
+            .toStream()
+            .peek((k, v) -> semaphore.release());
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
         final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
@@ -117,17 +126,34 @@
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
 
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
-
-        final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        // Assert that only active is able to query for a key by default
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
-        assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), is(nullValue()));
+        until(() -> {
+
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
+
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            // Assert that only the active is able to query for a key by default
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
+            try {
+                if (kafkaStreams1IsActive) {
+                    assertThat(store2.get(key), is(nullValue()));
+                } else {
+                    assertThat(store1.get(key), is(nullValue()));
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {
+                assertThat(
+                    exception.getMessage(),
+                    containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
+                );
+                LOG.info("Streams wasn't running. Will try again.");
+                return false;
+            }
+        });
     }
 
     @Test
@@ -138,10 +164,10 @@
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME,
                 Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
-                        .withCachingDisabled())
-                .toStream()
-                .peek((k, v) -> semaphore.release());
+            Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                .withCachingDisabled())
+            .toStream()
+            .peek((k, v) -> semaphore.release());
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
         final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
@@ -153,51 +179,75 @@
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
 
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        //key belongs to this partition
-        final int keyPartition = keyQueryMetadata.partition();
-
-        //key doesn't belongs to this partition
-        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
-        final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
-            StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-                .withPartition(keyPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
-        if (kafkaStreams1IsActive) {
-            store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
-        } else {
-            store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
-        }
-
-        if (kafkaStreams1IsActive) {
-            assertThat(store1, is(notNullValue()));
-            assertThat(store2, is(nullValue()));
-        } else {
-            assertThat(store2, is(notNullValue()));
-            assertThat(store1, is(nullValue()));
-        }
+        until(() -> {
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            // key belongs to this partition
+            final int keyPartition = keyQueryMetadata.partition();
+
+            // key doesn't belong to this partition
+            final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                    .withPartition(keyPartition);
+            ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
+            ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
+            if (kafkaStreams1IsActive) {
+                store1 = getStore(kafkaStreams1, storeQueryParam);
+            } else {
+                store2 = getStore(kafkaStreams2, storeQueryParam);
+            }
+
+            if (kafkaStreams1IsActive) {
+                assertThat(store1, is(notNullValue()));
+                assertThat(store2, is(nullValue()));
+            } else {
+                assertThat(store2, is(notNullValue()));
+                assertThat(store1, is(nullValue()));
+            }
+
+            // Assert that only the active for a specific requested partition serves the key if stale stores are not enabled
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                    .withPartition(keyDontBelongPartition);
 
-        // Assert that only active for a specific requested partition serves key if stale stores and not enabled
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
-        storeQueryParam = StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-            .withPartition(keyDontBelongPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store3 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store4 = null;
-        if (!kafkaStreams1IsActive) {
-            store3 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
-        } else {
-            store4 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
-        }
-        // Assert that key is not served when wrong specific partition is requested
-        // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-        // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-        assertThat(kafkaStreams1IsActive ? store4.get(key) : store3.get(key), is(nullValue()));
+            try {
+                // Assert that the key is not served when the wrong specific partition is requested
+                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+                // So, in that case, store3 would be null and store4 would not return the value for the key, as the wrong partition was requested
+                if (kafkaStreams1IsActive) {
+                    assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store source-table does not exist.")
+                    );
+                } else {
+                    assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams2, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store source-table does not exist.")
+                    );
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {
+                assertThat(
+                    exception.getMessage(),
+                    containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
+                );
+                LOG.info("Streams wasn't running. Will try again.");
+                return false;
+            }
+        });
     }
 
     @Test
@@ -208,10 +258,10 @@
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME,
                 Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
-                        .withCachingDisabled())
-                .toStream()
-                .peek((k, v) -> semaphore.release());
+            Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                .withCachingDisabled())
+            .toStream()
+            .peek((k, v) -> semaphore.release());
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
         final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
@@ -224,17 +274,15 @@
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
 
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
 
         // Assert that both active and standby are able to query for a key
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils
-                .getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType);
             return store1.get(key) != null;
         }, "store1 cannot find results for key");
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils
-                .getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType);
             return store2.get(key) != null;
         }, "store2 cannot find results for key");
     }
@@ -247,10 +295,10 @@
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME,
                 Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
-                        .withCachingDisabled())
-                .toStream()
-                .peek((k, v) -> semaphore.release());
+            Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                .withCachingDisabled())
+            .toStream()
+            .peek((k, v) -> semaphore.release());
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
         final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
@@ -269,7 +317,7 @@
         //key doesn't belongs to this partition
         final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
 
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
 
         // Assert that both active and standby are able to query for a key
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> param = StoreQueryParameters
@@ -277,11 +325,11 @@
             .fromNameAndType(TABLE_NAME, queryableStoreType)
             .enableStaleStores()
             .withPartition(keyPartition);
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(kafkaStreams1, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(kafkaStreams1, param);
             return store1.get(key) != null;
         }, "store1 cannot find results for key");
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(kafkaStreams2, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(kafkaStreams2, param);
             return store2.get(key) != null;
         }, "store2 cannot find results for key");
 
@@ -289,8 +337,8 @@
             .fromNameAndType(TABLE_NAME, queryableStoreType)
             .enableStaleStores()
             .withPartition(keyDontBelongPartition);
-        final ReadOnlyKeyValueStore<Integer, Integer> store3 = IntegrationTestUtils.getStore(kafkaStreams1, otherParam);
-        final ReadOnlyKeyValueStore<Integer, Integer> store4 = IntegrationTestUtils.getStore(kafkaStreams2, otherParam);
+        final ReadOnlyKeyValueStore<Integer, Integer> store3 = getStore(kafkaStreams1, otherParam);
+        final ReadOnlyKeyValueStore<Integer, Integer> store4 = getStore(kafkaStreams2, otherParam);
 
         // Assert that the key cannot be found in the wrong partition, even with stale stores enabled
         assertThat(store3.get(key), is(nullValue()));
         assertThat(store4.get(key), is(nullValue()));
     }
 
     @Test
@@ -306,10 +354,10 @@
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME,
                 Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
-                        .withCachingDisabled())
-                .toStream()
-                .peek((k, v) -> semaphore.release());
+            Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                .withCachingDisabled())
+            .toStream()
+            .peek((k, v) -> semaphore.release());
 
         final Properties streamsConfiguration1 = streamsConfiguration();
         streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
@@ -337,34 +385,49 @@
         //key doesn't belongs to this partition
         final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
 
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
 
         // Assert that both active and standby are able to query for a key
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> param = StoreQueryParameters
-            .fromNameAndType(TABLE_NAME, queryableStoreType)
-            .enableStaleStores()
-            .withPartition(keyPartition);
+                .fromNameAndType(TABLE_NAME, queryableStoreType)
+                .enableStaleStores()
+                .withPartition(keyPartition);
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(kafkaStreams1, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(kafkaStreams1, param);
             return store1.get(key) != null;
         }, "store1 cannot find results for key");
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(kafkaStreams2, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(kafkaStreams2, param);
             return store2.get(key) != null;
         }, "store2 cannot find results for key");
 
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> otherParam = StoreQueryParameters
-            .fromNameAndType(TABLE_NAME, queryableStoreType)
-            .enableStaleStores()
-            .withPartition(keyDontBelongPartition);
-        final ReadOnlyKeyValueStore<Integer, Integer> store3 = IntegrationTestUtils.getStore(kafkaStreams1, otherParam);
-        final ReadOnlyKeyValueStore<Integer, Integer> store4 = IntegrationTestUtils.getStore(kafkaStreams2, otherParam);
+                .fromNameAndType(TABLE_NAME, queryableStoreType)
+                .enableStaleStores()
+                .withPartition(keyDontBelongPartition);
+        final ReadOnlyKeyValueStore<Integer, Integer> store3 = getStore(kafkaStreams1, otherParam);
+        final ReadOnlyKeyValueStore<Integer, Integer> store4 = getStore(kafkaStreams2, otherParam);
 
         // Assert that the key cannot be found in the wrong partition, even with stale stores enabled
         assertThat(store3.get(key), is(nullValue()));
         assertThat(store4.get(key), is(nullValue()));
     }
 
+    private static void until(final TestCondition condition) {
+        boolean success = false;
+        final long deadline = System.currentTimeMillis() + IntegrationTestUtils.DEFAULT_TIMEOUT;
+        while (!success && System.currentTimeMillis() < deadline) {
+            try {
+                success = condition.conditionMet();
+                Thread.sleep(500L);
+            } catch (final RuntimeException e) {
+                throw e;
+            } catch (final Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     private KafkaStreams createKafkaStreams(final StreamsBuilder builder, final Properties config) {
         final KafkaStreams streams = new KafkaStreams(builder.build(config), config);
         streamsToCleanup.add(streams);
@@ -378,12 +441,12 @@
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
-                INPUT_TOPIC_NAME,
-                IntStream.range(start, endExclusive)
-                         .mapToObj(i -> KeyValue.pair(key, i))
-                         .collect(Collectors.toList()),
-                producerProps,
-                mockTime);
+            INPUT_TOPIC_NAME,
+            IntStream.range(start, endExclusive)
+                .mapToObj(i -> KeyValue.pair(key, i))
+                .collect(Collectors.toList()),
+            producerProps,
+            mockTime);
     }
 
     private Properties streamsConfiguration() {