Browse Source

KAFKA-7652: Part I; Fix SessionStore's findSession(single-key) (#6134)

Let findSessions(final K key) to call on underlying bytes store directly, using the more restricted range.

Fix the conservative upper range for multi-key range in session schema.

Minor: removed unnecessary private WrappedSessionStoreBytesIterator class as it is only used in unit test.

Minor: removed unnecessary schema#init function by using the direct bytes-to-binary function.

Please read the original PR for more detailed explanation of the root cause of the bug.


Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian@confluent.io>, John Roesler <john@confluent.io>
pull/6172/head
Guozhang Wang 6 years ago committed by GitHub
parent
commit
56139df844
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
  2. 1
      streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
  3. 3
      streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
  4. 13
      streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
  5. 7
      streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
  6. 19
      streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
  7. 5
      streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
  8. 31
      streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
  9. 7
      streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
  10. 1
      streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
  11. 91
      streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
  12. 3
      streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
  13. 6
      streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java

1
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java

@ -71,7 +71,6 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
private void initInternal(final InternalProcessorContext context) { private void initInternal(final InternalProcessorContext context) {
this.context = context; this.context = context;
keySchema.init(topic);
serdes = new StateSerdes<>( serdes = new StateSerdes<>(
topic, topic,
keySerde == null ? (Serde<K>) context.keySerde() : keySerde, keySerde == null ? (Serde<K>) context.keySerde() : keySerde,

1
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java

@ -69,7 +69,6 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
public void init(final ProcessorContext context, final StateStore root) { public void init(final ProcessorContext context, final StateStore root) {
initInternal(context); initInternal(context);
underlying.init(context, root); underlying.init(context, root);
keySchema.init(context.applicationId());
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

3
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java

@ -25,7 +25,6 @@ import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallba
import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueIterator;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
@ -164,8 +163,6 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
"expired-window-record-drop" "expired-window-record-drop"
); );
keySchema.init(ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name()));
segments.openExisting(this.context); segments.openExisting(this.context);
bulkLoadSegments = new HashSet<>(segments.allSegments()); bulkLoadSegments = new HashSet<>(segments.allSegments());

13
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java

@ -61,14 +61,21 @@ public class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractState
@Override @Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
return findSessions(key, key, earliestSessionEndTime, latestSessionStartTime); final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(
Bytes.wrap(serdes.rawKey(key)),
earliestSessionEndTime,
latestSessionStartTime
);
return new WrappedSessionStoreIterator<>(bytesIterator, serdes);
} }
@Override @Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch( final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(
Bytes.wrap(serdes.rawKey(keyFrom)), Bytes.wrap(serdes.rawKey(keyTo)), Bytes.wrap(serdes.rawKey(keyFrom)),
earliestSessionEndTime, latestSessionStartTime Bytes.wrap(serdes.rawKey(keyTo)),
earliestSessionEndTime,
latestSessionStartTime
); );
return new WrappedSessionStoreIterator<>(bytesIterator, serdes); return new WrappedSessionStoreIterator<>(bytesIterator, serdes);
} }

7
streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java

