Browse Source

KAFKA-15569: test and add test cases in IQv2StoreIntegrationTest (#14523)

Reviewers: Matthias J. Sax <matthias@confluent.io>
pull/14569/head
Hanyu Zheng 1 year ago committed by GitHub
parent
commit
732bffcae6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 651
      streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java

651
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java

@ -140,7 +140,7 @@ public class IQv2StoreIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private static final Position POSITION_0 = private static final Position POSITION_0 =
Position.fromMap(mkMap(mkEntry(INPUT_TOPIC_NAME, mkMap(mkEntry(0, 1L))))); Position.fromMap(mkMap(mkEntry(INPUT_TOPIC_NAME, mkMap(mkEntry(0, 5L)))));
public static class UnknownQuery implements Query<Void> { } public static class UnknownQuery implements Query<Void> { }
@ -408,13 +408,15 @@ public class IQv2StoreIntegrationTest {
final List<Future<RecordMetadata>> futures = new LinkedList<>(); final List<Future<RecordMetadata>> futures = new LinkedList<>();
try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) { try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
for (int i = 0; i < 4; i++) { for (int i = 0; i < 10; i++) {
final int key = i / 2;
final int partition = key % partitions;
final Future<RecordMetadata> send = producer.send( final Future<RecordMetadata> send = producer.send(
new ProducerRecord<>( new ProducerRecord<>(
INPUT_TOPIC_NAME, INPUT_TOPIC_NAME,
i % partitions, partition,
RECORD_TIME, WINDOW_START + Duration.ofMinutes(2).toMillis() * i,
i, key,
i, i,
null null
) )
@ -438,8 +440,8 @@ public class IQv2StoreIntegrationTest {
assertThat(INPUT_POSITION, equalTo( assertThat(INPUT_POSITION, equalTo(
Position Position
.emptyPosition() .emptyPosition()
.withComponent(INPUT_TOPIC_NAME, 0, 1L) .withComponent(INPUT_TOPIC_NAME, 0, 5L)
.withComponent(INPUT_TOPIC_NAME, 1, 1L) .withComponent(INPUT_TOPIC_NAME, 1, 3L)
)); ));
} }
@ -650,12 +652,13 @@ public class IQv2StoreIntegrationTest {
public void process(final Record<Integer, Integer> record) { public void process(final Record<Integer, Integer> record) {
final TimestampedWindowStore<Integer, Integer> stateStore = final TimestampedWindowStore<Integer, Integer> stateStore =
context().getStateStore(windowStoreStoreBuilder.name()); context().getStateStore(windowStoreStoreBuilder.name());
// We don't re-implement the DSL logic (which implements sum) but instead just keep the lasted value per window
stateStore.put( stateStore.put(
record.key(), record.key(),
ValueAndTimestamp.make( ValueAndTimestamp.make(
record.value(), record.timestamp() record.value(), record.timestamp()
), ),
WINDOW_START (record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis()
); );
} }
}; };
@ -671,7 +674,8 @@ public class IQv2StoreIntegrationTest {
public void process(final Record<Integer, Integer> record) { public void process(final Record<Integer, Integer> record) {
final WindowStore<Integer, Integer> stateStore = final WindowStore<Integer, Integer> stateStore =
context().getStateStore(windowStoreStoreBuilder.name()); context().getStateStore(windowStoreStoreBuilder.name());
stateStore.put(record.key(), record.value(), WINDOW_START); // We don't re-implement the DSL logic (which implements sum) but instead just keep the lasted value per window
stateStore.put(record.key(), record.value(), (record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis());
} }
}; };
} }
@ -716,7 +720,9 @@ public class IQv2StoreIntegrationTest {
final SessionStore<Integer, Integer> stateStore = final SessionStore<Integer, Integer> stateStore =
context().getStateStore(sessionStoreStoreBuilder.name()); context().getStateStore(sessionStoreStoreBuilder.name());
stateStore.put( stateStore.put(
new Windowed<>(record.key(), new SessionWindow(WINDOW_START, WINDOW_START)), // we do not re-implement the actual session-window logic from the DSL here to keep the test simple,
// but instead just put each record into it's own session
new Windowed<>(record.key(), new SessionWindow(record.timestamp(), record.timestamp())),
record.value() record.value()
); );
} }
@ -772,17 +778,27 @@ public class IQv2StoreIntegrationTest {
shouldRejectUnknownQuery(); shouldRejectUnknownQuery();
shouldCollectExecutionInfo(); shouldCollectExecutionInfo();
shouldCollectExecutionInfoUnderFailure(); shouldCollectExecutionInfoUnderFailure();
final String kind = this.kind;
if (storeToTest.keyValue()) { if (storeToTest.keyValue()) {
if (storeToTest.timestamped()) { if (storeToTest.timestamped()) {
final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor = final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor =
ValueAndTimestamp::value; ValueAndTimestamp::value;
shouldHandleKeyQuery(2, valueExtractor, 2); if (kind.equals("DSL")) {
shouldHandleRangeQueries(valueExtractor); shouldHandleKeyQuery(2, valueExtractor, 5);
shouldHandleRangeDSLQueries(valueExtractor);
} else {
shouldHandleKeyQuery(2, valueExtractor, 5);
shouldHandleRangePAPIQueries(valueExtractor);
}
} else { } else {
final Function<Integer, Integer> valueExtractor = Function.identity(); final Function<Integer, Integer> valueExtractor = Function.identity();
shouldHandleKeyQuery(2, valueExtractor, 2); if (kind.equals("DSL")) {
shouldHandleRangeQueries(valueExtractor); shouldHandleKeyQuery(2, valueExtractor, 5);
shouldHandleRangeDSLQueries(valueExtractor);
} else {
shouldHandleKeyQuery(2, valueExtractor, 5);
shouldHandleRangePAPIQueries(valueExtractor);
}
} }
} }
@ -790,19 +806,33 @@ public class IQv2StoreIntegrationTest {
if (storeToTest.timestamped()) { if (storeToTest.timestamped()) {
final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor = final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor =
ValueAndTimestamp::value; ValueAndTimestamp::value;
shouldHandleWindowKeyQueries(valueExtractor); if (kind.equals("DSL")) {
shouldHandleWindowRangeQueries(valueExtractor); shouldHandleWindowKeyDSLQueries(valueExtractor);
shouldHandleWindowRangeDSLQueries(valueExtractor);
} else {
shouldHandleWindowKeyPAPIQueries(valueExtractor);
shouldHandleWindowRangePAPIQueries(valueExtractor);
}
} else { } else {
final Function<Integer, Integer> valueExtractor = Function.identity(); final Function<Integer, Integer> valueExtractor = Function.identity();
shouldHandleWindowKeyQueries(valueExtractor); if (kind.equals("DSL")) {
shouldHandleWindowRangeQueries(valueExtractor); shouldHandleWindowKeyDSLQueries(valueExtractor);
shouldHandleWindowRangeDSLQueries(valueExtractor);
} else {
shouldHandleWindowKeyPAPIQueries(valueExtractor);
shouldHandleWindowRangePAPIQueries(valueExtractor);
}
} }
} }
if (storeToTest.isSession()) { if (storeToTest.isSession()) {
// Note there's no "timestamped" differentiation here. // Note there's no "timestamped" differentiation here.
// Idiosyncratically, SessionStores are _never_ timestamped. // Idiosyncratically, SessionStores are _never_ timestamped.
shouldHandleSessionKeyQueries(); if (kind.equals("DSL")) {
shouldHandleSessionKeyDSLQueries();
} else {
shouldHandleSessionKeyPAPIQueries();
}
} }
} }
} catch (final AssertionError e) { } catch (final AssertionError e) {
@ -812,57 +842,211 @@ public class IQv2StoreIntegrationTest {
} }
private <T> void shouldHandleRangeQueries(final Function<T, Integer> extractor) { private <T> void shouldHandleRangeDSLQueries(final Function<T, Integer> extractor) {
shouldHandleRangeQuery(
Optional.of(0),
Optional.of(4),
extractor,
mkSet(1, 3, 5, 7, 9)
);
shouldHandleRangeQuery( shouldHandleRangeQuery(
Optional.of(1), Optional.of(1),
Optional.of(3), Optional.of(3),
extractor, extractor,
mkSet(1, 2, 3) mkSet(3, 5, 7)
); );
shouldHandleRangeQuery( shouldHandleRangeQuery(
Optional.of(1), Optional.of(3),
Optional.empty(), Optional.empty(),
extractor, extractor,
mkSet(1, 2, 3) mkSet(7, 9)
);
shouldHandleRangeQuery(
Optional.empty(),
Optional.of(3),
extractor,
mkSet(1, 3, 5, 7)
); );
shouldHandleRangeQuery( shouldHandleRangeQuery(
Optional.empty(), Optional.empty(),
Optional.of(1), Optional.empty(),
extractor, extractor,
mkSet(0, 1) mkSet(1, 3, 5, 7, 9)
);
}
private <T> void shouldHandleRangePAPIQueries(final Function<T, Integer> extractor) {
shouldHandleRangeQuery(
Optional.of(0),
Optional.of(4),
extractor,
mkSet(1, 3, 5, 7, 9)
);
shouldHandleRangeQuery(
Optional.of(1),
Optional.of(3),
extractor,
mkSet(3, 5, 7)
); );
shouldHandleRangeQuery( shouldHandleRangeQuery(
Optional.of(3),
Optional.empty(), Optional.empty(),
extractor,
mkSet(7, 9)
);
shouldHandleRangeQuery(
Optional.empty(), Optional.empty(),
Optional.of(3),
extractor, extractor,
mkSet(0, 1, 2, 3) mkSet(1, 3, 5, 7)
);
shouldHandleRangeQuery(
Optional.empty(),
Optional.empty(),
extractor,
mkSet(1, 3, 5, 7, 9)
); );
} }
private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> extractor) { private <T> void shouldHandleWindowKeyDSLQueries(final Function<T, Integer> extractor) {
// tightest possible start range // tightest possible start range
shouldHandleWindowKeyQuery(
0,
Instant.ofEpochMilli(WINDOW_START),
Instant.ofEpochMilli(WINDOW_START),
extractor,
mkSet(1)
);
// miss the window start range
shouldHandleWindowKeyQuery(
0,
Instant.ofEpochMilli(WINDOW_START - 1),
Instant.ofEpochMilli(WINDOW_START - 1),
extractor,
mkSet()
);
// do the window key query at the first window and the key of record which we want to query is 2
shouldHandleWindowKeyQuery( shouldHandleWindowKeyQuery(
2, 2,
Instant.ofEpochMilli(WINDOW_START), Instant.ofEpochMilli(WINDOW_START),
Instant.ofEpochMilli(WINDOW_START), Instant.ofEpochMilli(WINDOW_START),
extractor, extractor,
mkSet()
);
// miss the key
shouldHandleWindowKeyQuery(
999,
Instant.ofEpochMilli(WINDOW_START),
Instant.ofEpochMilli(WINDOW_START),
extractor,
mkSet()
);
// miss both
shouldHandleWindowKeyQuery(
999,
Instant.ofEpochMilli(WINDOW_START - 1),
Instant.ofEpochMilli(WINDOW_START - 1),
extractor,
mkSet()
);
// do the window key query at the first and the second windows and the key of record which we want to query is 0
shouldHandleWindowKeyQuery(
0,
Instant.ofEpochMilli(WINDOW_START),
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5).toMillis()),
extractor,
mkSet(1)
);
// do the window key query at the first window and the key of record which we want to query is 1
shouldHandleWindowKeyQuery(
1,
Instant.ofEpochMilli(WINDOW_START),
Instant.ofEpochMilli(WINDOW_START),
extractor,
mkSet(2) mkSet(2)
); );
// miss the window start range // do the window key query at the second and the third windows and the key of record which we want to query is 2
shouldHandleWindowKeyQuery( shouldHandleWindowKeyQuery(
2, 2,
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5).toMillis()),
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(10).toMillis()),
extractor,
mkSet(4, 5)
);
// do the window key query at the second and the third windows and the key of record which we want to query is 3
shouldHandleWindowKeyQuery(
3,
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5).toMillis()),
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(10).toMillis()),
extractor,
mkSet(13)
);
// do the window key query at the fourth and the fifth windows and the key of record which we want to query is 4
shouldHandleWindowKeyQuery(
4,
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(15).toMillis()),
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(20).toMillis()),
extractor,
mkSet(17)
);
// do the window key query at the fifth window and the key of record which we want to query is 4
shouldHandleWindowKeyQuery(
4,
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(20).toMillis()),
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(24).toMillis()),
extractor,
mkSet()
);
}
private <T> void shouldHandleWindowKeyPAPIQueries(final Function<T, Integer> extractor) {
// tightest possible start range
shouldHandleWindowKeyQuery(
0,
Instant.ofEpochMilli(WINDOW_START),
Instant.ofEpochMilli(WINDOW_START),
extractor,
mkSet(1)
);
// miss the window start range
shouldHandleWindowKeyQuery(
0,
Instant.ofEpochMilli(WINDOW_START - 1), Instant.ofEpochMilli(WINDOW_START - 1),
Instant.ofEpochMilli(WINDOW_START - 1), Instant.ofEpochMilli(WINDOW_START - 1),
extractor, extractor,
mkSet() mkSet()
); );
// do the window key query at the first window and the key of record which we want to query is 2
shouldHandleWindowKeyQuery(
2,
Instant.ofEpochMilli(WINDOW_START),
Instant.ofEpochMilli(WINDOW_START),
extractor,
mkSet()
);
// miss the key // miss the key
shouldHandleWindowKeyQuery( shouldHandleWindowKeyQuery(
999, 999,
@ -880,19 +1064,179 @@ public class IQv2StoreIntegrationTest {
extractor, extractor,
mkSet() mkSet()
); );
// do the window key query at the first and the second windows and the key of record which we want to query is 0
shouldHandleWindowKeyQuery(
0,
Instant.ofEpochMilli(WINDOW_START),
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5).toMillis()),
extractor,
mkSet(1)
);
// do the window key query at the first window and the key of record which we want to query is 1
shouldHandleWindowKeyQuery(
1,
Instant.ofEpochMilli(WINDOW_START),
Instant.ofEpochMilli(WINDOW_START),
extractor,
mkSet(2)
);
// do the window key query at the second and the third windows and the key of record which we want to query is 2
shouldHandleWindowKeyQuery(
2,
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5).toMillis()),
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(10).toMillis()),
extractor,
mkSet(4, 5)
);
// do the window key query at the second and the third windows and the key of record which we want to query is 3
shouldHandleWindowKeyQuery(
3,
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5).toMillis()),
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(10).toMillis()),
extractor,
mkSet(7)
);
// do the window key query at the fourth and the fifth windows and the key of record which we want to query is 4
shouldHandleWindowKeyQuery(
4,
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(15).toMillis()),
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(20).toMillis()),
extractor,
mkSet(9)
);
// do the window key query at the fifth window and the key of record which we want to query is 4
shouldHandleWindowKeyQuery(
4,
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(20).toMillis()),
Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(24).toMillis()),
extractor,
mkSet()
);
} }
private <T> void shouldHandleWindowRangeQueries(final Function<T, Integer> extractor) { private <T> void shouldHandleWindowRangeDSLQueries(final Function<T, Integer> extractor) {
final long windowSize = WINDOW_SIZE.toMillis(); final long windowSize = WINDOW_SIZE.toMillis();
final long windowStart = (RECORD_TIME / windowSize) * windowSize; final long windowStart = (RECORD_TIME / windowSize) * windowSize;
// miss the window start
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart - 1),
Instant.ofEpochMilli(windowStart - 1),
extractor,
mkSet()
);
// do the query at the first window
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart),
Instant.ofEpochMilli(windowStart),
extractor,
mkSet(1, 2)
);
// do the query at the first and the second windows
shouldHandleWindowRangeQuery( shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart), Instant.ofEpochMilli(windowStart),
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(5).toMillis()),
extractor,
mkSet(1, 2, 3, 4)
);
// do the query at the second and the third windows
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(5).toMillis()),
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(10).toMillis()),
extractor,
mkSet(3, 4, 5, 13)
);
// do the query at the third and the fourth windows
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(10).toMillis()),
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()),
extractor,
mkSet(17, 5, 13)
);
// do the query at the fourth and the fifth windows
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()),
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(20).toMillis()),
extractor,
mkSet(17)
);
//do the query at the fifth and the sixth windows
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(20).toMillis()),
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(25).toMillis()),
extractor,
mkSet()
);
// do the query from the second to the fourth windows
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(5).toMillis()),
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()),
extractor,
mkSet(17, 3, 4, 5, 13)
);
// do the query from the first to the fourth windows
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart), Instant.ofEpochMilli(windowStart),
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()),
extractor, extractor,
mkSet(0, 1, 2, 3) mkSet(1, 17, 2, 3, 4, 5, 13)
); );
// Should fail to execute this query on a WindowStore.
final WindowRangeQuery<Integer, T> query = WindowRangeQuery.withKey(2);
final StateQueryRequest<KeyValueIterator<Windowed<Integer>, T>> request =
inStore(STORE_NAME)
.withQuery(query)
.withPartitions(mkSet(0, 1))
.withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<KeyValueIterator<Windowed<Integer>, T>> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
if (result.getGlobalResult() != null) {
fail("global tables aren't implemented");
} else {
final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, T>>> queryResult =
result.getPartitionResults();
for (final int partition : queryResult.keySet()) {
final QueryResult<KeyValueIterator<Windowed<Integer>, T>> partitionResult =
queryResult.get(partition);
final boolean failure = partitionResult.isFailure();
if (!failure) {
throw new AssertionError(queryResult.toString());
}
assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
assertThat(partitionResult.getFailureMessage(), matchesPattern(
"This store"
+ " \\(class org.apache.kafka.streams.state.internals.Metered.*WindowStore\\)"
+ " doesn't know how to execute the given query"
+ " \\(WindowRangeQuery\\{key=Optional\\[2], timeFrom=Optional.empty, timeTo=Optional.empty}\\)"
+ " because WindowStores only supports WindowRangeQuery.withWindowStartRange\\."
+ " Contact the store maintainer if you need support for a new query type\\."
));
}
}
}
private <T> void shouldHandleWindowRangePAPIQueries(final Function<T, Integer> extractor) {
final long windowSize = WINDOW_SIZE.toMillis();
final long windowStart = (RECORD_TIME / windowSize) * windowSize;
// miss the window start // miss the window start
shouldHandleWindowRangeQuery( shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart - 1), Instant.ofEpochMilli(windowStart - 1),
@ -901,17 +1245,81 @@ public class IQv2StoreIntegrationTest {
mkSet() mkSet()
); );
// do the query at the first window
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart),
Instant.ofEpochMilli(windowStart),
extractor,
mkSet(1, 2)
);
// do the query at the first and the second windows
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart),
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(5).toMillis()),
extractor,
mkSet(1, 2, 3, 4)
);
// do the query at the second and the third windows
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(5).toMillis()),
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(10).toMillis()),
extractor,
mkSet(3, 4, 5, 7)
);
// do the query at the third and the fourth windows
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(10).toMillis()),
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()),
extractor,
mkSet(5, 7, 9)
);
// do the query at the fourth and the fifth windows
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()),
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(20).toMillis()),
extractor,
mkSet(9)
);
//do the query at the fifth and the sixth windows
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(20).toMillis()),
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(25).toMillis()),
extractor,
mkSet()
);
// do the query from the second to the fourth windows
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(5).toMillis()),
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()),
extractor,
mkSet(3, 4, 5, 7, 9)
);
// do the query from the first to the fourth windows
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart),
Instant.ofEpochMilli(windowStart + Duration.ofMinutes(15).toMillis()),
extractor,
mkSet(1, 2, 3, 4, 5, 7, 9)
);
// Should fail to execute this query on a WindowStore. // Should fail to execute this query on a WindowStore.
final WindowRangeQuery<Integer, T> query = WindowRangeQuery.withKey(2); final WindowRangeQuery<Integer, T> query = WindowRangeQuery.withKey(2);
final StateQueryRequest<KeyValueIterator<Windowed<Integer>, T>> request = final StateQueryRequest<KeyValueIterator<Windowed<Integer>, T>> request =
inStore(STORE_NAME) inStore(STORE_NAME)
.withQuery(query) .withQuery(query)
.withPartitions(mkSet(0, 1)) .withPartitions(mkSet(0, 1))
.withPositionBound(PositionBound.at(INPUT_POSITION)); .withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<KeyValueIterator<Windowed<Integer>, T>> result = final StateQueryResult<KeyValueIterator<Windowed<Integer>, T>> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
if (result.getGlobalResult() != null) { if (result.getGlobalResult() != null) {
fail("global tables aren't implemented"); fail("global tables aren't implemented");
@ -938,10 +1346,103 @@ public class IQv2StoreIntegrationTest {
} }
} }
private <T> void shouldHandleSessionKeyQueries() { private <T> void shouldHandleSessionKeyDSLQueries() {
shouldHandleSessionRangeQuery(
0,
mkSet(1)
);
shouldHandleSessionRangeQuery(
1,
mkSet(5)
);
shouldHandleSessionRangeQuery( shouldHandleSessionRangeQuery(
2, 2,
mkSet(2) mkSet(9)
);
shouldHandleSessionRangeQuery(
3,
mkSet(13)
);
shouldHandleSessionRangeQuery(
4,
mkSet(17)
);
// not preset, so empty result iter
shouldHandleSessionRangeQuery(
999,
mkSet()
);
// Should fail to execute this query on a SessionStore.
final WindowRangeQuery<Integer, T> query =
WindowRangeQuery.withWindowStartRange(
Instant.ofEpochMilli(0L),
Instant.ofEpochMilli(0L)
);
final StateQueryRequest<KeyValueIterator<Windowed<Integer>, T>> request =
inStore(STORE_NAME)
.withQuery(query)
.withPartitions(mkSet(0, 1))
.withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<KeyValueIterator<Windowed<Integer>, T>> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
if (result.getGlobalResult() != null) {
fail("global tables aren't implemented");
} else {
final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, T>>> queryResult =
result.getPartitionResults();
for (final int partition : queryResult.keySet()) {
final QueryResult<KeyValueIterator<Windowed<Integer>, T>> partitionResult =
queryResult.get(partition);
final boolean failure = partitionResult.isFailure();
if (!failure) {
throw new AssertionError(queryResult.toString());
}
assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
assertThat(partitionResult.getFailureMessage(), is(
"This store"
+ " (class org.apache.kafka.streams.state.internals.MeteredSessionStore)"
+ " doesn't know how to execute the given query"
+ " (WindowRangeQuery{key=Optional.empty, timeFrom=Optional[1970-01-01T00:00:00Z], timeTo=Optional[1970-01-01T00:00:00Z]})"
+ " because SessionStores only support WindowRangeQuery.withKey."
+ " Contact the store maintainer if you need support for a new query type."
));
}
}
}
private <T> void shouldHandleSessionKeyPAPIQueries() {
shouldHandleSessionRangeQuery(
0,
mkSet(0, 1)
);
shouldHandleSessionRangeQuery(
1,
mkSet(2, 3)
);
shouldHandleSessionRangeQuery(
2,
mkSet(4, 5)
);
shouldHandleSessionRangeQuery(
3,
mkSet(6, 7)
);
shouldHandleSessionRangeQuery(
4,
mkSet(8, 9)
); );
// not preset, so empty result iter // not preset, so empty result iter
@ -1008,7 +1509,7 @@ public class IQv2StoreIntegrationTest {
assertThat( assertThat(
result.getGlobalResult().getFailureMessage(), result.getGlobalResult().getFailureMessage(),
is("Global stores do not yet support the KafkaStreams#query API." is("Global stores do not yet support the KafkaStreams#query API."
+ " Use KafkaStreams#store instead.") + " Use KafkaStreams#store instead.")
); );
} }
@ -1048,19 +1549,19 @@ public class IQv2StoreIntegrationTest {
} }
public <V> void shouldHandleKeyQuery( public <V> void shouldHandleKeyQuery(
final Integer key, final Integer key,
final Function<V, Integer> valueExtactor, final Function<V, Integer> valueExtactor,
final Integer expectedValue) { final Integer expectedValue) {
final KeyQuery<Integer, V> query = KeyQuery.withKey(key); final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
final StateQueryRequest<V> request = final StateQueryRequest<V> request =
inStore(STORE_NAME) inStore(STORE_NAME)
.withQuery(query) .withQuery(query)
.withPartitions(mkSet(0, 1)) .withPartitions(mkSet(0, 1))
.withPositionBound(PositionBound.at(INPUT_POSITION)); .withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<V> result = final StateQueryResult<V> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
final QueryResult<V> queryResult = result.getOnlyPartitionResult(); final QueryResult<V> queryResult = result.getOnlyPartitionResult();
final boolean failure = queryResult.isFailure(); final boolean failure = queryResult.isFailure();
@ -1071,8 +1572,8 @@ public class IQv2StoreIntegrationTest {
assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
assertThrows( assertThrows(
IllegalArgumentException.class, IllegalArgumentException.class,
queryResult::getFailureMessage queryResult::getFailureMessage
); );
final V result1 = queryResult.getResult(); final V result1 = queryResult.getResult();
@ -1080,7 +1581,7 @@ public class IQv2StoreIntegrationTest {
assertThat(integer, is(expectedValue)); assertThat(integer, is(expectedValue));
assertThat(queryResult.getExecutionInfo(), is(empty())); assertThat(queryResult.getExecutionInfo(), is(empty()));
assertThat(queryResult.getPosition(), is(POSITION_0)); assertThat(queryResult.getPosition(), is(POSITION_0));
} }
public <V> void shouldHandleRangeQuery( public <V> void shouldHandleRangeQuery(
final Optional<Integer> lower, final Optional<Integer> lower,
@ -1189,21 +1690,21 @@ public class IQv2StoreIntegrationTest {
} }
public <V> void shouldHandleWindowRangeQuery( public <V> void shouldHandleWindowRangeQuery(
final Instant timeFrom, final Instant timeFrom,
final Instant timeTo, final Instant timeTo,
final Function<V, Integer> valueExtactor, final Function<V, Integer> valueExtactor,
final Set<Integer> expectedValue) { final Set<Integer> expectedValue) {
final WindowRangeQuery<Integer, V> query = WindowRangeQuery.withWindowStartRange(timeFrom, timeTo); final WindowRangeQuery<Integer, V> query = WindowRangeQuery.withWindowStartRange(timeFrom, timeTo);
final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> request = final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> request =
inStore(STORE_NAME) inStore(STORE_NAME)
.withQuery(query) .withQuery(query)
.withPartitions(mkSet(0, 1)) .withPartitions(mkSet(0, 1))
.withPositionBound(PositionBound.at(INPUT_POSITION)); .withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result = final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
if (result.getGlobalResult() != null) { if (result.getGlobalResult() != null) {
fail("global tables aren't implemented"); fail("global tables aren't implemented");
@ -1218,12 +1719,12 @@ public class IQv2StoreIntegrationTest {
assertThat(queryResult.get(partition).isSuccess(), is(true)); assertThat(queryResult.get(partition).isSuccess(), is(true));
assertThrows( assertThrows(
IllegalArgumentException.class, IllegalArgumentException.class,
queryResult.get(partition)::getFailureReason queryResult.get(partition)::getFailureReason
); );
assertThrows( assertThrows(
IllegalArgumentException.class, IllegalArgumentException.class,
queryResult.get(partition)::getFailureMessage queryResult.get(partition)::getFailureMessage
); );
try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) { try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) {
@ -1239,18 +1740,18 @@ public class IQv2StoreIntegrationTest {
} }
public <V> void shouldHandleSessionRangeQuery( public <V> void shouldHandleSessionRangeQuery(
final Integer key, final Integer key,
final Set<Integer> expectedValue) { final Set<Integer> expectedValue) {
final WindowRangeQuery<Integer, V> query = WindowRangeQuery.withKey(key); final WindowRangeQuery<Integer, V> query = WindowRangeQuery.withKey(key);
final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> request = final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> request =
inStore(STORE_NAME) inStore(STORE_NAME)
.withQuery(query) .withQuery(query)
.withPartitions(mkSet(0, 1)) .withPartitions(mkSet(0, 1))
.withPositionBound(PositionBound.at(INPUT_POSITION)); .withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result = final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
if (result.getGlobalResult() != null) { if (result.getGlobalResult() != null) {
fail("global tables aren't implemented"); fail("global tables aren't implemented");
@ -1265,12 +1766,12 @@ public class IQv2StoreIntegrationTest {
assertThat(queryResult.get(partition).isSuccess(), is(true)); assertThat(queryResult.get(partition).isSuccess(), is(true));
assertThrows( assertThrows(
IllegalArgumentException.class, IllegalArgumentException.class,
queryResult.get(partition)::getFailureReason queryResult.get(partition)::getFailureReason
); );
assertThrows( assertThrows(
IllegalArgumentException.class, IllegalArgumentException.class,
queryResult.get(partition)::getFailureMessage queryResult.get(partition)::getFailureMessage
); );
try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) { try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) {

Loading…
Cancel
Save