Browse Source

KAFKA-7652: Part II; Add single-point query for SessionStore and use for flushing / getter (#6161)

#2972 tried to fix a bug about flushing operation, but it was not complete, since findSessions(key, earliestEnd, latestStart) does not guarantee to only return a single entry since its semantics are to return any sessions whose end > earliestEnd and whose start < latestStart.

I've tried various ways to fix it completely and I ended up having to add a single-point query to the public ReadOnlySessionStore API for the exact needed semantics. It is used for flushing to read the old values (otherwise the wrong old values will be sent downstreams, hence it is a correctness issue) and also for getting the value for value-getters (it is for perf only).
pull/6135/merge
Guozhang Wang 6 years ago committed by GitHub
parent
commit
9d5893d3d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
  2. 10
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
  3. 1
      streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
  4. 11
      streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
  5. 45
      streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
  6. 9
      streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
  7. 11
      streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
  8. 2
      streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
  9. 15
      streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
  10. 5
      streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
  11. 26
      streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
  12. 37
      streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
  13. 4
      streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
  14. 4
      streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
  15. 72
      streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
  16. 2
      streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java

12
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java

@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals; @@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Merger;
@ -185,16 +184,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce @@ -185,16 +184,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
@Override
public Agg get(final Windowed<K> key) {
try (final KeyValueIterator<Windowed<K>, Agg> iter = store.findSessions(key.key(), key.window().end(), key.window().end())) {
if (!iter.hasNext()) {
return null;
}
final Agg value = iter.next().value;
if (iter.hasNext()) {
throw new ProcessorStateException(String.format("Iterator for key [%s] on session store has more than one value", key));
}
return value;
}
return store.fetchSession(key.key(), key.window().start(), key.window().end());
}
@Override

10
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java

@ -386,6 +386,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re @@ -386,6 +386,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public AGG fetchSession(final K key, final long startTime, final long endTime) {
return getInner().fetchSession(key, startTime, endTime);
}
@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
return getInner().fetch(key);
@ -564,6 +569,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re @@ -564,6 +569,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
wrapped().put(sessionKey, aggregate);
}
@Override
public AGG fetchSession(final K key, final long startTime, final long endTime) {
return wrapped().fetchSession(key, startTime, endTime);
}
@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
return wrapped().fetch(key);

1
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java

