diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java index 79c335a219f..97715539e1f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java @@ -44,7 +44,7 @@ public interface QueryableStoreType { * @param storeProvider provides access to all the underlying StateStore instances * @param storeName The name of the Store * @return a read-only interface over a {@code StateStore} - * (cf. {@link org.apache.kafka.streams.state.QueryableStoreTypes.KeyValueStoreType}) + * (cf. {@link QueryableStoreTypes.KeyValueStoreType}) */ T create(final StateStoreProvider storeProvider, final String storeName); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java index 9cc519463e0..712ae917b18 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java @@ -28,15 +28,16 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreType; -import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -44,6 +45,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.ArrayList; @@ -55,19 +58,25 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @Category({IntegrationTest.class}) public class StoreQueryIntegrationTest { + private static final Logger LOG = LoggerFactory.getLogger(StoreQueryIntegrationTest.class); + private static final int NUM_BROKERS = 1; private static int port = 0; private static final String INPUT_TOPIC_NAME = "input-topic"; @@ -102,10 +111,10 @@ public class StoreQueryIntegrationTest { final StreamsBuilder builder = new StreamsBuilder(); builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), - Materialized.>as(TABLE_NAME) - .withCachingDisabled()) - .toStream() - .peek((k, v) -> semaphore.release()); + Materialized.>as(TABLE_NAME) + .withCachingDisabled()) + .toStream() + .peek((k, v) -> semaphore.release()); final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration()); final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration()); @@ -117,17 +126,34 @@ public class StoreQueryIntegrationTest { // Assert that all messages in the first batch were processed in a timely manner assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); - final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); - - final QueryableStoreType> queryableStoreType = QueryableStoreTypes.keyValueStore(); - final ReadOnlyKeyValueStore store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, queryableStoreType); - final ReadOnlyKeyValueStore store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, queryableStoreType); - - final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1; - - // Assert that only active is able to query for a key by default - assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue())); - assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), is(nullValue())); + until(() -> { + + final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); + + final QueryableStoreType> queryableStoreType = keyValueStore(); + final ReadOnlyKeyValueStore store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType); + final ReadOnlyKeyValueStore store2 = getStore(TABLE_NAME, kafkaStreams2, queryableStoreType); + + final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1; + + // Assert that only active is able to query for a key by default + assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue())); + try { + if (kafkaStreams1IsActive) { + assertThat(store2.get(key), is(nullValue())); + } else { + assertThat(store1.get(key), is(nullValue())); + } + return true; + } catch (final InvalidStateStoreException exception) { + assertThat( + exception.getMessage(), + containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING") + ); + LOG.info("Streams wasn't running. Will try again."); + return false; + } + }); } @Test @@ -138,10 +164,10 @@ public class StoreQueryIntegrationTest { final StreamsBuilder builder = new StreamsBuilder(); builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), - Materialized.>as(TABLE_NAME) - .withCachingDisabled()) - .toStream() - .peek((k, v) -> semaphore.release()); + Materialized.>as(TABLE_NAME) + .withCachingDisabled()) + .toStream() + .peek((k, v) -> semaphore.release()); final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration()); final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration()); @@ -153,51 +179,75 @@ public class StoreQueryIntegrationTest { // Assert that all messages in the first batch were processed in a timely manner assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); - final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); - - //key belongs to this partition - final int keyPartition = keyQueryMetadata.partition(); - - //key doesn't belongs to this partition - final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0; - final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1; - - StoreQueryParameters> storeQueryParam = - StoreQueryParameters.>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore()) - .withPartition(keyPartition); - ReadOnlyKeyValueStore store1 = null; - ReadOnlyKeyValueStore store2 = null; - if (kafkaStreams1IsActive) { - store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam); - } else { - store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam); - } - - if (kafkaStreams1IsActive) { - assertThat(store1, is(notNullValue())); - assertThat(store2, is(nullValue())); - } else { - assertThat(store2, is(notNullValue())); - assertThat(store1, is(nullValue())); - } + until(() -> { + final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); + + //key belongs to this partition + final int keyPartition = keyQueryMetadata.partition(); + + //key doesn't belongs to this partition + final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0; + final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1; + + final StoreQueryParameters> storeQueryParam = + StoreQueryParameters.>fromNameAndType(TABLE_NAME, keyValueStore()) + .withPartition(keyPartition); + ReadOnlyKeyValueStore store1 = null; + ReadOnlyKeyValueStore store2 = null; + if (kafkaStreams1IsActive) { + store1 = getStore(kafkaStreams1, storeQueryParam); + } else { + store2 = getStore(kafkaStreams2, storeQueryParam); + } + + if (kafkaStreams1IsActive) { + assertThat(store1, is(notNullValue())); + assertThat(store2, is(nullValue())); + } else { + assertThat(store2, is(notNullValue())); + assertThat(store1, is(nullValue())); + } + + // Assert that only active for a specific requested partition serves key if stale stores and not enabled + assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue())); + + final StoreQueryParameters> storeQueryParam2 = + StoreQueryParameters.>fromNameAndType(TABLE_NAME, keyValueStore()) + .withPartition(keyDontBelongPartition); - // Assert that only active for a specific requested partition serves key if stale stores and not enabled - assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue())); - storeQueryParam = StoreQueryParameters.>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore()) - .withPartition(keyDontBelongPartition); - ReadOnlyKeyValueStore store3 = null; - ReadOnlyKeyValueStore store4 = null; - if (!kafkaStreams1IsActive) { - store3 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam); - } else { - store4 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam); - } - // Assert that key is not served when wrong specific partition is requested - // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition - // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested - assertThat(kafkaStreams1IsActive ? store4.get(key) : store3.get(key), is(nullValue())); + try { + // Assert that key is not served when wrong specific partition is requested + // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition + // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested + if (kafkaStreams1IsActive) { + assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue())); + final InvalidStateStoreException exception = + assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key)); + assertThat( + exception.getMessage(), + containsString("The specified partition 1 for store source-table does not exist.") + ); + } else { + assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue())); + final InvalidStateStoreException exception = + assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams2, storeQueryParam2).get(key)); + assertThat( + exception.getMessage(), + containsString("The specified partition 1 for store source-table does not exist.") + ); + } + return true; + } catch (final InvalidStateStoreException exception) { + assertThat( + exception.getMessage(), + containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING") + ); + LOG.info("Streams wasn't running. Will try again."); + return false; + } + }); } @Test @@ -208,10 +258,10 @@ public class StoreQueryIntegrationTest { final StreamsBuilder builder = new StreamsBuilder(); builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), - Materialized.>as(TABLE_NAME) - .withCachingDisabled()) - .toStream() - .peek((k, v) -> semaphore.release()); + Materialized.>as(TABLE_NAME) + .withCachingDisabled()) + .toStream() + .peek((k, v) -> semaphore.release()); final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration()); final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration()); @@ -224,17 +274,15 @@ public class StoreQueryIntegrationTest { // Assert that all messages in the first batch were processed in a timely manner assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); - final QueryableStoreType> queryableStoreType = QueryableStoreTypes.keyValueStore(); + final QueryableStoreType> queryableStoreType = keyValueStore(); // Assert that both active and standby are able to query for a key TestUtils.waitForCondition(() -> { - final ReadOnlyKeyValueStore store1 = IntegrationTestUtils - .getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType); + final ReadOnlyKeyValueStore store1 = getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType); return store1.get(key) != null; }, "store1 cannot find results for key"); TestUtils.waitForCondition(() -> { - final ReadOnlyKeyValueStore store2 = IntegrationTestUtils - .getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType); + final ReadOnlyKeyValueStore store2 = getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType); return store2.get(key) != null; }, "store2 cannot find results for key"); } @@ -247,10 +295,10 @@ public class StoreQueryIntegrationTest { final StreamsBuilder builder = new StreamsBuilder(); builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), - Materialized.>as(TABLE_NAME) - .withCachingDisabled()) - .toStream() - .peek((k, v) -> semaphore.release()); + Materialized.>as(TABLE_NAME) + .withCachingDisabled()) + .toStream() + .peek((k, v) -> semaphore.release()); final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration()); final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration()); @@ -269,7 +317,7 @@ public class StoreQueryIntegrationTest { //key doesn't belongs to this partition final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0; - final QueryableStoreType> queryableStoreType = QueryableStoreTypes.keyValueStore(); + final QueryableStoreType> queryableStoreType = keyValueStore(); // Assert that both active and standby are able to query for a key final StoreQueryParameters> param = StoreQueryParameters @@ -277,11 +325,11 @@ public class StoreQueryIntegrationTest { .enableStaleStores() .withPartition(keyPartition); TestUtils.waitForCondition(() -> { - final ReadOnlyKeyValueStore store1 = IntegrationTestUtils.getStore(kafkaStreams1, param); + final ReadOnlyKeyValueStore store1 = getStore(kafkaStreams1, param); return store1.get(key) != null; }, "store1 cannot find results for key"); TestUtils.waitForCondition(() -> { - final ReadOnlyKeyValueStore store2 = IntegrationTestUtils.getStore(kafkaStreams2, param); + final ReadOnlyKeyValueStore store2 = getStore(kafkaStreams2, param); return store2.get(key) != null; }, "store2 cannot find results for key"); @@ -289,8 +337,8 @@ public class StoreQueryIntegrationTest { .fromNameAndType(TABLE_NAME, queryableStoreType) .enableStaleStores() .withPartition(keyDontBelongPartition); - final ReadOnlyKeyValueStore store3 = IntegrationTestUtils.getStore(kafkaStreams1, otherParam); - final ReadOnlyKeyValueStore store4 = IntegrationTestUtils.getStore(kafkaStreams2, otherParam); + final ReadOnlyKeyValueStore store3 = getStore(kafkaStreams1, otherParam); + final ReadOnlyKeyValueStore store4 = getStore(kafkaStreams2, otherParam); // Assert that assertThat(store3.get(key), is(nullValue())); @@ -306,10 +354,10 @@ public class StoreQueryIntegrationTest { final StreamsBuilder builder = new StreamsBuilder(); builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), - Materialized.>as(TABLE_NAME) - .withCachingDisabled()) - .toStream() - .peek((k, v) -> semaphore.release()); + Materialized.>as(TABLE_NAME) + .withCachingDisabled()) + .toStream() + .peek((k, v) -> semaphore.release()); final Properties streamsConfiguration1 = streamsConfiguration(); streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads); @@ -337,34 +385,49 @@ public class StoreQueryIntegrationTest { //key doesn't belongs to this partition final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0; - final QueryableStoreType> queryableStoreType = QueryableStoreTypes.keyValueStore(); + final QueryableStoreType> queryableStoreType = keyValueStore(); // Assert that both active and standby are able to query for a key final StoreQueryParameters> param = StoreQueryParameters - .fromNameAndType(TABLE_NAME, queryableStoreType) - .enableStaleStores() - .withPartition(keyPartition); + .fromNameAndType(TABLE_NAME, queryableStoreType) + .enableStaleStores() + .withPartition(keyPartition); TestUtils.waitForCondition(() -> { - final ReadOnlyKeyValueStore store1 = IntegrationTestUtils.getStore(kafkaStreams1, param); + final ReadOnlyKeyValueStore store1 = getStore(kafkaStreams1, param); return store1.get(key) != null; }, "store1 cannot find results for key"); TestUtils.waitForCondition(() -> { - final ReadOnlyKeyValueStore store2 = IntegrationTestUtils.getStore(kafkaStreams2, param); + final ReadOnlyKeyValueStore store2 = getStore(kafkaStreams2, param); return store2.get(key) != null; }, "store2 cannot find results for key"); final StoreQueryParameters> otherParam = StoreQueryParameters - .fromNameAndType(TABLE_NAME, queryableStoreType) - .enableStaleStores() - .withPartition(keyDontBelongPartition); - final ReadOnlyKeyValueStore store3 = IntegrationTestUtils.getStore(kafkaStreams1, otherParam); - final ReadOnlyKeyValueStore store4 = IntegrationTestUtils.getStore(kafkaStreams2, otherParam); + .fromNameAndType(TABLE_NAME, queryableStoreType) + .enableStaleStores() + .withPartition(keyDontBelongPartition); + final ReadOnlyKeyValueStore store3 = getStore(kafkaStreams1, otherParam); + final ReadOnlyKeyValueStore store4 = getStore(kafkaStreams2, otherParam); // Assert that assertThat(store3.get(key), is(nullValue())); assertThat(store4.get(key), is(nullValue())); } + private static void until(final TestCondition condition) { + boolean success = false; + final long deadline = System.currentTimeMillis() + IntegrationTestUtils.DEFAULT_TIMEOUT; + while (!success && System.currentTimeMillis() < deadline) { + try { + success = condition.conditionMet(); + Thread.sleep(500L); + } catch (final RuntimeException e) { + throw e; + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + } + private KafkaStreams createKafkaStreams(final StreamsBuilder builder, final Properties config) { final KafkaStreams streams = new KafkaStreams(builder.build(config), config); streamsToCleanup.add(streams); @@ -378,12 +441,12 @@ public class StoreQueryIntegrationTest { producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); IntegrationTestUtils.produceKeyValuesSynchronously( - INPUT_TOPIC_NAME, - IntStream.range(start, endExclusive) - .mapToObj(i -> KeyValue.pair(key, i)) - .collect(Collectors.toList()), - producerProps, - mockTime); + INPUT_TOPIC_NAME, + IntStream.range(start, endExclusive) + .mapToObj(i -> KeyValue.pair(key, i)) + .collect(Collectors.toList()), + producerProps, + mockTime); } private Properties streamsConfiguration() {