diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index cbcb7490efb..24dec9cca37 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -71,7 +71,6 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i private void initInternal(final InternalProcessorContext context) { this.context = context; - keySchema.init(topic); serdes = new StateSerdes<>( topic, keySerde == null ? (Serde) context.keySerde() : keySerde, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index a934e811504..ca41441d04c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -69,7 +69,6 @@ class CachingWindowStore extends WrappedStateStore.AbstractStateStore impl public void init(final ProcessorContext context, final StateStore root) { initInternal(context); underlying.init(context, root); - keySchema.init(context.applicationId()); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java index fa714651f34..17079b964b5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java @@ -25,7 +25,6 @@ import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallba import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.rocksdb.RocksDBException; @@ -164,8 +163,6 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore { "expired-window-record-drop" ); - keySchema.init(ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name())); - segments.openExisting(this.context); bulkLoadSegments = new HashSet<>(segments.allSegments()); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index 5d9c0459302..4e947375cb1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -61,14 +61,21 @@ public class RocksDBSessionStore extends WrappedStateStore.AbstractState @Override public KeyValueIterator, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { - return findSessions(key, key, earliestSessionEndTime, latestSessionStartTime); + final KeyValueIterator bytesIterator = bytesStore.fetch( + Bytes.wrap(serdes.rawKey(key)), + earliestSessionEndTime, + latestSessionStartTime + ); + return new WrappedSessionStoreIterator<>(bytesIterator, serdes); } @Override public KeyValueIterator, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { final KeyValueIterator bytesIterator = bytesStore.fetch( - Bytes.wrap(serdes.rawKey(keyFrom)), Bytes.wrap(serdes.rawKey(keyTo)), - earliestSessionEndTime, latestSessionStartTime + Bytes.wrap(serdes.rawKey(keyFrom)), + Bytes.wrap(serdes.rawKey(keyTo)), + earliestSessionEndTime, + latestSessionStartTime ); return new WrappedSessionStoreIterator<>(bytesIterator, serdes); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java index a53318528e6..ce528ed2f12 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java @@ -98,13 +98,6 @@ public interface SegmentedBytesStore extends StateStore { interface KeySchema { - /** - * Initialized the schema with a topic. - * - * @param topic a topic name - */ - void init(final String topic); - /** * Given a range of record keys and a time, construct a Segmented key that represents * the upper range of keys to search when performing range queries. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index debef9a58c2..8ba78cc57d8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -17,8 +17,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Window; @@ -36,31 +34,24 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema { private static final int SUFFIX_SIZE = 2 * TIMESTAMP_SIZE; private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE]; - private String topic; - private final Serde bytesSerdes = Serdes.Bytes(); - - @Override - public void init(final String topic) { - this.topic = topic; - } - @Override public Bytes upperRangeFixedSize(final Bytes key, final long to) { final Windowed sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE)); - return Bytes.wrap(SessionKeySchema.toBinary(sessionKey, bytesSerdes.serializer(), topic)); + return Bytes.wrap(SessionKeySchema.toBinary(sessionKey)); } @Override public Bytes lowerRangeFixedSize(final Bytes key, final long from) { final Windowed sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from))); - return Bytes.wrap(SessionKeySchema.toBinary(sessionKey, bytesSerdes.serializer(), topic)); + return Bytes.wrap(SessionKeySchema.toBinary(sessionKey)); } @Override public Bytes upperRange(final Bytes key, final long to) { final byte[] maxSuffix = ByteBuffer.allocate(SUFFIX_SIZE) - .putLong(to) - // start can at most be equal to end + // the end timestamp can be as large as possible as long as it's larger than start time + .putLong(Long.MAX_VALUE) + // this is the start timestamp .putLong(to) .array(); return OrderedBytes.upperRange(key, maxSuffix); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java index e4d99582fd7..0b3ba9e7d1d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java @@ -35,11 +35,6 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE; private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE]; - @Override - public void init(final String topic) { - // nothing to do - } - @Override public Bytes upperRange(final Bytes key, final long to) { final byte[] maxSuffix = ByteBuffer.allocate(SUFFIX_SIZE) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java index 47496a4b766..ce274570c06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java @@ -23,35 +23,12 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; class WrappedSessionStoreIterator implements KeyValueIterator, V> { - final KeyValueIterator bytesIterator; - private final StateSerdes serdes; - - // this is optimizing the case when underlying is already a bytes store iterator, in which we can avoid Bytes.wrap() costs - private static class WrappedSessionStoreBytesIterator extends WrappedSessionStoreIterator { - WrappedSessionStoreBytesIterator(final KeyValueIterator underlying, - final StateSerdes serdes) { - super(underlying, serdes); - } - @Override - public Windowed peekNextKey() { - final Bytes key = bytesIterator.peekNextKey(); - return SessionKeySchema.from(key); - } - - @Override - public KeyValue, byte[]> next() { - final KeyValue next = bytesIterator.next(); - return KeyValue.pair(SessionKeySchema.from(next.key), next.value); - } - } - - static WrappedSessionStoreIterator bytesIterator(final KeyValueIterator underlying, - final StateSerdes serdes) { - return new WrappedSessionStoreBytesIterator(underlying, serdes); - } + private final KeyValueIterator bytesIterator; + private final StateSerdes serdes; - WrappedSessionStoreIterator(final KeyValueIterator bytesIterator, final StateSerdes serdes) { + WrappedSessionStoreIterator(final KeyValueIterator bytesIterator, + final StateSerdes serdes) { this.bytesIterator = bytesIterator; this.serdes = serdes; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index fabda420793..657cef67f77 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -45,7 +44,6 @@ import java.util.Set; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList; -import static org.apache.kafka.streams.state.internals.WrappedSessionStoreIterator.bytesIterator; import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; import static org.junit.Assert.assertArrayEquals; @@ -68,7 +66,6 @@ public class CachingSessionStoreTest { @Before public void setUp() { final SessionKeySchema schema = new SessionKeySchema(); - schema.init("topic"); underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, schema); final RocksDBSessionStore sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray()); cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), Serdes.String(), SEGMENT_INTERVAL); @@ -151,11 +148,9 @@ public class CachingSessionStoreTest { @Test public void shouldFlushItemsToStoreOnEviction() { - final StateSerdes serdes = new StateSerdes<>("topic", Serdes.Bytes(), Serdes.ByteArray()); final List, byte[]>> added = addSessionsUntilOverflow("a", "b", "c", "d"); assertEquals(added.size() - 1, cache.size()); - final KeyValueIterator, byte[]> iterator = - bytesIterator(underlying.fetch(added.get(0).key.key(), 0, 0), serdes); + final KeyValueIterator, byte[]> iterator = cachingStore.findSessions(added.get(0).key.key(), 0, 0); final KeyValue, byte[]> next = iterator.next(); assertEquals(added.get(0).key, next.key); assertArrayEquals(added.get(0).value, next.value); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index d03f17bb066..0b9d66df67b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -92,7 +92,6 @@ public class RocksDBSegmentedBytesStoreTest { @Before public void before() { - schema.init("topic"); if (schema instanceof SessionKeySchema) { windows[0] = new SessionWindow(10L, 10L); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index 3ebeee531e4..f13ac0a2aff 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -34,6 +34,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static org.hamcrest.CoreMatchers.equalTo; @@ -50,7 +51,6 @@ public class RocksDBSessionStoreTest { @Before public void before() { final SessionKeySchema schema = new SessionKeySchema(); - schema.init("topic"); final RocksDBSegmentedBytesStore bytesStore = new RocksDBSegmentedBytesStore( "session-store", @@ -94,9 +94,19 @@ public class RocksDBSessionStoreTest { final List, Long>> expected = Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L)); - final KeyValueIterator, Long> values = - sessionStore.findSessions(key, 0, 1000L); - assertEquals(expected, toList(values)); + try (final KeyValueIterator, Long> values = + sessionStore.findSessions(key, 0, 1000L) + ) { + assertEquals(expected, toList(values)); + } + + final List, Long>> expected2 = Collections.singletonList(KeyValue.pair(a2, 2L)); + + try (final KeyValueIterator, Long> values2 = + sessionStore.findSessions(key, 400L, 600L) + ) { + assertEquals(expected2, toList(values2)); + } } @Test @@ -114,8 +124,9 @@ public class RocksDBSessionStoreTest { // add one that shouldn't appear in the results sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L); - final List, Long>> results = toList(sessionStore.fetch("a")); - assertEquals(expected, results); + try (final KeyValueIterator, Long> values = sessionStore.fetch("a")) { + assertEquals(expected, toList(values)); + } } @Test @@ -123,14 +134,15 @@ public class RocksDBSessionStoreTest { final String key = "a"; sessionStore.put(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L); sessionStore.put(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L); - final KeyValueIterator, Long> results = - sessionStore.findSessions(key, -1, 1000L); final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L), - KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L)); + KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L), + KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L)); - assertEquals(expected, toList(results)); + try (final KeyValueIterator, Long> results = + sessionStore.findSessions(key, -1, 1000L)) { + assertEquals(expected, toList(results)); + } } @Test @@ -139,9 +151,16 @@ public class RocksDBSessionStoreTest { sessionStore.put(new Windowed<>("a", new SessionWindow(1500, 2500)), 2L); sessionStore.remove(new Windowed<>("a", new SessionWindow(0, 1000))); - assertFalse(sessionStore.findSessions("a", 0, 1000L).hasNext()); - assertTrue(sessionStore.findSessions("a", 1500, 2500).hasNext()); + try (final KeyValueIterator, Long> results = + sessionStore.findSessions("a", 0L, 1000L)) { + assertFalse(results.hasNext()); + } + + try (final KeyValueIterator, Long> results = + sessionStore.findSessions("a", 1500L, 2500L)) { + assertTrue(results.hasNext()); + } } @Test @@ -156,11 +175,14 @@ public class RocksDBSessionStoreTest { sessionStore.put(session3, 3L); sessionStore.put(session4, 4L); sessionStore.put(session5, 5L); - final KeyValueIterator, Long> results = - sessionStore.findSessions("a", 150, 300); - assertEquals(session2, results.next().key); - assertEquals(session3, results.next().key); - assertFalse(results.hasNext()); + + try (final KeyValueIterator, Long> results = + sessionStore.findSessions("a", 150, 300) + ) { + assertEquals(session2, results.next().key); + assertEquals(session3, results.next().key); + assertFalse(results.hasNext()); + } } @Test @@ -180,37 +202,34 @@ public class RocksDBSessionStoreTest { sessionStore.init(context, sessionStore); sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); - sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 2L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L); sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L); sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L); sessionStore.put(new Windowed<>("a", new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L); - KeyValueIterator, Long> iterator = - sessionStore.findSessions("a", 0, Long.MAX_VALUE); - List results = new ArrayList<>(); - while (iterator.hasNext()) { - results.add(iterator.next().value); + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions("a", 0, Long.MAX_VALUE) + ) { + assertThat(valuesToList(iterator), equalTo(Arrays.asList(1L, 3L, 5L))); } - assertThat(results, equalTo(Arrays.asList(1L, 3L, 5L))); - - - iterator = sessionStore.findSessions("aa", 0, Long.MAX_VALUE); - results = new ArrayList<>(); - while (iterator.hasNext()) { - results.add(iterator.next().value); + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions("aa", 0, Long.MAX_VALUE) + ) { + assertThat(valuesToList(iterator), equalTo(Arrays.asList(2L, 4L))); } - assertThat(results, equalTo(Arrays.asList(2L, 4L))); - + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions("a", "aa", 0, Long.MAX_VALUE) + ) { + assertThat(valuesToList(iterator), equalTo(Arrays.asList(1L, 3L, 2L, 4L, 5L))); + } - final KeyValueIterator, Long> rangeIterator = - sessionStore.findSessions("a", "aa", 0, Long.MAX_VALUE); - final List rangeResults = new ArrayList<>(); - while (rangeIterator.hasNext()) { - rangeResults.add(rangeIterator.next().value); + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions("a", "aa", 10, 0) + ) { + assertThat(valuesToList(iterator), equalTo(Collections.singletonList(2L))); } - assertThat(rangeResults, equalTo(Arrays.asList(1L, 3L, 2L, 4L, 5L))); } @Test(expected = NullPointerException.class) @@ -261,4 +280,12 @@ public class RocksDBSessionStoreTest { return results; } + static List valuesToList(final KeyValueIterator, V> iterator) { + final List results = new ArrayList<>(); + while (iterator.hasNext()) { + results.add(iterator.next().value); + } + return results; + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java index 6b51687e0bd..e79bc57fb10 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java @@ -56,7 +56,6 @@ public class SessionKeySchemaTest { @Before public void before() { - sessionKeySchema.init("topic"); final List> keys = Arrays.asList(KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0)))), 1), KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0)))), 2), KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0)))), 3), @@ -144,7 +143,7 @@ public class SessionKeySchemaTest { final Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0); assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary( - new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0)))) + new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(0, Long.MAX_VALUE)))) )); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java index 8eaf0d0d34b..81120809a0d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.WindowedSerdes; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.KeyValueIteratorStub; -import org.junit.Before; import org.junit.Test; import static org.hamcrest.MatcherAssert.assertThat; @@ -55,11 +54,6 @@ public class WindowKeySchemaTest { final private Serde> keySerde = new WindowedSerdes.TimeWindowedSerde<>(serde); final private StateSerdes stateSerdes = new StateSerdes<>("dummy", serde, Serdes.ByteArray()); - @Before - public void before() { - windowKeySchema.init("topic"); - } - @Test public void testHasNextConditionUsingNullKeys() { final List> keys = Arrays.asList(