@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.Windowed; @@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.Windowed;
* @param <AGG> the aggregated value type
*/
public interface ReadOnlySessionStore<K, AGG> {
/**
* Retrieve all aggregated sessions for the provided key.
* This iterator must be closed after use.

11
streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java

@ -55,6 +55,17 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K @@ -55,6 +55,17 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
*/
KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, long earliestSessionEndTime, final long latestSessionStartTime);
/**
* Get the value of key from a single session.
*
* @param key the key to fetch
* @param startTime start timestamp of the session
* @param endTime end timestamp of the session
* @return The value or {@code null} if no session associated with the key can be found
* @throws NullPointerException If {@code null} is used for any key.
*/
AGG fetchSession(K key, long startTime, long endTime);
/**
* Remove the session aggregated with provided {@link Windowed} key from the store
* @param sessionKey key of the session to remove

45
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java

@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals; @@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@ -29,10 +28,8 @@ import org.apache.kafka.streams.state.KeyValueIterator; @@ -29,10 +28,8 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
import java.util.List;
import java.util.Objects;
class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, AGG> {
private final SessionStore<Bytes, byte[]> bytesStore;
@ -78,12 +75,9 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @@ -78,12 +75,9 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
cacheName = context.taskId() + "-" + bytesStore.name();
cache = context.getCache();
cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> entries) {
for (final ThreadCache.DirtyEntry entry : entries) {
putAndMaybeForward(entry, context);
}
cache.addDirtyEntryFlushListener(cacheName, entries -> {
for (final ThreadCache.DirtyEntry entry : entries) {
putAndMaybeForward(entry, context);
}
});
}
@ -138,7 +132,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @@ -138,7 +132,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
@Override
public void put(final Windowed<Bytes> key, final byte[] value) {
validateStoreOpen();
final Bytes binaryKey = Bytes.wrap(SessionKeySchema.toBinary(key));
final Bytes binaryKey = SessionKeySchema.toBinary(key);
final LRUCacheEntry entry =
new LRUCacheEntry(
value,
@ -151,6 +145,24 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @@ -151,6 +145,24 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry);
}
@Override
public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
if (cache == null) {
return bytesStore.fetchSession(key, startTime, endTime);
} else {
final Bytes bytesKey = SessionKeySchema.toBinary(key, startTime, endTime);
final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
final LRUCacheEntry entry = cache.get(cacheName, cacheKey);
if (entry == null) {
return bytesStore.fetchSession(key, startTime, endTime);
} else {
return entry.value();
}
}
}
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
Objects.requireNonNull(key, "key cannot be null");
@ -173,7 +185,9 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @@ -173,7 +185,9 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
if (flushListener != null) {
final AGG newValue = serdes.valueFrom(entry.newValue());
final AGG oldValue = newValue == null || sendOldValues ? fetchPrevious(rawKey, key.window()) : null;
final AGG oldValue = newValue == null || sendOldValues ?
serdes.valueFrom(bytesStore.fetchSession(rawKey, key.window().start(), key.window().end())) :
null;
if (!(newValue == null && oldValue == null)) {
flushListener.apply(
key,
@ -188,15 +202,6 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @@ -188,15 +202,6 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
}
}
private AGG fetchPrevious(final Bytes rawKey, final Window window) {
try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore.findSessions(rawKey, window.start(), window.end())) {
if (!iterator.hasNext()) {
return null;
}
return serdes.valueFrom(iterator.next().value);
}
}
public void flush() {
cache.flush(cacheName);
bytesStore.flush();

9
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java

@ -66,16 +66,21 @@ class ChangeLoggingSessionBytesStore extends WrappedStateStore.AbstractStateStor @@ -66,16 +66,21 @@ class ChangeLoggingSessionBytesStore extends WrappedStateStore.AbstractStateStor
@Override
public void remove(final Windowed<Bytes> sessionKey) {
bytesStore.remove(sessionKey);
changeLogger.logChange(Bytes.wrap(SessionKeySchema.toBinary(sessionKey)), null);
changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), null);
}
@Override
public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
bytesStore.put(sessionKey, aggregate);
changeLogger.logChange(Bytes.wrap(SessionKeySchema.toBinary(sessionKey)), aggregate);
changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), aggregate);
}
@Override
public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) {
return bytesStore.fetchSession(key, startTime, endTime);
}
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
return findSessions(key, 0, Long.MAX_VALUE);

11
streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java

@ -42,7 +42,6 @@ public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore @@ -42,7 +42,6 @@ public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore
this.storeName = storeName;
}
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
Objects.requireNonNull(key, "key can't be null");
@ -58,7 +57,8 @@ public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore @@ -58,7 +57,8 @@ public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore
} catch (final InvalidStateStoreException ise) {
throw new InvalidStateStoreException("State store [" + storeName + "] is not available anymore" +
" and may have been migrated to another instance; " +
"please re-discover its location from the state metadata.");
"please re-discover its location from the state metadata. " +
"Original error message: " + ise.toString());
}
}
return KeyValueIterators.emptyIterator();
@ -68,12 +68,7 @@ public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore @@ -68,12 +68,7 @@ public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to) {
Objects.requireNonNull(from, "from can't be null");
Objects.requireNonNull(to, "to can't be null");
final NextIteratorFunction<Windowed<K>, V, ReadOnlySessionStore<K, V>> nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlySessionStore<K, V>>() {
@Override
public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlySessionStore<K, V> store) {
return store.fetch(from, to);
}
};
final NextIteratorFunction<Windowed<K>, V, ReadOnlySessionStore<K, V>> nextIteratorFunction = store -> store.fetch(from, to);
return new DelegatingPeekingKeyValueIterator<>(storeName,
new CompositeKeyValueIterator<>(
storeProvider.stores(storeName, queryableStoreType).iterator(),

2
streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java

@ -63,7 +63,7 @@ class MergedSortedCacheSessionStoreIterator extends AbstractMergedSortedCacheSto @@ -63,7 +63,7 @@ class MergedSortedCacheSessionStoreIterator extends AbstractMergedSortedCacheSto
@Override
public int compare(final Bytes cacheKey, final Windowed<Bytes> storeKey) {
final Bytes storeKeyBytes = Bytes.wrap(SessionKeySchema.toBinary(storeKey));
final Bytes storeKeyBytes = SessionKeySchema.toBinary(storeKey);
return cacheFunction.compareSegmentedKeys(cacheKey, storeKeyBytes);
}
}

15
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java

@ -178,6 +178,21 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt @@ -178,6 +178,21 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
return Bytes.wrap(serdes.rawKey(key));
}
@Override
public V fetchSession(final K key, final long startTime, final long endTime) {
Objects.requireNonNull(key, "key cannot be null");
final V value;
final Bytes bytesKey = keyBytes(key);
final long startNs = time.nanoseconds();
try {
value = serdes.valueFrom(inner.fetchSession(bytesKey, startTime, endTime));
} finally {
metrics.recordLatency(flushTime, startNs, time.nanoseconds());
}
return value;
}
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
Objects.requireNonNull(key, "key cannot be null");

5
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java

@ -80,6 +80,11 @@ public class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractState @@ -80,6 +80,11 @@ public class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractState
return new WrappedSessionStoreIterator<>(bytesIterator, serdes);
}
@Override
public AGG fetchSession(final K key, final long startTime, final long endTime) {
return serdes.valueFrom(bytesStore.get(SessionKeySchema.toBinary(Bytes.wrap(serdes.rawKey(key)), startTime, endTime)));
}
@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
return findSessions(key, 0, Long.MAX_VALUE);

26
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java

@ -36,13 +36,13 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema { @@ -36,13 +36,13 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema {
@Override
public Bytes upperRangeFixedSize(final Bytes key, final long to) {
final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE));
return Bytes.wrap(SessionKeySchema.toBinary(sessionKey));
return SessionKeySchema.toBinary(sessionKey);
}
@Override
public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from)));
return Bytes.wrap(SessionKeySchema.toBinary(sessionKey));
return SessionKeySchema.toBinary(sessionKey);
}
@Override
@ -136,19 +136,21 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema { @@ -136,19 +136,21 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema {
final Serializer<K> serializer,
final String topic) {
final byte[] bytes = serializer.serialize(topic, sessionKey.key());
final ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
buf.put(bytes);
buf.putLong(sessionKey.window().end());
buf.putLong(sessionKey.window().start());
return buf.array();
return toBinary(Bytes.wrap(bytes), sessionKey.window().start(), sessionKey.window().end()).get();
}
public static Bytes toBinary(final Windowed<Bytes> sessionKey) {
return toBinary(sessionKey.key(), sessionKey.window().start(), sessionKey.window().end());
}
public static byte[] toBinary(final Windowed<Bytes> sessionKey) {
final byte[] bytes = sessionKey.key().get();
public static Bytes toBinary(final Bytes key,
final long startTime,
final long endTime) {
final byte[] bytes = key.get();
final ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
buf.put(bytes);
buf.putLong(sessionKey.window().end());
buf.putLong(sessionKey.window().start());
return buf.array();
buf.putLong(endTime);
buf.putLong(startTime);
return Bytes.wrap(buf.array());
}
}

37
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java

@ -37,6 +37,7 @@ import org.junit.Test; @@ -37,6 +37,7 @@ import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
@ -224,31 +225,51 @@ public class CachingSessionStoreTest { @@ -224,31 +225,51 @@ public class CachingSessionStoreTest {
@Test
public void shouldForwardChangedValuesDuringFlush() {
final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(2, 4));
final Windowed<Bytes> b = new Windowed<>(keyA, new SessionWindow(1, 2));
final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(2, 4));
final Windowed<String> bDeserialized = new Windowed<>("a", new SessionWindow(1, 2));
final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
cachingStore.setFlushListener(
(key, newValue, oldValue, timestamp) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
true
);
cachingStore.put(b, "1".getBytes());
cachingStore.flush();
assertEquals(
Collections.singletonList(KeyValue.pair(bDeserialized, new Change<>("1", null))),
flushed
);
flushed.clear();
cachingStore.put(a, "1".getBytes());
cachingStore.flush();
assertEquals(
Collections.singletonList(KeyValue.pair(aDeserialized, new Change<>("1", null))),
flushed
);
flushed.clear();
cachingStore.put(a, "2".getBytes());
cachingStore.flush();
assertEquals(
Collections.singletonList(KeyValue.pair(aDeserialized, new Change<>("2", "1"))),
flushed
);
flushed.clear();
cachingStore.remove(a);
cachingStore.flush();
assertEquals(
flushed,
Arrays.asList(
KeyValue.pair(aDeserialized, new Change<>("1", null)),
KeyValue.pair(aDeserialized, new Change<>("2", "1")),
KeyValue.pair(aDeserialized, new Change<>(null, "2"))
)
Collections.singletonList(KeyValue.pair(aDeserialized, new Change<>(null, "2"))),
flushed
);
flushed.clear();
}
@Test

4
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java

@ -93,7 +93,7 @@ public class ChangeLoggingSessionBytesStoreTest { @@ -93,7 +93,7 @@ public class ChangeLoggingSessionBytesStoreTest {
store.put(key1, value1);
assertArrayEquals(value1, (byte[]) sent.get(Bytes.wrap(SessionKeySchema.toBinary(key1))));
assertArrayEquals(value1, (byte[]) sent.get(SessionKeySchema.toBinary(key1)));
EasyMock.verify(inner);
}
@ -105,7 +105,7 @@ public class ChangeLoggingSessionBytesStoreTest { @@ -105,7 +105,7 @@ public class ChangeLoggingSessionBytesStoreTest {
init();
store.remove(key1);
final Bytes binaryKey = Bytes.wrap(SessionKeySchema.toBinary(key1));
final Bytes binaryKey = SessionKeySchema.toBinary(key1);
assertTrue(sent.containsKey(binaryKey));
assertNull(sent.get(binaryKey));
EasyMock.verify(inner);

4
streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java

@ -49,9 +49,7 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest { @@ -49,9 +49,7 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest {
private final SessionWindow cacheWindow = new SessionWindow(10, 20);
private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(
KeyValue.pair(
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(Bytes.wrap(
SessionKeySchema.toBinary(new Windowed<>(cacheKey, cacheWindow))
)),
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(SessionKeySchema.toBinary(new Windowed<>(cacheKey, cacheWindow))),
new LRUCacheEntry(cacheKey.get())
)).iterator();

72
streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java

@ -56,12 +56,12 @@ public class SessionKeySchemaTest { @@ -56,12 +56,12 @@ public class SessionKeySchemaTest {
@Before
public void before() {
final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0)))), 1),
KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0)))), 2),
KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0)))), 3),
KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20)))), 4),
KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20)))), 5),
KeyValue.pair(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20)))), 6));
final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1),
KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0))), 2),
KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0))), 3),
KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20))), 4),
KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20))), 5),
KeyValue.pair(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20))), 6));
iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator()));
}
@ -93,29 +93,26 @@ public class SessionKeySchemaTest { @@ -93,29 +93,26 @@ public class SessionKeySchemaTest {
assertThat(
"shorter key with max timestamp should be in range",
upper.compareTo(Bytes.wrap(
SessionKeySchema.toBinary(
new Windowed<>(
Bytes.wrap(new byte[]{0xA}),
new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
)
upper.compareTo(SessionKeySchema.toBinary(
new Windowed<>(
Bytes.wrap(new byte[]{0xA}),
new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
)) >= 0
);
assertThat(
"shorter key with max timestamp should be in range",
upper.compareTo(Bytes.wrap(
SessionKeySchema.toBinary(
new Windowed<>(
Bytes.wrap(new byte[]{0xA, 0xB}),
new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
)
upper.compareTo(SessionKeySchema.toBinary(
new Windowed<>(
Bytes.wrap(new byte[]{0xA, 0xB}),
new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
)) >= 0
);
assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary(
assertThat(upper, equalTo(SessionKeySchema.toBinary(
new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))))
));
);
}
@Test
@ -124,33 +121,32 @@ public class SessionKeySchemaTest { @@ -124,33 +121,32 @@ public class SessionKeySchemaTest {
assertThat(
"shorter key with max timestamp should be in range",
upper.compareTo(Bytes.wrap(
SessionKeySchema.toBinary(
new Windowed<>(
Bytes.wrap(new byte[]{0xA, (byte) 0x8F}),
new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
upper.compareTo(SessionKeySchema.toBinary(
new Windowed<>(
Bytes.wrap(new byte[]{0xA, (byte) 0x8F}),
new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
)
)) >= 0
) >= 0
);
assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary(
assertThat(upper, equalTo(SessionKeySchema.toBinary(
new Windowed<>(Bytes.wrap(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))))
));
);
}
@Test
public void testUpperBoundWithZeroTimestamp() {
final Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0);
assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary(
assertThat(upper, equalTo(SessionKeySchema.toBinary(
new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(0, Long.MAX_VALUE))))
));
);
}
@Test
public void testLowerBoundWithZeroTimestamp() {
final Bytes lower = sessionKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0);
assertThat(lower, equalTo(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))));
assertThat(lower, equalTo(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0)))));
}
@Test
@ -159,16 +155,14 @@ public class SessionKeySchemaTest { @@ -159,16 +155,14 @@ public class SessionKeySchemaTest {
assertThat(
"appending zeros to key should still be in range",
lower.compareTo(Bytes.wrap(
SessionKeySchema.toBinary(
new Windowed<>(
Bytes.wrap(new byte[]{0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}),
new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
)
lower.compareTo(SessionKeySchema.toBinary(
new Windowed<>(
Bytes.wrap(new byte[]{0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}),
new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
)) < 0
);
assertThat(lower, equalTo(Bytes.wrap(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))));
assertThat(lower, equalTo(SessionKeySchema.toBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0)))));
}
@Test
@ -234,7 +228,7 @@ public class SessionKeySchemaTest { @@ -234,7 +228,7 @@ public class SessionKeySchemaTest {
public void shouldExtractBytesKeyFromBinary() {
final Bytes bytesKey = Bytes.wrap(key.getBytes());
final Windowed<Bytes> windowedBytesKey = new Windowed<>(bytesKey, window);
final Bytes serialized = Bytes.wrap(SessionKeySchema.toBinary(windowedBytesKey));
final Bytes serialized = SessionKeySchema.toBinary(windowedBytesKey);
assertEquals(windowedBytesKey, SessionKeySchema.from(serialized));
}

2
streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java

@ -37,7 +37,7 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V @@ -37,7 +37,7 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V
public void put(final Windowed<K> sessionKey, final V value) {
if (!sessions.containsKey(sessionKey.key())) {
sessions.put(sessionKey.key(), new ArrayList<KeyValue<Windowed<K>, V>>());
sessions.put(sessionKey.key(), new ArrayList<>());
}
sessions.get(sessionKey.key()).add(KeyValue.pair(sessionKey, value));
}

Loading…
Cancel
Save