@ -98,13 +98,6 @@ public interface SegmentedBytesStore extends StateStore {
interface KeySchema { interface KeySchema {
/**
* Initialized the schema with a topic.
*
* @param topic a topic name
*/
void init(final String topic);
/** /**
* Given a range of record keys and a time, construct a Segmented key that represents * Given a range of record keys and a time, construct a Segmented key that represents
* the upper range of keys to search when performing range queries. * the upper range of keys to search when performing range queries.

19
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java

@ -17,8 +17,6 @@
package org.apache.kafka.streams.state.internals; package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Window;
@ -36,31 +34,24 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema {
private static final int SUFFIX_SIZE = 2 * TIMESTAMP_SIZE; private static final int SUFFIX_SIZE = 2 * TIMESTAMP_SIZE;
private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE]; private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE];
private String topic;
private final Serde<Bytes> bytesSerdes = Serdes.Bytes();
@Override
public void init(final String topic) {
this.topic = topic;
}
@Override @Override
public Bytes upperRangeFixedSize(final Bytes key, final long to) { public Bytes upperRangeFixedSize(final Bytes key, final long to) {
final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE)); final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE));
return Bytes.wrap(SessionKeySchema.toBinary(sessionKey, bytesSerdes.serializer(), topic)); return Bytes.wrap(SessionKeySchema.toBinary(sessionKey));
} }
@Override @Override
public Bytes lowerRangeFixedSize(final Bytes key, final long from) { public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from))); final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from)));
return Bytes.wrap(SessionKeySchema.toBinary(sessionKey, bytesSerdes.serializer(), topic)); return Bytes.wrap(SessionKeySchema.toBinary(sessionKey));
} }
@Override @Override
public Bytes upperRange(final Bytes key, final long to) { public Bytes upperRange(final Bytes key, final long to) {
final byte[] maxSuffix = ByteBuffer.allocate(SUFFIX_SIZE) final byte[] maxSuffix = ByteBuffer.allocate(SUFFIX_SIZE)
.putLong(to) // the end timestamp can be as large as possible as long as it's larger than start time
// start can at most be equal to end .putLong(Long.MAX_VALUE)
// this is the start timestamp
.putLong(to) .putLong(to)
.array(); .array();
return OrderedBytes.upperRange(key, maxSuffix); return OrderedBytes.upperRange(key, maxSuffix);

5
streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java

@ -35,11 +35,6 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE; private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE]; private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE];
@Override
public void init(final String topic) {
// nothing to do
}
@Override @Override
public Bytes upperRange(final Bytes key, final long to) { public Bytes upperRange(final Bytes key, final long to) {
final byte[] maxSuffix = ByteBuffer.allocate(SUFFIX_SIZE) final byte[] maxSuffix = ByteBuffer.allocate(SUFFIX_SIZE)

31
streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java

@ -23,35 +23,12 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.StateSerdes;
class WrappedSessionStoreIterator<K, V> implements KeyValueIterator<Windowed<K>, V> { class WrappedSessionStoreIterator<K, V> implements KeyValueIterator<Windowed<K>, V> {
final KeyValueIterator<Bytes, byte[]> bytesIterator;
private final StateSerdes<K, V> serdes;
// this is optimizing the case when underlying is already a bytes store iterator, in which we can avoid Bytes.wrap() costs
private static class WrappedSessionStoreBytesIterator extends WrappedSessionStoreIterator<Bytes, byte[]> {
WrappedSessionStoreBytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
final StateSerdes<Bytes, byte[]> serdes) {
super(underlying, serdes);
}
@Override private final KeyValueIterator<Bytes, byte[]> bytesIterator;
public Windowed<Bytes> peekNextKey() { private final StateSerdes<K, V> serdes;
final Bytes key = bytesIterator.peekNextKey();
return SessionKeySchema.from(key);
}
@Override
public KeyValue<Windowed<Bytes>, byte[]> next() {
final KeyValue<Bytes, byte[]> next = bytesIterator.next();
return KeyValue.pair(SessionKeySchema.from(next.key), next.value);
}
}
static WrappedSessionStoreIterator<Bytes, byte[]> bytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
final StateSerdes<Bytes, byte[]> serdes) {
return new WrappedSessionStoreBytesIterator(underlying, serdes);
}
WrappedSessionStoreIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator, final StateSerdes<K, V> serdes) { WrappedSessionStoreIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator,
final StateSerdes<K, V> serdes) {
this.bytesIterator = bytesIterator; this.bytesIterator = bytesIterator;
this.serdes = serdes; this.serdes = serdes;
} }

7
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java

@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.After; import org.junit.After;
@ -45,7 +44,6 @@ import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList; import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList;
import static org.apache.kafka.streams.state.internals.WrappedSessionStoreIterator.bytesIterator;
import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
@ -68,7 +66,6 @@ public class CachingSessionStoreTest {
@Before @Before
public void setUp() { public void setUp() {
final SessionKeySchema schema = new SessionKeySchema(); final SessionKeySchema schema = new SessionKeySchema();
schema.init("topic");
underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, schema); underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, schema);
final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray()); final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), Serdes.String(), SEGMENT_INTERVAL); cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), Serdes.String(), SEGMENT_INTERVAL);
@ -151,11 +148,9 @@ public class CachingSessionStoreTest {
@Test @Test
public void shouldFlushItemsToStoreOnEviction() { public void shouldFlushItemsToStoreOnEviction() {
final StateSerdes<Bytes, byte[]> serdes = new StateSerdes<>("topic", Serdes.Bytes(), Serdes.ByteArray());
final List<KeyValue<Windowed<Bytes>, byte[]>> added = addSessionsUntilOverflow("a", "b", "c", "d"); final List<KeyValue<Windowed<Bytes>, byte[]>> added = addSessionsUntilOverflow("a", "b", "c", "d");
assertEquals(added.size() - 1, cache.size()); assertEquals(added.size() - 1, cache.size());
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.findSessions(added.get(0).key.key(), 0, 0);
bytesIterator(underlying.fetch(added.get(0).key.key(), 0, 0), serdes);
final KeyValue<Windowed<Bytes>, byte[]> next = iterator.next(); final KeyValue<Windowed<Bytes>, byte[]> next = iterator.next();
assertEquals(added.get(0).key, next.key); assertEquals(added.get(0).key, next.key);
assertArrayEquals(added.get(0).value, next.value); assertArrayEquals(added.get(0).value, next.value);

1
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java

@ -92,7 +92,6 @@ public class RocksDBSegmentedBytesStoreTest {
@Before @Before
public void before() { public void before() {
schema.init("topic");
if (schema instanceof SessionKeySchema) { if (schema instanceof SessionKeySchema) {
windows[0] = new SessionWindow(10L, 10L); windows[0] = new SessionWindow(10L, 10L);

91
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java

@ -34,6 +34,7 @@ import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
@ -50,7 +51,6 @@ public class RocksDBSessionStoreTest {
@Before @Before
public void before() { public void before() {
final SessionKeySchema schema = new SessionKeySchema(); final SessionKeySchema schema = new SessionKeySchema();
schema.init("topic");
final RocksDBSegmentedBytesStore bytesStore = new RocksDBSegmentedBytesStore( final RocksDBSegmentedBytesStore bytesStore = new RocksDBSegmentedBytesStore(
"session-store", "session-store",
@ -94,11 +94,21 @@ public class RocksDBSessionStoreTest {
final List<KeyValue<Windowed<String>, Long>> expected = final List<KeyValue<Windowed<String>, Long>> expected =
Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L)); Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L));
final KeyValueIterator<Windowed<String>, Long> values = try (final KeyValueIterator<Windowed<String>, Long> values =
sessionStore.findSessions(key, 0, 1000L); sessionStore.findSessions(key, 0, 1000L)
) {
assertEquals(expected, toList(values)); assertEquals(expected, toList(values));
} }
final List<KeyValue<Windowed<String>, Long>> expected2 = Collections.singletonList(KeyValue.pair(a2, 2L));
try (final KeyValueIterator<Windowed<String>, Long> values2 =
sessionStore.findSessions(key, 400L, 600L)
) {
assertEquals(expected2, toList(values2));
}
}
@Test @Test
public void shouldFetchAllSessionsWithSameRecordKey() { public void shouldFetchAllSessionsWithSameRecordKey() {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList( final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
@ -114,8 +124,9 @@ public class RocksDBSessionStoreTest {
// add one that shouldn't appear in the results // add one that shouldn't appear in the results
sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L); sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L);
final List<KeyValue<Windowed<String>, Long>> results = toList(sessionStore.fetch("a")); try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.fetch("a")) {
assertEquals(expected, results); assertEquals(expected, toList(values));
}
} }
@Test @Test
@ -123,15 +134,16 @@ public class RocksDBSessionStoreTest {
final String key = "a"; final String key = "a";
sessionStore.put(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L); sessionStore.put(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L);
sessionStore.put(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L); sessionStore.put(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L);
final KeyValueIterator<Windowed<String>, Long> results =
sessionStore.findSessions(key, -1, 1000L);
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList( final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L), KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L),
KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L)); KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L));
try (final KeyValueIterator<Windowed<String>, Long> results =
sessionStore.findSessions(key, -1, 1000L)) {
assertEquals(expected, toList(results)); assertEquals(expected, toList(results));
} }
}
@Test @Test
public void shouldRemove() { public void shouldRemove() {
@ -139,9 +151,16 @@ public class RocksDBSessionStoreTest {
sessionStore.put(new Windowed<>("a", new SessionWindow(1500, 2500)), 2L); sessionStore.put(new Windowed<>("a", new SessionWindow(1500, 2500)), 2L);
sessionStore.remove(new Windowed<>("a", new SessionWindow(0, 1000))); sessionStore.remove(new Windowed<>("a", new SessionWindow(0, 1000)));
assertFalse(sessionStore.findSessions("a", 0, 1000L).hasNext());
assertTrue(sessionStore.findSessions("a", 1500, 2500).hasNext()); try (final KeyValueIterator<Windowed<String>, Long> results =
sessionStore.findSessions("a", 0L, 1000L)) {
assertFalse(results.hasNext());
}
try (final KeyValueIterator<Windowed<String>, Long> results =
sessionStore.findSessions("a", 1500L, 2500L)) {
assertTrue(results.hasNext());
}
} }
@Test @Test
@ -156,12 +175,15 @@ public class RocksDBSessionStoreTest {
sessionStore.put(session3, 3L); sessionStore.put(session3, 3L);
sessionStore.put(session4, 4L); sessionStore.put(session4, 4L);
sessionStore.put(session5, 5L); sessionStore.put(session5, 5L);
final KeyValueIterator<Windowed<String>, Long> results =
sessionStore.findSessions("a", 150, 300); try (final KeyValueIterator<Windowed<String>, Long> results =
sessionStore.findSessions("a", 150, 300)
) {
assertEquals(session2, results.next().key); assertEquals(session2, results.next().key);
assertEquals(session3, results.next().key); assertEquals(session3, results.next().key);
assertFalse(results.hasNext()); assertFalse(results.hasNext());
} }
}
@Test @Test
public void shouldFetchExactKeys() { public void shouldFetchExactKeys() {
@ -180,37 +202,34 @@ public class RocksDBSessionStoreTest {
sessionStore.init(context, sessionStore); sessionStore.init(context, sessionStore);
sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 2L); sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L);
sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L); sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L);
sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L); sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L);
sessionStore.put(new Windowed<>("a", new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L); sessionStore.put(new Windowed<>("a", new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L);
KeyValueIterator<Windowed<String>, Long> iterator = try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions("a", 0, Long.MAX_VALUE); sessionStore.findSessions("a", 0, Long.MAX_VALUE)
List<Long> results = new ArrayList<>(); ) {
while (iterator.hasNext()) { assertThat(valuesToList(iterator), equalTo(Arrays.asList(1L, 3L, 5L)));
results.add(iterator.next().value);
} }
assertThat(results, equalTo(Arrays.asList(1L, 3L, 5L))); try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions("aa", 0, Long.MAX_VALUE)
) {
iterator = sessionStore.findSessions("aa", 0, Long.MAX_VALUE); assertThat(valuesToList(iterator), equalTo(Arrays.asList(2L, 4L)));
results = new ArrayList<>();
while (iterator.hasNext()) {
results.add(iterator.next().value);
} }
assertThat(results, equalTo(Arrays.asList(2L, 4L))); try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions("a", "aa", 0, Long.MAX_VALUE)
) {
assertThat(valuesToList(iterator), equalTo(Arrays.asList(1L, 3L, 2L, 4L, 5L)));
}
final KeyValueIterator<Windowed<String>, Long> rangeIterator = try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions("a", "aa", 0, Long.MAX_VALUE); sessionStore.findSessions("a", "aa", 10, 0)
final List<Long> rangeResults = new ArrayList<>(); ) {
while (rangeIterator.hasNext()) { assertThat(valuesToList(iterator), equalTo(Collections.singletonList(2L)));
rangeResults.add(rangeIterator.next().value);
} }
assertThat(rangeResults, equalTo(Arrays.asList(1L, 3L, 2L, 4L, 5L)));
} }
@Test(expected = NullPointerException.class) @Test(expected = NullPointerException.class)
@ -261,4 +280,12 @@ public class RocksDBSessionStoreTest {
return results; return results;
} }
static <K, V> List<V> valuesToList(final KeyValueIterator<Windowed<K>, V> iterator) {
final List<V> results = new ArrayList<>();
while (iterator.hasNext()) {
results.add(iterator.next().value);
}
return results;
}
} }

3
streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java

@ -56,7 +56,6 @@ public class SessionKeySchemaTest {
@Before @Before
public void before() { public void before() {
sessionKeySchema.init("topic");
final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0)))), 1), final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0)))), 1),
KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0)))), 2), KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0)))), 2),
KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0)))), 3), KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0)))), 3),
@ -144,7 +143,7 @@ public class SessionKeySchemaTest {
final Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0); final Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0);
assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary( assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary(
new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0)))) new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(0, Long.MAX_VALUE))))
)); ));
} }

6
streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java

@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.KeyValueIteratorStub; import org.apache.kafka.test.KeyValueIteratorStub;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
@ -55,11 +54,6 @@ public class WindowKeySchemaTest {
final private Serde<Windowed<String>> keySerde = new WindowedSerdes.TimeWindowedSerde<>(serde); final private Serde<Windowed<String>> keySerde = new WindowedSerdes.TimeWindowedSerde<>(serde);
final private StateSerdes<String, byte[]> stateSerdes = new StateSerdes<>("dummy", serde, Serdes.ByteArray()); final private StateSerdes<String, byte[]> stateSerdes = new StateSerdes<>("dummy", serde, Serdes.ByteArray());
@Before
public void before() {
windowKeySchema.init("topic");
}
@Test @Test
public void testHasNextConditionUsingNullKeys() { public void testHasNextConditionUsingNullKeys() {
final List<KeyValue<Bytes, Integer>> keys = Arrays.asList( final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(

Loading…
Cancel
Save