diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index 13f4a6eb409..09e1cb46cf1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Merger; @@ -185,16 +184,7 @@ public class KStreamSessionWindowAggregate implements KStreamAggProce @Override public Agg get(final Windowed key) { - try (final KeyValueIterator, Agg> iter = store.findSessions(key.key(), key.window().end(), key.window().end())) { - if (!iter.hasNext()) { - return null; - } - final Agg value = iter.next().value; - if (iter.hasNext()) { - throw new ProcessorStateException(String.format("Iterator for key [%s] on session store has more than one value", key)); - } - return value; - } + return store.fetchSession(key.key(), key.window().start(), key.window().end()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 9ecc73c14e4..4409a95cc24 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -386,6 +386,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re throw new UnsupportedOperationException(ERROR_MESSAGE); } + @Override + public AGG fetchSession(final K key, final long startTime, final long endTime) { + return getInner().fetchSession(key, startTime, endTime); + } + @Override public KeyValueIterator, AGG> fetch(final K key) { return getInner().fetch(key); @@ -564,6 +569,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re wrapped().put(sessionKey, aggregate); } + @Override + public AGG fetchSession(final K key, final long startTime, final long endTime) { + return wrapped().fetchSession(key, startTime, endTime); + } + @Override public KeyValueIterator, AGG> fetch(final K key) { return wrapped().fetch(key); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java index ce88b9f8d28..230d2576178 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.Windowed; * @param the aggregated value type */ public interface ReadOnlySessionStore { - /** * Retrieve all aggregated sessions for the provided key. * This iterator must be closed after use. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java index c1a699317eb..c8b180591c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java @@ -55,6 +55,17 @@ public interface SessionStore extends StateStore, ReadOnlySessionStore, AGG> findSessions(final K keyFrom, final K keyTo, long earliestSessionEndTime, final long latestSessionStartTime); + /** + * Get the value of key from a single session. + * + * @param key the key to fetch + * @param startTime start timestamp of the session + * @param endTime end timestamp of the session + * @return The value or {@code null} if no session associated with the key can be found + * @throws NullPointerException If {@code null} is used for any key. + */ + AGG fetchSession(K key, long startTime, long endTime); + /** * Remove the session aggregated with provided {@link Windowed} key from the store * @param sessionKey key of the session to remove diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index ccf824c9cd1..a07f1ce43b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -29,10 +28,8 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StateSerdes; -import java.util.List; import java.util.Objects; - class CachingSessionStore extends WrappedStateStore.AbstractStateStore implements SessionStore, CachedStateStore, AGG> { private final SessionStore bytesStore; @@ -78,12 +75,9 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i cacheName = context.taskId() + "-" + bytesStore.name(); cache = context.getCache(); - cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() { - @Override - public void apply(final List entries) { - for (final ThreadCache.DirtyEntry entry : entries) { - putAndMaybeForward(entry, context); - } + cache.addDirtyEntryFlushListener(cacheName, entries -> { + for (final ThreadCache.DirtyEntry entry : entries) { + putAndMaybeForward(entry, context); } }); } @@ -138,7 +132,7 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i @Override public void put(final Windowed key, final byte[] value) { validateStoreOpen(); - final Bytes binaryKey = Bytes.wrap(SessionKeySchema.toBinary(key)); + final Bytes binaryKey = SessionKeySchema.toBinary(key); final LRUCacheEntry entry = new LRUCacheEntry( value, @@ -151,6 +145,24 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry); } + @Override + public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + if (cache == null) { + return bytesStore.fetchSession(key, startTime, endTime); + } else { + final Bytes bytesKey = SessionKeySchema.toBinary(key, startTime, endTime); + final Bytes cacheKey = cacheFunction.cacheKey(bytesKey); + final LRUCacheEntry entry = cache.get(cacheName, cacheKey); + if (entry == null) { + return bytesStore.fetchSession(key, startTime, endTime); + } else { + return entry.value(); + } + } + } + @Override public KeyValueIterator, byte[]> fetch(final Bytes key) { Objects.requireNonNull(key, "key cannot be null"); @@ -173,7 +185,9 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key())); if (flushListener != null) { final AGG newValue = serdes.valueFrom(entry.newValue()); - final AGG oldValue = newValue == null || sendOldValues ? fetchPrevious(rawKey, key.window()) : null; + final AGG oldValue = newValue == null || sendOldValues ? + serdes.valueFrom(bytesStore.fetchSession(rawKey, key.window().start(), key.window().end())) : + null; if (!(newValue == null && oldValue == null)) { flushListener.apply( key, @@ -188,15 +202,6 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i } } - private AGG fetchPrevious(final Bytes rawKey, final Window window) { - try (final KeyValueIterator, byte[]> iterator = bytesStore.findSessions(rawKey, window.start(), window.end())) { - if (!iterator.hasNext()) { - return null; - } - return serdes.valueFrom(iterator.next().value); - } - } - public void flush() { cache.flush(cacheName); bytesStore.flush(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index 57401aece0a..3ddbedefe9c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -66,16 +66,21 @@ class ChangeLoggingSessionBytesStore extends WrappedStateStore.AbstractStateStor @Override public void remove(final Windowed sessionKey) { bytesStore.remove(sessionKey); - changeLogger.logChange(Bytes.wrap(SessionKeySchema.toBinary(sessionKey)), null); + changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), null); } @Override public void put(final Windowed sessionKey, final byte[] aggregate) { bytesStore.put(sessionKey, aggregate); - changeLogger.logChange(Bytes.wrap(SessionKeySchema.toBinary(sessionKey)), aggregate); + changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), aggregate); } + @Override + public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { + return bytesStore.fetchSession(key, startTime, endTime); + } + @Override public KeyValueIterator, byte[]> fetch(final Bytes key) { return findSessions(key, 0, Long.MAX_VALUE); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java index e4da4240710..63d551c1d08 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java @@ -42,7 +42,6 @@ public class CompositeReadOnlySessionStore implements ReadOnlySessionStore this.storeName = storeName; } - @Override public KeyValueIterator, V> fetch(final K key) { Objects.requireNonNull(key, "key can't be null"); @@ -58,7 +57,8 @@ public class CompositeReadOnlySessionStore implements ReadOnlySessionStore } catch (final InvalidStateStoreException ise) { throw new InvalidStateStoreException("State store [" + storeName + "] is not available anymore" + " and may have been migrated to another instance; " + - "please re-discover its location from the state metadata."); + "please re-discover its location from the state metadata. " + + "Original error message: " + ise.toString()); } } return KeyValueIterators.emptyIterator(); @@ -68,12 +68,7 @@ public class CompositeReadOnlySessionStore implements ReadOnlySessionStore public KeyValueIterator, V> fetch(final K from, final K to) { Objects.requireNonNull(from, "from can't be null"); Objects.requireNonNull(to, "to can't be null"); - final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = new NextIteratorFunction, V, ReadOnlySessionStore>() { - @Override - public KeyValueIterator, V> apply(final ReadOnlySessionStore store) { - return store.fetch(from, to); - } - }; + final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.fetch(from, to); return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>( storeProvider.stores(storeName, queryableStoreType).iterator(), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java index dd2b1932993..69942225310 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java @@ -63,7 +63,7 @@ class MergedSortedCacheSessionStoreIterator extends AbstractMergedSortedCacheSto @Override public int compare(final Bytes cacheKey, final Windowed storeKey) { - final Bytes storeKeyBytes = Bytes.wrap(SessionKeySchema.toBinary(storeKey)); + final Bytes storeKeyBytes = SessionKeySchema.toBinary(storeKey); return cacheFunction.compareSegmentedKeys(cacheKey, storeKeyBytes); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 31a039b644f..3bb7fca5d55 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -178,6 +178,21 @@ public class MeteredSessionStore extends WrappedStateStore.AbstractStateSt return Bytes.wrap(serdes.rawKey(key)); } + @Override + public V fetchSession(final K key, final long startTime, final long endTime) { + Objects.requireNonNull(key, "key cannot be null"); + final V value; + final Bytes bytesKey = keyBytes(key); + final long startNs = time.nanoseconds(); + try { + value = serdes.valueFrom(inner.fetchSession(bytesKey, startTime, endTime)); + } finally { + metrics.recordLatency(flushTime, startNs, time.nanoseconds()); + } + + return value; + } + @Override public KeyValueIterator, V> fetch(final K key) { Objects.requireNonNull(key, "key cannot be null"); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index 4e947375cb1..be423bc783f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -80,6 +80,11 @@ public class RocksDBSessionStore extends WrappedStateStore.AbstractState return new WrappedSessionStoreIterator<>(bytesIterator, serdes); } + @Override + public AGG fetchSession(final K key, final long startTime, final long endTime) { + return serdes.valueFrom(bytesStore.get(SessionKeySchema.toBinary(Bytes.wrap(serdes.rawKey(key)), startTime, endTime))); + } + @Override public KeyValueIterator, AGG> fetch(final K key) { return findSessions(key, 0, Long.MAX_VALUE); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index 6e998601266..94f08aecc18 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -36,13 +36,13 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema { @Override public Bytes upperRangeFixedSize(final Bytes key, final long to) { final Windowed sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE)); - return Bytes.wrap(SessionKeySchema.toBinary(sessionKey)); + return SessionKeySchema.toBinary(sessionKey); } @Override public Bytes lowerRangeFixedSize(final Bytes key, final long from) { final Windowed sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from))); - return Bytes.wrap(SessionKeySchema.toBinary(sessionKey)); + return SessionKeySchema.toBinary(sessionKey); } @Override @@ -136,19 +136,21 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema { final Serializer serializer, final String topic) { final byte[] bytes = serializer.serialize(topic, sessionKey.key()); - final ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE); - buf.put(bytes); - buf.putLong(sessionKey.window().end()); - buf.putLong(sessionKey.window().start()); - return buf.array(); + return toBinary(Bytes.wrap(bytes), sessionKey.window().start(), sessionKey.window().end()).get(); + } + + public static Bytes toBinary(final Windowed sessionKey) { + return toBinary(sessionKey.key(), sessionKey.window().start(), sessionKey.window().end()); } - public static byte[] toBinary(final Windowed sessionKey) { - final byte[] bytes = sessionKey.key().get(); + public static Bytes toBinary(final Bytes key, + final long startTime, + final long endTime) { + final byte[] bytes = key.get(); final ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE); buf.put(bytes); - buf.putLong(sessionKey.window().end()); - buf.putLong(sessionKey.window().start()); - return buf.array(); + buf.putLong(endTime); + buf.putLong(startTime); + return Bytes.wrap(buf.array()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index f57b94a4fb2..36426d4bd2a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -37,6 +37,7 @@ import org.junit.Test; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Random; @@ -224,31 +225,51 @@ public class CachingSessionStoreTest { @Test public void shouldForwardChangedValuesDuringFlush() { - final Windowed a = new Windowed<>(keyA, new SessionWindow(0, 0)); - final Windowed aDeserialized = new Windowed<>("a", new SessionWindow(0, 0)); + final Windowed a = new Windowed<>(keyA, new SessionWindow(2, 4)); + final Windowed b = new Windowed<>(keyA, new SessionWindow(1, 2)); + final Windowed aDeserialized = new Windowed<>("a", new SessionWindow(2, 4)); + final Windowed bDeserialized = new Windowed<>("a", new SessionWindow(1, 2)); final List, Change>> flushed = new ArrayList<>(); cachingStore.setFlushListener( (key, newValue, oldValue, timestamp) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))), true ); + cachingStore.put(b, "1".getBytes()); + cachingStore.flush(); + + assertEquals( + Collections.singletonList(KeyValue.pair(bDeserialized, new Change<>("1", null))), + flushed + ); + flushed.clear(); + cachingStore.put(a, "1".getBytes()); cachingStore.flush(); + assertEquals( + Collections.singletonList(KeyValue.pair(aDeserialized, new Change<>("1", null))), + flushed + ); + flushed.clear(); + cachingStore.put(a, "2".getBytes()); cachingStore.flush(); + assertEquals( + Collections.singletonList(KeyValue.pair(aDeserialized, new Change<>("2", "1"))), + flushed + ); + flushed.clear(); + cachingStore.remove(a); cachingStore.flush(); assertEquals( - flushed, - Arrays.asList( - KeyValue.pair(aDeserialized, new Change<>("1", null)), - KeyValue.pair(aDeserialized, new Change<>("2", "1")), - KeyValue.pair(aDeserialized, new Change<>(null, "2")) - ) + Collections.singletonList(KeyValue.pair(aDeserialized, new Change<>(null, "2"))), + flushed ); + flushed.clear(); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index e8b1e5520b6..6c1ab193b61 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ -93,7 +93,7 @@ public class ChangeLoggingSessionBytesStoreTest { store.put(key1, value1); - assertArrayEquals(value1, (byte[]) sent.get(Bytes.wrap(SessionKeySchema.toBinary(key1)))); + assertArrayEquals(value1, (byte[]) sent.get(SessionKeySchema.toBinary(key1))); EasyMock.verify(inner); } @@ -105,7 +105,7 @@ public class ChangeLoggingSessionBytesStoreTest { init(); store.remove(key1); - final Bytes binaryKey = Bytes.wrap(SessionKeySchema.toBinary(key1)); + final Bytes binaryKey = SessionKeySchema.toBinary(key1); assertTrue(sent.containsKey(binaryKey)); assertNull(sent.get(binaryKey)); EasyMock.verify(inner); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java index 2008d935933..617ff362b69 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java @@ -49,9 +49,7 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest { private final SessionWindow cacheWindow = new SessionWindow(10, 20); private final Iterator> cacheKvs = Collections.singleton( KeyValue.pair( - SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(Bytes.wrap( - SessionKeySchema.toBinary(new Windowed<>(cacheKey, cacheWindow)) - )), + SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(SessionKeySchema.toBinary(new Windowed<>(cacheKey, cacheWindow))), new LRUCacheEntry(cacheKey.get()) )).iterator(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java index e79bc57fb10..cd34fb0accf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java @@ -56,12 +56,12 @@ public class SessionKeySchemaTest { @Before public void before() { - final List> keys = Arrays.asList(KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0)))), 1), - KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0)))), 2), - KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0)))), 3), - KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20)))), 4), - KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20)))), 5), - KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20)))), 6)); + final List> keys = Arrays.asList(KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1), + KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0))), 2), + KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0))), 3), + KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20))), 4), + KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20))), 5), + KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20))), 6)); iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator())); } @@ -93,29 +93,26 @@ public class SessionKeySchemaTest { assertThat( "shorter key with max timestamp should be in range", - upper.compareTo(Bytes.wrap( - SessionKeySchema.toBinary( - new Windowed<>( - Bytes.wrap(new byte[]{0xA}), - new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) - ) + upper.compareTo(SessionKeySchema.toBinary( + new Windowed<>( + Bytes.wrap(new byte[]{0xA}), + new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) )) >= 0 ); assertThat( "shorter key with max timestamp should be in range", - upper.compareTo(Bytes.wrap( - SessionKeySchema.toBinary( - new Windowed<>( - Bytes.wrap(new byte[]{0xA, 0xB}), - new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) - ) + upper.compareTo(SessionKeySchema.toBinary( + new Windowed<>( + Bytes.wrap(new byte[]{0xA, 0xB}), + new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) + )) >= 0 ); - assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary( + assertThat(upper, equalTo(SessionKeySchema.toBinary( new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)))) - )); + ); } @Test @@ -124,33 +121,32 @@ public class SessionKeySchemaTest { assertThat( "shorter key with max timestamp should be in range", - upper.compareTo(Bytes.wrap( - SessionKeySchema.toBinary( - new Windowed<>( - Bytes.wrap(new byte[]{0xA, (byte) 0x8F}), - new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) + upper.compareTo(SessionKeySchema.toBinary( + new Windowed<>( + Bytes.wrap(new byte[]{0xA, (byte) 0x8F}), + new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) ) - )) >= 0 + ) >= 0 ); - assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary( + assertThat(upper, equalTo(SessionKeySchema.toBinary( new Windowed<>(Bytes.wrap(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)))) - )); + ); } @Test public void testUpperBoundWithZeroTimestamp() { final Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0); - assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary( + assertThat(upper, equalTo(SessionKeySchema.toBinary( new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(0, Long.MAX_VALUE)))) - )); + ); } @Test public void testLowerBoundWithZeroTimestamp() { final Bytes lower = sessionKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0); - assertThat(lower, equalTo(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0)))))); + assertThat(lower, equalTo(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))); } @Test @@ -159,16 +155,14 @@ public class SessionKeySchemaTest { assertThat( "appending zeros to key should still be in range", - lower.compareTo(Bytes.wrap( - SessionKeySchema.toBinary( - new Windowed<>( - Bytes.wrap(new byte[]{0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}), - new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) - ) + lower.compareTo(SessionKeySchema.toBinary( + new Windowed<>( + Bytes.wrap(new byte[]{0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}), + new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE)) )) < 0 ); - assertThat(lower, equalTo(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0)))))); + assertThat(lower, equalTo(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))); } @Test @@ -234,7 +228,7 @@ public class SessionKeySchemaTest { public void shouldExtractBytesKeyFromBinary() { final Bytes bytesKey = Bytes.wrap(key.getBytes()); final Windowed windowedBytesKey = new Windowed<>(bytesKey, window); - final Bytes serialized = Bytes.wrap(SessionKeySchema.toBinary(windowedBytesKey)); + final Bytes serialized = SessionKeySchema.toBinary(windowedBytesKey); assertEquals(windowedBytesKey, SessionKeySchema.from(serialized)); } diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index 3fa23d2f33d..39ff614751e 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -37,7 +37,7 @@ public class ReadOnlySessionStoreStub implements ReadOnlySessionStore sessionKey, final V value) { if (!sessions.containsKey(sessionKey.key())) { - sessions.put(sessionKey.key(), new ArrayList, V>>()); + sessions.put(sessionKey.key(), new ArrayList<>()); } sessions.get(sessionKey.key()).add(KeyValue.pair(sessionKey, value)); }