Browse Source

KAFKA-15527: Add reverseRange and reverseAll query over kv-store in IQv2 (#14477)

Implements KIP-985.

Reviewers: Matthias J. Sax <matthias@confluent.io>
pull/14502/merge
Hanyu Zheng 1 year ago committed by GitHub
parent
commit
bbdf6de88a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
  2. 7
      streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
  3. 9
      streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
  4. 103
      streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java

28
streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java

@ -40,10 +40,12 @@ public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { @@ -40,10 +40,12 @@ public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
private final Optional<K> lower;
private final Optional<K> upper;
private final boolean isKeyAscending;
private RangeQuery(final Optional<K> lower, final Optional<K> upper) {
private RangeQuery(final Optional<K> lower, final Optional<K> upper, final boolean isKeyAscending) {
this.lower = lower;
this.upper = upper;
this.isKeyAscending = isKeyAscending;
}
/**
@ -54,7 +56,23 @@ public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { @@ -54,7 +56,23 @@ public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
* @param <V> The value type
*/
public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper));
return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), true);
}
/**
* Determines if the query keys are in ascending order.
* @return true if ascending, false otherwise.
*/
public boolean isKeyAscending() {
return isKeyAscending;
}
/**
* Set the query to return keys in descending order.
* @return a new RangeQuery instance with descending flag set.
*/
public RangeQuery<K, V> withDescendingKeys() {
return new RangeQuery<>(this.lower, this.upper, false);
}
/**
@ -65,7 +83,7 @@ public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { @@ -65,7 +83,7 @@ public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
* @param <V> The value type
*/
public static <K, V> RangeQuery<K, V> withUpperBound(final K upper) {
return new RangeQuery<>(Optional.empty(), Optional.of(upper));
return new RangeQuery<>(Optional.empty(), Optional.of(upper), true);
}
/**
@ -75,7 +93,7 @@ public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { @@ -75,7 +93,7 @@ public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
* @param <V> The value type
*/
public static <K, V> RangeQuery<K, V> withLowerBound(final K lower) {
return new RangeQuery<>(Optional.of(lower), Optional.empty());
return new RangeQuery<>(Optional.of(lower), Optional.empty(), true);
}
/**
@ -84,7 +102,7 @@ public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { @@ -84,7 +102,7 @@ public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
* @param <V> The value type
*/
public static <K, V> RangeQuery<K, V> withNoBounds() {
return new RangeQuery<>(Optional.empty(), Optional.empty());
return new RangeQuery<>(Optional.empty(), Optional.empty(), true);
}
/**

7
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java

@ -253,7 +253,8 @@ public class MeteredKeyValueStore<K, V> @@ -253,7 +253,8 @@ public class MeteredKeyValueStore<K, V>
final QueryResult<R> result;
final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
final RangeQuery<Bytes, byte[]> rawRangeQuery;
RangeQuery<Bytes, byte[]> rawRangeQuery;
final boolean isKeyAscending = typedQuery.isKeyAscending();
if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) {
rawRangeQuery = RangeQuery.withRange(
keyBytes(typedQuery.getLowerBound().get()),
@ -266,6 +267,9 @@ public class MeteredKeyValueStore<K, V> @@ -266,6 +267,9 @@ public class MeteredKeyValueStore<K, V>
} else {
rawRangeQuery = RangeQuery.withNoBounds();
}
if (!isKeyAscending) {
rawRangeQuery = rawRangeQuery.withDescendingKeys();
}
final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
wrapped().query(rawRangeQuery, positionBound, config);
if (rawResult.isSuccess()) {
@ -288,7 +292,6 @@ public class MeteredKeyValueStore<K, V> @@ -288,7 +292,6 @@ public class MeteredKeyValueStore<K, V>
return result;
}
@SuppressWarnings("unchecked")
protected <R> QueryResult<R> runKeyQuery(final Query<R> query,
final PositionBound positionBound,

9
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java

@ -186,12 +186,17 @@ public final class StoreQueryUtils { @@ -186,12 +186,17 @@ public final class StoreQueryUtils {
final RangeQuery<Bytes, byte[]> rangeQuery = (RangeQuery<Bytes, byte[]>) query;
final Optional<Bytes> lowerRange = rangeQuery.getLowerBound();
final Optional<Bytes> upperRange = rangeQuery.getUpperBound();
final boolean isKeyAscending = rangeQuery.isKeyAscending();
final KeyValueIterator<Bytes, byte[]> iterator;
try {
if (!lowerRange.isPresent() && !upperRange.isPresent()) {
if (!lowerRange.isPresent() && !upperRange.isPresent() && isKeyAscending) {
iterator = kvStore.all();
} else {
} else if (isKeyAscending) {
iterator = kvStore.range(lowerRange.orElse(null), upperRange.orElse(null));
} else if (!lowerRange.isPresent() && !upperRange.isPresent()) {
iterator = kvStore.reverseAll();
} else {
iterator = kvStore.reverseRange(lowerRange.orElse(null), upperRange.orElse(null));
}
final R result = (R) iterator;
return QueryResult.forResult(result);

103
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java

@ -96,6 +96,7 @@ import java.util.Optional; @@ -96,6 +96,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -783,22 +784,12 @@ public class IQv2StoreIntegrationTest { @@ -783,22 +784,12 @@ public class IQv2StoreIntegrationTest {
if (storeToTest.timestamped()) {
final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor =
ValueAndTimestamp::value;
if (kind.equals("DSL")) {
shouldHandleKeyQuery(2, valueExtractor, 5);
shouldHandleRangeDSLQueries(valueExtractor);
} else {
shouldHandleKeyQuery(2, valueExtractor, 5);
shouldHandleRangePAPIQueries(valueExtractor);
}
shouldHandleKeyQuery(2, valueExtractor, 5);
shouldHandleRangeQueries(valueExtractor);
} else {
final Function<Integer, Integer> valueExtractor = Function.identity();
if (kind.equals("DSL")) {
shouldHandleKeyQuery(2, valueExtractor, 5);
shouldHandleRangeDSLQueries(valueExtractor);
} else {
shouldHandleKeyQuery(2, valueExtractor, 5);
shouldHandleRangePAPIQueries(valueExtractor);
}
shouldHandleKeyQuery(2, valueExtractor, 5);
shouldHandleRangeQueries(valueExtractor);
}
}
@ -842,77 +833,93 @@ public class IQv2StoreIntegrationTest { @@ -842,77 +833,93 @@ public class IQv2StoreIntegrationTest {
}
private <T> void shouldHandleRangeDSLQueries(final Function<T, Integer> extractor) {
private <T> void shouldHandleRangeQueries(final Function<T, Integer> extractor) {
shouldHandleRangeQuery(
Optional.of(0),
Optional.of(4),
true,
extractor,
mkSet(1, 3, 5, 7, 9)
Arrays.asList(1, 5, 9, 3, 7)
);
shouldHandleRangeQuery(
Optional.of(1),
Optional.of(3),
true,
extractor,
mkSet(3, 5, 7)
Arrays.asList(5, 3, 7)
);
shouldHandleRangeQuery(
Optional.of(3),
Optional.empty(),
true,
extractor,
mkSet(7, 9)
Arrays.asList(9, 7)
);
shouldHandleRangeQuery(
Optional.empty(),
Optional.of(3),
true,
extractor,
mkSet(1, 3, 5, 7)
Arrays.asList(1, 5, 3, 7)
);
shouldHandleRangeQuery(
Optional.empty(),
Optional.empty(),
true,
extractor,
mkSet(1, 3, 5, 7, 9)
Arrays.asList(1, 5, 9, 3, 7)
);
shouldHandleRangeQuery(
Optional.of(1),
Optional.of(3),
false,
extractor,
Arrays.asList(5, 7, 3)
);
}
private <T> void shouldHandleRangePAPIQueries(final Function<T, Integer> extractor) {
shouldHandleRangeQuery(
Optional.of(0),
Optional.of(4),
false,
extractor,
mkSet(1, 3, 5, 7, 9)
Arrays.asList(9, 5, 1, 7, 3)
);
shouldHandleRangeQuery(
Optional.of(1),
Optional.of(3),
false,
extractor,
mkSet(3, 5, 7)
Arrays.asList(5, 7, 3)
);
shouldHandleRangeQuery(
Optional.of(3),
Optional.empty(),
false,
extractor,
mkSet(7, 9)
Arrays.asList(9, 7)
);
shouldHandleRangeQuery(
Optional.empty(),
Optional.of(3),
false,
extractor,
mkSet(1, 3, 5, 7)
Arrays.asList(5, 1, 7, 3)
);
shouldHandleRangeQuery(
Optional.empty(),
Optional.empty(),
false,
extractor,
mkSet(1, 3, 5, 7, 9)
Arrays.asList(9, 5, 1, 7, 3)
);
}
@ -1586,12 +1593,15 @@ public class IQv2StoreIntegrationTest { @@ -1586,12 +1593,15 @@ public class IQv2StoreIntegrationTest {
public <V> void shouldHandleRangeQuery(
final Optional<Integer> lower,
final Optional<Integer> upper,
final boolean isKeyAscending,
final Function<V, Integer> valueExtactor,
final Set<Integer> expectedValue) {
final List<Integer> expectedValues) {
final RangeQuery<Integer, V> query;
RangeQuery<Integer, V> query;
query = RangeQuery.withRange(lower.orElse(null), upper.orElse(null));
if (!isKeyAscending) {
query = query.withDescendingKeys();
}
final StateQueryRequest<KeyValueIterator<Integer, V>> request =
inStore(STORE_NAME)
@ -1604,9 +1614,10 @@ public class IQv2StoreIntegrationTest { @@ -1604,9 +1614,10 @@ public class IQv2StoreIntegrationTest {
if (result.getGlobalResult() != null) {
fail("global tables aren't implemented");
} else {
final Set<Integer> actualValue = new HashSet<>();
final List<Integer> actualValues = new ArrayList<>();
final Map<Integer, QueryResult<KeyValueIterator<Integer, V>>> queryResult = result.getPartitionResults();
for (final int partition : queryResult.keySet()) {
final TreeSet<Integer> partitions = new TreeSet<>(queryResult.keySet());
for (final int partition : partitions) {
final boolean failure = queryResult.get(partition).isFailure();
if (failure) {
throw new AssertionError(queryResult.toString());
@ -1624,12 +1635,12 @@ public class IQv2StoreIntegrationTest { @@ -1624,12 +1635,12 @@ public class IQv2StoreIntegrationTest {
try (final KeyValueIterator<Integer, V> iterator = queryResult.get(partition).getResult()) {
while (iterator.hasNext()) {
actualValue.add(valueExtactor.apply(iterator.next().value));
actualValues.add(valueExtactor.apply(iterator.next().value));
}
}
assertThat(queryResult.get(partition).getExecutionInfo(), is(empty()));
}
assertThat("Result:" + result, actualValue, is(expectedValue));
assertThat("Result:" + result, actualValues, is(expectedValues));
assertThat("Result:" + result, result.getPosition(), is(INPUT_POSITION));
}
}
@ -1639,7 +1650,7 @@ public class IQv2StoreIntegrationTest { @@ -1639,7 +1650,7 @@ public class IQv2StoreIntegrationTest {
final Instant timeFrom,
final Instant timeTo,
final Function<V, Integer> valueExtactor,
final Set<Integer> expectedValue) {
final Set<Integer> expectedValues) {
final WindowKeyQuery<Integer, V> query = WindowKeyQuery.withKeyAndWindowStartRange(
key,
@ -1659,7 +1670,7 @@ public class IQv2StoreIntegrationTest { @@ -1659,7 +1670,7 @@ public class IQv2StoreIntegrationTest {
if (result.getGlobalResult() != null) {
fail("global tables aren't implemented");
} else {
final Set<Integer> actualValue = new HashSet<>();
final Set<Integer> actualValues = new HashSet<>();
final Map<Integer, QueryResult<WindowStoreIterator<V>>> queryResult = result.getPartitionResults();
for (final int partition : queryResult.keySet()) {
final boolean failure = queryResult.get(partition).isFailure();
@ -1679,12 +1690,12 @@ public class IQv2StoreIntegrationTest { @@ -1679,12 +1690,12 @@ public class IQv2StoreIntegrationTest {
try (final WindowStoreIterator<V> iterator = queryResult.get(partition).getResult()) {
while (iterator.hasNext()) {
actualValue.add(valueExtactor.apply(iterator.next().value));
actualValues.add(valueExtactor.apply(iterator.next().value));
}
}
assertThat(queryResult.get(partition).getExecutionInfo(), is(empty()));
}
assertThat("Result:" + result, actualValue, is(expectedValue));
assertThat("Result:" + result, actualValues, is(expectedValues));
assertThat("Result:" + result, result.getPosition(), is(INPUT_POSITION));
}
}
@ -1693,7 +1704,7 @@ public class IQv2StoreIntegrationTest { @@ -1693,7 +1704,7 @@ public class IQv2StoreIntegrationTest {
final Instant timeFrom,
final Instant timeTo,
final Function<V, Integer> valueExtactor,
final Set<Integer> expectedValue) {
final Set<Integer> expectedValues) {
final WindowRangeQuery<Integer, V> query = WindowRangeQuery.withWindowStartRange(timeFrom, timeTo);
@ -1709,7 +1720,7 @@ public class IQv2StoreIntegrationTest { @@ -1709,7 +1720,7 @@ public class IQv2StoreIntegrationTest {
if (result.getGlobalResult() != null) {
fail("global tables aren't implemented");
} else {
final Set<Integer> actualValue = new HashSet<>();
final Set<Integer> actualValues = new HashSet<>();
final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, V>>> queryResult = result.getPartitionResults();
for (final int partition : queryResult.keySet()) {
final boolean failure = queryResult.get(partition).isFailure();
@ -1729,19 +1740,19 @@ public class IQv2StoreIntegrationTest { @@ -1729,19 +1740,19 @@ public class IQv2StoreIntegrationTest {
try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) {
while (iterator.hasNext()) {
actualValue.add(valueExtactor.apply(iterator.next().value));
actualValues.add(valueExtactor.apply(iterator.next().value));
}
}
assertThat(queryResult.get(partition).getExecutionInfo(), is(empty()));
}
assertThat("Result:" + result, actualValue, is(expectedValue));
assertThat("Result:" + result, actualValues, is(expectedValues));
assertThat("Result:" + result, result.getPosition(), is(INPUT_POSITION));
}
}
public <V> void shouldHandleSessionRangeQuery(
final Integer key,
final Set<Integer> expectedValue) {
final Set<Integer> expectedValues) {
final WindowRangeQuery<Integer, V> query = WindowRangeQuery.withKey(key);
@ -1756,7 +1767,7 @@ public class IQv2StoreIntegrationTest { @@ -1756,7 +1767,7 @@ public class IQv2StoreIntegrationTest {
if (result.getGlobalResult() != null) {
fail("global tables aren't implemented");
} else {
final Set<Integer> actualValue = new HashSet<>();
final Set<Integer> actualValues = new HashSet<>();
final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, V>>> queryResult = result.getPartitionResults();
for (final int partition : queryResult.keySet()) {
final boolean failure = queryResult.get(partition).isFailure();
@ -1776,12 +1787,12 @@ public class IQv2StoreIntegrationTest { @@ -1776,12 +1787,12 @@ public class IQv2StoreIntegrationTest {
try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) {
while (iterator.hasNext()) {
actualValue.add((Integer) iterator.next().value);
actualValues.add((Integer) iterator.next().value);
}
}
assertThat(queryResult.get(partition).getExecutionInfo(), is(empty()));
}
assertThat("Result:" + result, actualValue, is(expectedValue));
assertThat("Result:" + result, actualValues, is(expectedValues));
assertThat("Result:" + result, result.getPosition(), is(INPUT_POSITION));
}
}

Loading…
Cancel
Save