From 9d5893d3d5c548ffbc11b8aa880f5a280892e47a Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 30 Jan 2019 17:31:31 -0800 Subject: [PATCH] KAFKA-7652: Part II; Add single-point query for SessionStore and use for flushing / getter (#6161) #2972 tried to fix a bug about flushing operation, but it was not complete, since findSessions(key, earliestEnd, latestStart) does not guarantee to only return a single entry since its semantics are to return any sessions whose end > earliestEnd and whose start < latestStart. I've tried various ways to fix it completely and I ended up having to add a single-point query to the public ReadOnlySessionStore API for the exact needed semantics. It is used for flushing to read the old values (otherwise the wrong old values will be sent downstreams, hence it is a correctness issue) and also for getting the value for value-getters (it is for perf only). --- .../KStreamSessionWindowAggregate.java | 12 +--- .../internals/ProcessorContextImpl.java | 10 +++ .../streams/state/ReadOnlySessionStore.java | 1 - .../kafka/streams/state/SessionStore.java | 11 +++ .../state/internals/CachingSessionStore.java | 45 ++++++------ .../ChangeLoggingSessionBytesStore.java | 9 ++- .../CompositeReadOnlySessionStore.java | 11 +-- ...MergedSortedCacheSessionStoreIterator.java | 2 +- .../state/internals/MeteredSessionStore.java | 15 ++++ .../state/internals/RocksDBSessionStore.java | 5 ++ .../state/internals/SessionKeySchema.java | 26 +++---- .../internals/CachingSessionStoreTest.java | 37 +++++++--- .../ChangeLoggingSessionBytesStoreTest.java | 4 +- ...dCacheWrappedSessionStoreIteratorTest.java | 4 +- .../state/internals/SessionKeySchemaTest.java | 72 +++++++++---------- .../kafka/test/ReadOnlySessionStoreStub.java | 2 +- 16 files changed, 158 insertions(+), 108 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index 13f4a6eb409..09e1cb46cf1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Merger; @@ -185,16 +184,7 @@ public class KStreamSessionWindowAggregate implements KStreamAggProce @Override public Agg get(final Windowed key) { - try (final KeyValueIterator, Agg> iter = store.findSessions(key.key(), key.window().end(), key.window().end())) { - if (!iter.hasNext()) { - return null; - } - final Agg value = iter.next().value; - if (iter.hasNext()) { - throw new ProcessorStateException(String.format("Iterator for key [%s] on session store has more than one value", key)); - } - return value; - } + return store.fetchSession(key.key(), key.window().start(), key.window().end()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 9ecc73c14e4..4409a95cc24 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -386,6 +386,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re throw new UnsupportedOperationException(ERROR_MESSAGE); } + @Override + public AGG fetchSession(final K key, final long startTime, final long endTime) { + return getInner().fetchSession(key, startTime, endTime); + } + @Override public KeyValueIterator, AGG> fetch(final K key) { return getInner().fetch(key); @@ -564,6 +569,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re wrapped().put(sessionKey, aggregate); } + @Override + public AGG fetchSession(final K key, final long startTime, final long endTime) { + return wrapped().fetchSession(key, startTime, endTime); + } + @Override public KeyValueIterator, AGG> fetch(final K key) { return wrapped().fetch(key); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java index ce88b9f8d28..230d2576178 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.Windowed; * @param the aggregated value type */ public interface ReadOnlySessionStore { - /** * Retrieve all aggregated sessions for the provided key. * This iterator must be closed after use. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java index c1a699317eb..c8b180591c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java @@ -55,6 +55,17 @@ public interface SessionStore extends StateStore, ReadOnlySessionStore, AGG> findSessions(final K keyFrom, final K keyTo, long earliestSessionEndTime, final long latestSessionStartTime); + /** + * Get the value of key from a single session. + * + * @param key the key to fetch + * @param startTime start timestamp of the session + * @param endTime end timestamp of the session + * @return The value or {@code null} if no session associated with the key can be found + * @throws NullPointerException If {@code null} is used for any key. + */ + AGG fetchSession(K key, long startTime, long endTime); + /** * Remove the session aggregated with provided {@link Windowed} key from the store * @param sessionKey key of the session to remove diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index ccf824c9cd1..a07f1ce43b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -29,10 +28,8 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StateSerdes; -import java.util.List; import java.util.Objects; - class CachingSessionStore extends WrappedStateStore.AbstractStateStore implements SessionStore, CachedStateStore, AGG> { private final SessionStore bytesStore; @@ -78,12 +75,9 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i cacheName = context.taskId() + "-" + bytesStore.name(); cache = context.getCache(); - cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() { - @Override - public void apply(final List entries) { - for (final ThreadCache.DirtyEntry entry : entries) { - putAndMaybeForward(entry, context); - } + cache.addDirtyEntryFlushListener(cacheName, entries -> { + for (final ThreadCache.DirtyEntry entry : entries) { + putAndMaybeForward(entry, context); } }); } @@ -138,7 +132,7 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i @Override public void put(final Windowed key, final byte[] value) { validateStoreOpen(); - final Bytes binaryKey = Bytes.wrap(SessionKeySchema.toBinary(key)); + final Bytes binaryKey = SessionKeySchema.toBinary(key); final LRUCacheEntry entry = new LRUCacheEntry( value, @@ -151,6 +145,24 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry); } + @Override + public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + if (cache == null) { + return bytesStore.fetchSession(key, startTime, endTime); + } else { + final Bytes bytesKey = SessionKeySchema.toBinary(key, startTime, endTime); + final Bytes cacheKey = cacheFunction.cacheKey(bytesKey); + final LRUCacheEntry entry = cache.get(cacheName, cacheKey); + if (entry == null) { + return bytesStore.fetchSession(key, startTime, endTime); + } else { + return entry.value(); + } + } + } + @Override public KeyValueIterator, byte[]> fetch(final Bytes key) { Objects.requireNonNull(key, "key cannot be null"); @@ -173,7 +185,9 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key())); if (flushListener != null) { final AGG newValue = serdes.valueFrom(entry.newValue()); - final AGG oldValue = newValue == null || sendOldValues ? fetchPrevious(rawKey, key.window()) : null; + final AGG oldValue = newValue == null || sendOldValues ? + serdes.valueFrom(bytesStore.fetchSession(rawKey, key.window().start(), key.window().end())) : + null; if (!(newValue == null && oldValue == null)) { flushListener.apply( key, @@ -188,15 +202,6 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i } } - private AGG fetchPrevious(final Bytes rawKey, final Window window) { - try (final KeyValueIterator, byte[]> iterator = bytesStore.findSessions(rawKey, window.start(), window.end())) { - if (!iterator.hasNext()) { - return null; - } - return serdes.valueFrom(iterator.next().value); - } - } - public void flush() { cache.flush(cacheName); bytesStore.flush(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index 57401aece0a..3ddbedefe9c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -66,16 +66,21 @@ class ChangeLoggingSessionBytesStore extends WrappedStateStore.AbstractStateStor @Override public void remove(final Windowed sessionKey) { bytesStore.remove(sessionKey); - changeLogger.logChange(Bytes.wrap(SessionKeySchema.toBinary(sessionKey)), null); + changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), null); } @Override public void put(final Windowed sessionKey, final byte[] aggregate) { bytesStore.put(sessionKey, aggregate); - changeLogger.logChange(Bytes.wrap(SessionKeySchema.toBinary(sessionKey)), aggregate); + changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), aggregate); } + @Override + public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { + return bytesStore.fetchSession(key, startTime, endTime); + } + @Override public KeyValueIterator, byte[]> fetch(final Bytes key) { return findSessions(key, 0, Long.MAX_VALUE); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java index e4da4240710..63d551c1d08 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java @@ -42,7 +42,6 @@ public class CompositeReadOnlySessionStore implements ReadOnlySessionStore this.storeName = storeName; } - @Override public KeyValueIterator, V> fetch(final K key) { Objects.requireNonNull(key, "key can't be null"); @@ -58,7 +57,8 @@ public class CompositeReadOnlySessionStore implements ReadOnlySessionStore } catch (final InvalidStateStoreException ise) { throw new InvalidStateStoreException("State store [" + storeName + "] is not available anymore" + " and may have been migrated to another instance; " + - "please re-discover its location from the state metadata."); + "please re-discover its location from the state metadata. " + + "Original error message: " + ise.toString()); } } return KeyValueIterators.emptyIterator(); @@ -68,12 +68,7 @@ public class CompositeReadOnlySessionStore implements ReadOnlySessionStore public KeyValueIterator, V> fetch(final K from, final K to) { Objects.requireNonNull(from, "from can't be null"); Objects.requireNonNull(to, "to can't be null"); - final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = new NextIteratorFunction, V, ReadOnlySessionStore>() { - @Override - public KeyValueIterator, V> apply(final ReadOnlySessionStore store) { - return store.fetch(from, to); - } - }; + final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.fetch(from, to); return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>( storeProvider.stores(storeName, queryableStoreType).iterator(), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java index dd2b1932993..69942225310 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java @@ -63,7 +63,7 @@ class MergedSortedCacheSessionStoreIterator extends AbstractMergedSortedCacheSto @Override public int compare(final Bytes cacheKey, final Windowed storeKey) { - final Bytes storeKeyBytes = Bytes.wrap(SessionKeySchema.toBinary(storeKey)); + final Bytes storeKeyBytes = SessionKeySchema.toBinary(storeKey); return cacheFunction.compareSegmentedKeys(cacheKey, storeKeyBytes); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 31a039b644f..3bb7fca5d55 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -178,6 +178,21 @@ public class MeteredSessionStore extends WrappedStateStore.AbstractStateSt return Bytes.wrap(serdes.rawKey(key)); } + @Override + public V fetchSession(final K key, final long startTime, final long endTime) { + Objects.requireNonNull(key, "key cannot be null"); + final V value; + final Bytes bytesKey = keyBytes(key); + final long startNs = time.nanoseconds(); + try { + value = serdes.valueFrom(inner.fetchSession(bytesKey, startTime, endTime)); + } finally { + metrics.recordLatency(flushTime, startNs, time.nanoseconds()); + } + + return value; + } + @Override public KeyValueIterator, V> fetch(final K key) { Objects.requireNonNull(key, "key cannot be null"); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index 4e947375cb1..be423bc783f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -80,6 +80,11 @@ public class RocksDBSessionStore extends WrappedStateStore.AbstractState return new WrappedSessionStoreIterator<>(bytesIterator, serdes); } + @Override + public AGG fetchSession(final K key, final long startTime, final long endTime) { + return serdes.valueFrom(bytesStore.get(SessionKeySchema.toBinary(Bytes.wrap(serdes.rawKey(key)), startTime, endTime))); + } + @Override public KeyValueIterator, AGG> fetch(final K key) { return findSessions(key, 0, Long.MAX_VALUE); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index 6e998601266..94f08aecc18 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -36,13 +36,13 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema { @Override public Bytes upperRangeFixedSize(final Bytes key, final long to) { final Windowed sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE)); - return Bytes.wrap(SessionKeySchema.toBinary(sessionKey)); + return SessionKeySchema.toBinary(sessionKey); } @Override public Bytes lowerRangeFixedSize(final Bytes key, final long from) { final Windowed sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from))); - return Bytes.wrap(SessionKeySchema.toBinary(sessionKey)); + return SessionKeySchema.toBinary(sessionKey); } @Override @@ -136,19 +136,21 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema { final Serializer serializer, final String topic) { final byte[] bytes = serializer.serialize(topic, sessionKey.key()); - final ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE); - buf.put(bytes); - buf.putLong(sessionKey.window().end()); - buf.putLong(sessionKey.window().start()); - return buf.array(); + return toBinary(Bytes.wrap(bytes), sessionKey.window().start(), sessionKey.window().end()).get(); + } + + public static Bytes toBinary(final Windowed sessionKey) { + return toBinary(sessionKey.key(), sessionKey.window().start(), sessionKey.window().end()); } - public static byte[] toBinary(final Windowed sessionKey) { - final byte[] bytes = sessionKey.key().get(); + public static Bytes toBinary(final Bytes key, + final long startTime, + final long endTime) { + final byte[] bytes = key.get(); final ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE); buf.put(bytes); - buf.putLong(sessionKey.window().end()); - buf.putLong(sessionKey.window().start()); - return buf.array(); + buf.putLong(endTime); + buf.putLong(startTime); + return Bytes.wrap(buf.array()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index f57b94a4fb2..36426d4bd2a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -37,6 +37,7 @@ import org.junit.Test; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Random; @@ -224,31 +225,51 @@ public class CachingSessionStoreTest { @Test public void shouldForwardChangedValuesDuringFlush() { - final Windowed a = new Windowed<>(keyA, new SessionWindow(0, 0)); - final Windowed aDeserialized = new Windowed<>("a", new SessionWindow(0, 0)); + final Windowed a = new Windowed<>(keyA, new SessionWindow(2, 4)); + final Windowed b = new Windowed<>(keyA, new SessionWindow(1, 2)); + final Windowed aDeserialized = new Windowed<>("a", new SessionWindow(2, 4)); + final Windowed bDeserialized = new Windowed<>("a", new SessionWindow(1, 2)); final List, Change>> flushed = new ArrayList<>(); cachingStore.setFlushListener( (key, newValue, oldValue, timestamp) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))), true ); + cachingStore.put(b, "1".getBytes()); + cachingStore.flush(); + + assertEquals( + Collections.singletonList(KeyValue.pair(bDeserialized, new Change<>("1", null))), + flushed + ); + flushed.clear(); + cachingStore.put(a, "1".getBytes()); cachingStore.flush(); + assertEquals( + Collections.singletonList(KeyValue.pair(aDeserialized, new Change<>("1", null))), + flushed + ); + flushed.clear(); + cachingStore.put(a, "2".getBytes()); cachingStore.flush(); + assertEquals( + Collections.singletonList(KeyValue.pair(aDeserialized, new Change<>("2", "1"))), + flushed + ); + flushed.clear(); + cachingStore.remove(a); cachingStore.flush(); assertEquals( - flushed, - Arrays.asList( - KeyValue.pair(aDeserialized, new Change<>("1", null)), - KeyValue.pair(aDeserialized, new Change<>("2", "1")), - KeyValue.pair(aDeserialized, new Change<>(null, "2")) - ) + Collections.singletonList(KeyValue.pair(aDeserialized, new Change<>(null, "2"))), + flushed ); + flushed.clear(); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index e8b1e5520b6..6c1ab193b61 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ -93,7 +93,7 @@ public class ChangeLoggingSessionBytesStoreTest { store.put(key1, value1); - assertArrayEquals(value1, (byte[]) sent.get(Bytes.wrap(SessionKeySchema.toBinary(key1)))); + assertArrayEquals(value1, (byte[]) sent.get(SessionKeySchema.toBinary(key1))); EasyMock.verify(inner); } @@ -105,7 +105,7 @@ public class ChangeLoggingSessionBytesStoreTest { init(); store.remove(key1); - final Bytes binaryKey = Bytes.wrap(SessionKeySchema.toBinary(key1)); + final Bytes binaryKey = SessionKeySchema.toBinary(key1); assertTrue(sent.containsKey(binaryKey)); assertNull(sent.get(binaryKey)); EasyMock.verify(inner); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java index 2008d935933..617ff362b69 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java @@ -49,9 +49,7 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest { private final SessionWindow cacheWindow = new SessionWindow(10, 20); private final Iterator> cacheKvs = Collections.singleton( KeyValue.pair( - SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(Bytes.wrap( - SessionKeySchema.toBinary(new Windowed<>(cacheKey, cacheWindow)) - )), + SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(SessionKeySchema.toBinary(new Windowed<>(cacheKey, cacheWindow))), new LRUCacheEntry(cacheKey.get()) )).iterator(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java index e79bc57fb10..cd34fb0accf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java @@ -56,12 +56,12 @@ public class SessionKeySchemaTest { @Before public void before() { - final List> keys = Arrays.asList(KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0)))), 1), - KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0)))), 2), - KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0)))), 3), - KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20)))), 4), - KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20)))), 5), - KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20)))), 6)); + final List> keys = Arrays.asList(KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1), + KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0))), 2), + KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0))), 3), + KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20))), 4), + KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20))), 5), + KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20))), 6)); iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator())); } @@ -93,29 +93,26 @@ public class SessionKeySchemaTest { assertThat( "shorter key with max timestamp should be in range", - upper.compareTo(Bytes.wrap( - SessionKeySchema.toBinary( - new Windowed<>( - Bytes.wrap(new byte[]{0xA}), - new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) - ) + upper.compareTo(SessionKeySchema.toBinary( + new Windowed<>( + Bytes.wrap(new byte[]{0xA}), + new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) )) >= 0 ); assertThat( "shorter key with max timestamp should be in range", - upper.compareTo(Bytes.wrap( - SessionKeySchema.toBinary( - new Windowed<>( - Bytes.wrap(new byte[]{0xA, 0xB}), - new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) - ) + upper.compareTo(SessionKeySchema.toBinary( + new Windowed<>( + Bytes.wrap(new byte[]{0xA, 0xB}), + new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) + )) >= 0 ); - assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary( + assertThat(upper, equalTo(SessionKeySchema.toBinary( new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)))) - )); + ); } @Test @@ -124,33 +121,32 @@ public class SessionKeySchemaTest { assertThat( "shorter key with max timestamp should be in range", - upper.compareTo(Bytes.wrap( - SessionKeySchema.toBinary( - new Windowed<>( - Bytes.wrap(new byte[]{0xA, (byte) 0x8F}), - new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) + upper.compareTo(SessionKeySchema.toBinary( + new Windowed<>( + Bytes.wrap(new byte[]{0xA, (byte) 0x8F}), + new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) ) - )) >= 0 + ) >= 0 ); - assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary( + assertThat(upper, equalTo(SessionKeySchema.toBinary( new Windowed<>(Bytes.wrap(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)))) - )); + ); } @Test public void testUpperBoundWithZeroTimestamp() { final Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0); - assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary( + assertThat(upper, equalTo(SessionKeySchema.toBinary( new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(0, Long.MAX_VALUE)))) - )); + ); } @Test public void testLowerBoundWithZeroTimestamp() { final Bytes lower = sessionKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0); - assertThat(lower, equalTo(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0)))))); + assertThat(lower, equalTo(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))); } @Test @@ -159,16 +155,14 @@ public class SessionKeySchemaTest { assertThat( "appending zeros to key should still be in range", - lower.compareTo(Bytes.wrap( - SessionKeySchema.toBinary( - new Windowed<>( - Bytes.wrap(new byte[]{0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}), - new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) - ) + lower.compareTo(SessionKeySchema.toBinary( + new Windowed<>( + Bytes.wrap(new byte[]{0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}), + new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) )) < 0 ); - assertThat(lower, equalTo(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0)))))); + assertThat(lower, equalTo(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))); } @Test @@ -234,7 +228,7 @@ public class SessionKeySchemaTest { public void shouldExtractBytesKeyFromBinary() { final Bytes bytesKey = Bytes.wrap(key.getBytes()); final Windowed windowedBytesKey = new Windowed<>(bytesKey, window); - final Bytes serialized = Bytes.wrap(SessionKeySchema.toBinary(windowedBytesKey)); + final Bytes serialized = SessionKeySchema.toBinary(windowedBytesKey); assertEquals(windowedBytesKey, SessionKeySchema.from(serialized)); } diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index 3fa23d2f33d..39ff614751e 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -37,7 +37,7 @@ public class ReadOnlySessionStoreStub implements ReadOnlySessionStore sessionKey, final V value) { if (!sessions.containsKey(sessionKey.key())) { - sessions.put(sessionKey.key(), new ArrayList, V>>()); + sessions.put(sessionKey.key(), new ArrayList<>()); } sessions.get(sessionKey.key()).add(KeyValue.pair(sessionKey, value)); }