From 732bffcae6ad049d894e4dffe1907e8ceeb74a60 Mon Sep 17 00:00:00 2001 From: Hanyu Zheng <135176127+hanyuzheng7@users.noreply.github.com> Date: Mon, 16 Oct 2023 17:30:05 -0700 Subject: [PATCH] KAFKA-15569: test and add test cases in IQv2StoreIntegrationTest (#14523) Reviewers: Matthias J. Sax --- .../integration/IQv2StoreIntegrationTest.java | 651 ++++++++++++++++-- 1 file changed, 576 insertions(+), 75 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java index bde36508321..26d86896296 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java @@ -140,7 +140,7 @@ public class IQv2StoreIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private static final Position POSITION_0 = - Position.fromMap(mkMap(mkEntry(INPUT_TOPIC_NAME, mkMap(mkEntry(0, 1L))))); + Position.fromMap(mkMap(mkEntry(INPUT_TOPIC_NAME, mkMap(mkEntry(0, 5L))))); public static class UnknownQuery implements Query { } @@ -408,13 +408,15 @@ public class IQv2StoreIntegrationTest { final List> futures = new LinkedList<>(); try (final Producer producer = new KafkaProducer<>(producerProps)) { - for (int i = 0; i < 4; i++) { + for (int i = 0; i < 10; i++) { + final int key = i / 2; + final int partition = key % partitions; final Future send = producer.send( new ProducerRecord<>( INPUT_TOPIC_NAME, - i % partitions, - RECORD_TIME, - i, + partition, + WINDOW_START + Duration.ofMinutes(2).toMillis() * i, + key, i, null ) @@ -438,8 +440,8 @@ public class IQv2StoreIntegrationTest { assertThat(INPUT_POSITION, equalTo( Position .emptyPosition() - .withComponent(INPUT_TOPIC_NAME, 0, 1L) - .withComponent(INPUT_TOPIC_NAME, 1, 1L) + .withComponent(INPUT_TOPIC_NAME, 0, 5L) + .withComponent(INPUT_TOPIC_NAME, 1, 3L) )); } @@ -650,12 +652,13 @@ public class IQv2StoreIntegrationTest { public void process(final Record record) { final TimestampedWindowStore stateStore = context().getStateStore(windowStoreStoreBuilder.name()); + // We don't re-implement the DSL logic (which implements sum) but instead just keep the lasted value per window stateStore.put( record.key(), ValueAndTimestamp.make( record.value(), record.timestamp() ), - WINDOW_START + (record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis() ); } }; @@ -671,7 +674,8 @@ public class IQv2StoreIntegrationTest { public void process(final Record record) { final WindowStore stateStore = context().getStateStore(windowStoreStoreBuilder.name()); - stateStore.put(record.key(), record.value(), WINDOW_START); + // We don't re-implement the DSL logic (which implements sum) but instead just keep the lasted value per window + stateStore.put(record.key(), record.value(), (record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis()); } }; } @@ -716,7 +720,9 @@ public class IQv2StoreIntegrationTest { final SessionStore stateStore = context().getStateStore(sessionStoreStoreBuilder.name()); stateStore.put( - new Windowed<>(record.key(), new SessionWindow(WINDOW_START, WINDOW_START)), + // we do not re-implement the actual session-window logic from the DSL here to keep the test simple, + // but instead just put each record into it's own session + new Windowed<>(record.key(), new SessionWindow(record.timestamp(), record.timestamp())), record.value() ); } @@ -772,17 +778,27 @@ public class IQv2StoreIntegrationTest { shouldRejectUnknownQuery(); shouldCollectExecutionInfo(); shouldCollectExecutionInfoUnderFailure(); - + final String kind = this.kind; if (storeToTest.keyValue()) { if (storeToTest.timestamped()) { final Function, Integer> valueExtractor = ValueAndTimestamp::value; - shouldHandleKeyQuery(2, valueExtractor, 2); - shouldHandleRangeQueries(valueExtractor); + if (kind.equals("DSL")) { + shouldHandleKeyQuery(2, valueExtractor, 5); + shouldHandleRangeDSLQueries(valueExtractor); + } else { + shouldHandleKeyQuery(2, valueExtractor, 5); + shouldHandleRangePAPIQueries(valueExtractor); + } } else { final Function valueExtractor = Function.identity(); - shouldHandleKeyQuery(2, valueExtractor, 2); - shouldHandleRangeQueries(valueExtractor); + if (kind.equals("DSL")) { + shouldHandleKeyQuery(2, valueExtractor, 5); + shouldHandleRangeDSLQueries(valueExtractor); + } else { + shouldHandleKeyQuery(2, valueExtractor, 5); + shouldHandleRangePAPIQueries(valueExtractor); + } } } @@ -790,19 +806,33 @@ public class IQv2StoreIntegrationTest { if (storeToTest.timestamped()) { final Function, Integer> valueExtractor = ValueAndTimestamp::value; - shouldHandleWindowKeyQueries(valueExtractor); - shouldHandleWindowRangeQueries(valueExtractor); + if (kind.equals("DSL")) { + shouldHandleWindowKeyDSLQueries(valueExtractor); + shouldHandleWindowRangeDSLQueries(valueExtractor); + } else { + shouldHandleWindowKeyPAPIQueries(valueExtractor); + shouldHandleWindowRangePAPIQueries(valueExtractor); + } } else { final Function valueExtractor = Function.identity(); - shouldHandleWindowKeyQueries(valueExtractor); - shouldHandleWindowRangeQueries(valueExtractor); + if (kind.equals("DSL")) { + shouldHandleWindowKeyDSLQueries(valueExtractor); + shouldHandleWindowRangeDSLQueries(valueExtractor); + } else { + shouldHandleWindowKeyPAPIQueries(valueExtractor); + shouldHandleWindowRangePAPIQueries(valueExtractor); + } } } if (storeToTest.isSession()) { // Note there's no "timestamped" differentiation here. // Idiosyncratically, SessionStores are _never_ timestamped. - shouldHandleSessionKeyQueries(); + if (kind.equals("DSL")) { + shouldHandleSessionKeyDSLQueries(); + } else { + shouldHandleSessionKeyPAPIQueries(); + } } } } catch (final AssertionError e) { @@ -812,57 +842,211 @@ public class IQv2StoreIntegrationTest { } - private void shouldHandleRangeQueries(final Function extractor) { + private void shouldHandleRangeDSLQueries(final Function extractor) { + shouldHandleRangeQuery( + Optional.of(0), + Optional.of(4), + extractor, + mkSet(1, 3, 5, 7, 9) + ); + shouldHandleRangeQuery( Optional.of(1), Optional.of(3), extractor, - mkSet(1, 2, 3) - + mkSet(3, 5, 7) ); + shouldHandleRangeQuery( - Optional.of(1), + Optional.of(3), Optional.empty(), extractor, - mkSet(1, 2, 3) + mkSet(7, 9) + ); + shouldHandleRangeQuery( + Optional.empty(), + Optional.of(3), + extractor, + mkSet(1, 3, 5, 7) ); + shouldHandleRangeQuery( Optional.empty(), - Optional.of(1), + Optional.empty(), extractor, - mkSet(0, 1) + mkSet(1, 3, 5, 7, 9) + ); + } + + private void shouldHandleRangePAPIQueries(final Function extractor) { + shouldHandleRangeQuery( + Optional.of(0), + Optional.of(4), + extractor, + mkSet(1, 3, 5, 7, 9) + ); + shouldHandleRangeQuery( + Optional.of(1), + Optional.of(3), + extractor, + mkSet(3, 5, 7) ); + shouldHandleRangeQuery( + Optional.of(3), Optional.empty(), + extractor, + mkSet(7, 9) + ); + + shouldHandleRangeQuery( Optional.empty(), + Optional.of(3), extractor, - mkSet(0, 1, 2, 3) + mkSet(1, 3, 5, 7) + ); + shouldHandleRangeQuery( + Optional.empty(), + Optional.empty(), + extractor, + mkSet(1, 3, 5, 7, 9) ); } - private void shouldHandleWindowKeyQueries(final Function extractor) { + private void shouldHandleWindowKeyDSLQueries(final Function extractor) { // tightest possible start range + shouldHandleWindowKeyQuery( + 0, + Instant.ofEpochMilli(WINDOW_START), + Instant.ofEpochMilli(WINDOW_START), + extractor, + mkSet(1) + ); + + // miss the window start range + shouldHandleWindowKeyQuery( + 0, + Instant.ofEpochMilli(WINDOW_START - 1), + Instant.ofEpochMilli(WINDOW_START - 1), + extractor, + mkSet() + ); + + // do the window key query at the first window and the key of record which we want to query is 2 shouldHandleWindowKeyQuery( 2, Instant.ofEpochMilli(WINDOW_START), Instant.ofEpochMilli(WINDOW_START), extractor, + mkSet() + ); + + // miss the key + shouldHandleWindowKeyQuery( + 999, + Instant.ofEpochMilli(WINDOW_START), + Instant.ofEpochMilli(WINDOW_START), + extractor, + mkSet() + ); + + // miss both + shouldHandleWindowKeyQuery( + 999, + Instant.ofEpochMilli(WINDOW_START - 1), + Instant.ofEpochMilli(WINDOW_START - 1), + extractor, + mkSet() + ); + + // do the window key query at the first and the second windows and the key of record which we want to query is 0 + shouldHandleWindowKeyQuery( + 0, + Instant.ofEpochMilli(WINDOW_START), + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5).toMillis()), + extractor, + mkSet(1) + ); + + // do the window key query at the first window and the key of record which we want to query is 1 + shouldHandleWindowKeyQuery( + 1, + Instant.ofEpochMilli(WINDOW_START), + Instant.ofEpochMilli(WINDOW_START), + extractor, mkSet(2) ); - // miss the window start range + // do the window key query at the second and the third windows and the key of record which we want to query is 2 shouldHandleWindowKeyQuery( 2, + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5).toMillis()), + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(10).toMillis()), + extractor, + mkSet(4, 5) + ); + + // do the window key query at the second and the third windows and the key of record which we want to query is 3 + shouldHandleWindowKeyQuery( + 3, + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5).toMillis()), + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(10).toMillis()), + extractor, + mkSet(13) + ); + + // do the window key query at the fourth and the fifth windows and the key of record which we want to query is 4 + shouldHandleWindowKeyQuery( + 4, + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(15).toMillis()), + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(20).toMillis()), + extractor, + mkSet(17) + ); + + // do the window key query at the fifth window and the key of record which we want to query is 4 + shouldHandleWindowKeyQuery( + 4, + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(20).toMillis()), + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(24).toMillis()), + extractor, + mkSet() + ); + } + + private void shouldHandleWindowKeyPAPIQueries(final Function extractor) { + + // tightest possible start range + shouldHandleWindowKeyQuery( + 0, + Instant.ofEpochMilli(WINDOW_START), + Instant.ofEpochMilli(WINDOW_START), + extractor, + mkSet(1) + ); + + // miss the window start range + shouldHandleWindowKeyQuery( + 0, Instant.ofEpochMilli(WINDOW_START - 1), Instant.ofEpochMilli(WINDOW_START - 1), extractor, mkSet() ); + // do the window key query at the first window and the key of record which we want to query is 2 + shouldHandleWindowKeyQuery( + 2, + Instant.ofEpochMilli(WINDOW_START), + Instant.ofEpochMilli(WINDOW_START), + extractor, + mkSet() + ); + // miss the key shouldHandleWindowKeyQuery( 999, @@ -880,19 +1064,179 @@ public class IQv2StoreIntegrationTest { extractor, mkSet() ); + + // do the window key query at the first and the second windows and the key of record which we want to query is 0 + shouldHandleWindowKeyQuery( + 0, + Instant.ofEpochMilli(WINDOW_START), + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5).toMillis()), + extractor, + mkSet(1) + ); + + // do the window key query at the first window and the key of record which we want to query is 1 + shouldHandleWindowKeyQuery( + 1, + Instant.ofEpochMilli(WINDOW_START), + Instant.ofEpochMilli(WINDOW_START), + extractor, + mkSet(2) + ); + + // do the window key query at the second and the third windows and the key of record which we want to query is 2 + shouldHandleWindowKeyQuery( + 2, + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5).toMillis()), + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(10).toMillis()), + extractor, + mkSet(4, 5) + ); + + // do the window key query at the second and the third windows and the key of record which we want to query is 3 + shouldHandleWindowKeyQuery( + 3, + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5).toMillis()), + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(10).toMillis()), + extractor, + mkSet(7) + ); + + // do the window key query at the fourth and the fifth windows and the key of record which we want to query is 4 + shouldHandleWindowKeyQuery( + 4, + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(15).toMillis()), + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(20).toMillis()), + extractor, + mkSet(9) + ); + + // do the window key query at the fifth window and the key of record which we want to query is 4 + shouldHandleWindowKeyQuery( + 4, + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(20).toMillis()), + Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(24).toMillis()), + extractor, + mkSet() + ); } - private void shouldHandleWindowRangeQueries(final Function extractor) { + private void shouldHandleWindowRangeDSLQueries(final Function extractor) { final long windowSize = WINDOW_SIZE.toMillis(); final long windowStart = (RECORD_TIME / windowSize) * windowSize; + // miss the window start + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart - 1), + Instant.ofEpochMilli(windowStart - 1), + extractor, + mkSet() + ); + + // do the query at the first window + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart), + Instant.ofEpochMilli(windowStart), + extractor, + mkSet(1, 2) + ); + + // do the query at the first and the second windows shouldHandleWindowRangeQuery( Instant.ofEpochMilli(windowStart), + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(5).toMillis()), + extractor, + mkSet(1, 2, 3, 4) + ); + + // do the query at the second and the third windows + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(5).toMillis()), + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(10).toMillis()), + extractor, + mkSet(3, 4, 5, 13) + ); + + // do the query at the third and the fourth windows + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(10).toMillis()), + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()), + extractor, + mkSet(17, 5, 13) + ); + + // do the query at the fourth and the fifth windows + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()), + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(20).toMillis()), + extractor, + mkSet(17) + ); + + //do the query at the fifth and the sixth windows + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(20).toMillis()), + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(25).toMillis()), + extractor, + mkSet() + ); + + // do the query from the second to the fourth windows + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(5).toMillis()), + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()), + extractor, + mkSet(17, 3, 4, 5, 13) + ); + + // do the query from the first to the fourth windows + shouldHandleWindowRangeQuery( Instant.ofEpochMilli(windowStart), + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()), extractor, - mkSet(0, 1, 2, 3) + mkSet(1, 17, 2, 3, 4, 5, 13) ); + // Should fail to execute this query on a WindowStore. + final WindowRangeQuery query = WindowRangeQuery.withKey(2); + + final StateQueryRequest, T>> request = + inStore(STORE_NAME) + .withQuery(query) + .withPartitions(mkSet(0, 1)) + .withPositionBound(PositionBound.at(INPUT_POSITION)); + + final StateQueryResult, T>> result = + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + + if (result.getGlobalResult() != null) { + fail("global tables aren't implemented"); + } else { + final Map, T>>> queryResult = + result.getPartitionResults(); + for (final int partition : queryResult.keySet()) { + final QueryResult, T>> partitionResult = + queryResult.get(partition); + final boolean failure = partitionResult.isFailure(); + if (!failure) { + throw new AssertionError(queryResult.toString()); + } + assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE)); + assertThat(partitionResult.getFailureMessage(), matchesPattern( + "This store" + + " \\(class org.apache.kafka.streams.state.internals.Metered.*WindowStore\\)" + + " doesn't know how to execute the given query" + + " \\(WindowRangeQuery\\{key=Optional\\[2], timeFrom=Optional.empty, timeTo=Optional.empty}\\)" + + " because WindowStores only supports WindowRangeQuery.withWindowStartRange\\." + + " Contact the store maintainer if you need support for a new query type\\." + )); + } + } + } + + private void shouldHandleWindowRangePAPIQueries(final Function extractor) { + final long windowSize = WINDOW_SIZE.toMillis(); + final long windowStart = (RECORD_TIME / windowSize) * windowSize; + // miss the window start shouldHandleWindowRangeQuery( Instant.ofEpochMilli(windowStart - 1), @@ -901,17 +1245,81 @@ public class IQv2StoreIntegrationTest { mkSet() ); + // do the query at the first window + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart), + Instant.ofEpochMilli(windowStart), + extractor, + mkSet(1, 2) + ); + + // do the query at the first and the second windows + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart), + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(5).toMillis()), + extractor, + mkSet(1, 2, 3, 4) + ); + + // do the query at the second and the third windows + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(5).toMillis()), + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(10).toMillis()), + extractor, + mkSet(3, 4, 5, 7) + ); + + // do the query at the third and the fourth windows + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(10).toMillis()), + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()), + extractor, + mkSet(5, 7, 9) + ); + + // do the query at the fourth and the fifth windows + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()), + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(20).toMillis()), + extractor, + mkSet(9) + ); + + //do the query at the fifth and the sixth windows + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(20).toMillis()), + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(25).toMillis()), + extractor, + mkSet() + ); + + // do the query from the second to the fourth windows + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(5).toMillis()), + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()), + extractor, + mkSet(3, 4, 5, 7, 9) + ); + + // do the query from the first to the fourth windows + shouldHandleWindowRangeQuery( + Instant.ofEpochMilli(windowStart), + Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()), + extractor, + mkSet(1, 2, 3, 4, 5, 7, 9) + ); + // Should fail to execute this query on a WindowStore. final WindowRangeQuery query = WindowRangeQuery.withKey(2); final StateQueryRequest, T>> request = - inStore(STORE_NAME) - .withQuery(query) - .withPartitions(mkSet(0, 1)) - .withPositionBound(PositionBound.at(INPUT_POSITION)); + inStore(STORE_NAME) + .withQuery(query) + .withPartitions(mkSet(0, 1)) + .withPositionBound(PositionBound.at(INPUT_POSITION)); final StateQueryResult, T>> result = - IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); if (result.getGlobalResult() != null) { fail("global tables aren't implemented"); @@ -938,10 +1346,103 @@ public class IQv2StoreIntegrationTest { } } - private void shouldHandleSessionKeyQueries() { + private void shouldHandleSessionKeyDSLQueries() { + shouldHandleSessionRangeQuery( + 0, + mkSet(1) + ); + + shouldHandleSessionRangeQuery( + 1, + mkSet(5) + ); + shouldHandleSessionRangeQuery( 2, - mkSet(2) + mkSet(9) + ); + + shouldHandleSessionRangeQuery( + 3, + mkSet(13) + ); + + shouldHandleSessionRangeQuery( + 4, + mkSet(17) + ); + + // not preset, so empty result iter + shouldHandleSessionRangeQuery( + 999, + mkSet() + ); + + // Should fail to execute this query on a SessionStore. + final WindowRangeQuery query = + WindowRangeQuery.withWindowStartRange( + Instant.ofEpochMilli(0L), + Instant.ofEpochMilli(0L) + ); + + final StateQueryRequest, T>> request = + inStore(STORE_NAME) + .withQuery(query) + .withPartitions(mkSet(0, 1)) + .withPositionBound(PositionBound.at(INPUT_POSITION)); + + final StateQueryResult, T>> result = + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + + if (result.getGlobalResult() != null) { + fail("global tables aren't implemented"); + } else { + final Map, T>>> queryResult = + result.getPartitionResults(); + for (final int partition : queryResult.keySet()) { + final QueryResult, T>> partitionResult = + queryResult.get(partition); + final boolean failure = partitionResult.isFailure(); + if (!failure) { + throw new AssertionError(queryResult.toString()); + } + assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE)); + assertThat(partitionResult.getFailureMessage(), is( + "This store" + + " (class org.apache.kafka.streams.state.internals.MeteredSessionStore)" + + " doesn't know how to execute the given query" + + " (WindowRangeQuery{key=Optional.empty, timeFrom=Optional[1970-01-01T00:00:00Z], timeTo=Optional[1970-01-01T00:00:00Z]})" + + " because SessionStores only support WindowRangeQuery.withKey." + + " Contact the store maintainer if you need support for a new query type." + )); + } + } + } + + private void shouldHandleSessionKeyPAPIQueries() { + shouldHandleSessionRangeQuery( + 0, + mkSet(0, 1) + ); + + shouldHandleSessionRangeQuery( + 1, + mkSet(2, 3) + ); + + shouldHandleSessionRangeQuery( + 2, + mkSet(4, 5) + ); + + shouldHandleSessionRangeQuery( + 3, + mkSet(6, 7) + ); + + shouldHandleSessionRangeQuery( + 4, + mkSet(8, 9) ); // not preset, so empty result iter @@ -1008,7 +1509,7 @@ public class IQv2StoreIntegrationTest { assertThat( result.getGlobalResult().getFailureMessage(), is("Global stores do not yet support the KafkaStreams#query API." - + " Use KafkaStreams#store instead.") + + " Use KafkaStreams#store instead.") ); } @@ -1048,19 +1549,19 @@ public class IQv2StoreIntegrationTest { } public void shouldHandleKeyQuery( - final Integer key, - final Function valueExtactor, - final Integer expectedValue) { + final Integer key, + final Function valueExtactor, + final Integer expectedValue) { final KeyQuery query = KeyQuery.withKey(key); final StateQueryRequest request = - inStore(STORE_NAME) - .withQuery(query) - .withPartitions(mkSet(0, 1)) - .withPositionBound(PositionBound.at(INPUT_POSITION)); + inStore(STORE_NAME) + .withQuery(query) + .withPartitions(mkSet(0, 1)) + .withPositionBound(PositionBound.at(INPUT_POSITION)); final StateQueryResult result = - IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); final QueryResult queryResult = result.getOnlyPartitionResult(); final boolean failure = queryResult.isFailure(); @@ -1071,8 +1572,8 @@ public class IQv2StoreIntegrationTest { assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); assertThrows( - IllegalArgumentException.class, - queryResult::getFailureMessage + IllegalArgumentException.class, + queryResult::getFailureMessage ); final V result1 = queryResult.getResult(); @@ -1080,7 +1581,7 @@ public class IQv2StoreIntegrationTest { assertThat(integer, is(expectedValue)); assertThat(queryResult.getExecutionInfo(), is(empty())); assertThat(queryResult.getPosition(), is(POSITION_0)); - } + } public void shouldHandleRangeQuery( final Optional lower, @@ -1189,21 +1690,21 @@ public class IQv2StoreIntegrationTest { } public void shouldHandleWindowRangeQuery( - final Instant timeFrom, - final Instant timeTo, - final Function valueExtactor, - final Set expectedValue) { + final Instant timeFrom, + final Instant timeTo, + final Function valueExtactor, + final Set expectedValue) { final WindowRangeQuery query = WindowRangeQuery.withWindowStartRange(timeFrom, timeTo); final StateQueryRequest, V>> request = - inStore(STORE_NAME) - .withQuery(query) - .withPartitions(mkSet(0, 1)) - .withPositionBound(PositionBound.at(INPUT_POSITION)); + inStore(STORE_NAME) + .withQuery(query) + .withPartitions(mkSet(0, 1)) + .withPositionBound(PositionBound.at(INPUT_POSITION)); final StateQueryResult, V>> result = - IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); if (result.getGlobalResult() != null) { fail("global tables aren't implemented"); @@ -1218,12 +1719,12 @@ public class IQv2StoreIntegrationTest { assertThat(queryResult.get(partition).isSuccess(), is(true)); assertThrows( - IllegalArgumentException.class, - queryResult.get(partition)::getFailureReason + IllegalArgumentException.class, + queryResult.get(partition)::getFailureReason ); assertThrows( - IllegalArgumentException.class, - queryResult.get(partition)::getFailureMessage + IllegalArgumentException.class, + queryResult.get(partition)::getFailureMessage ); try (final KeyValueIterator, V> iterator = queryResult.get(partition).getResult()) { @@ -1239,18 +1740,18 @@ public class IQv2StoreIntegrationTest { } public void shouldHandleSessionRangeQuery( - final Integer key, - final Set expectedValue) { + final Integer key, + final Set expectedValue) { final WindowRangeQuery query = WindowRangeQuery.withKey(key); final StateQueryRequest, V>> request = - inStore(STORE_NAME) - .withQuery(query) - .withPartitions(mkSet(0, 1)) - .withPositionBound(PositionBound.at(INPUT_POSITION)); + inStore(STORE_NAME) + .withQuery(query) + .withPartitions(mkSet(0, 1)) + .withPositionBound(PositionBound.at(INPUT_POSITION)); final StateQueryResult, V>> result = - IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); if (result.getGlobalResult() != null) { fail("global tables aren't implemented"); @@ -1265,12 +1766,12 @@ public class IQv2StoreIntegrationTest { assertThat(queryResult.get(partition).isSuccess(), is(true)); assertThrows( - IllegalArgumentException.class, - queryResult.get(partition)::getFailureReason + IllegalArgumentException.class, + queryResult.get(partition)::getFailureReason ); assertThrows( - IllegalArgumentException.class, - queryResult.get(partition)::getFailureMessage + IllegalArgumentException.class, + queryResult.get(partition)::getFailureMessage ); try (final KeyValueIterator, V> iterator = queryResult.get(partition).getResult()) {