From 8299f2a397b6033d60c295a689b7a45fcf413f4a Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Fri, 26 Apr 2019 13:22:36 -0700 Subject: [PATCH] KAFKA-8029: In memory session store (#6525) First pass at an in-memory session store implementation. Reviewers: Simon Geisler, Damian Guy , John Roesler , Bill Bejeck , Guozhang Wang --- .../developer-guide/processor-api.html | 2 +- docs/streams/upgrade-guide.html | 2 +- .../kafka/streams/state/SessionStore.java | 6 +- .../apache/kafka/streams/state/Stores.java | 21 + .../InMemorySessionBytesStoreSupplier.java | 59 ++ .../state/internals/InMemorySessionStore.java | 378 +++++++++++++ .../internals/InMemorySessionStoreTest.java | 502 ++++++++++++++++++ .../internals/RocksDBSessionStoreTest.java | 51 ++ 8 files changed, 1016 insertions(+), 5 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionBytesStoreSupplier.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html index 4a060a6c399..31c11edc169 100644 --- a/docs/streams/developer-guide/processor-api.html +++ b/docs/streams/developer-guide/processor-api.html @@ -259,7 +259,7 @@ disk space is either not available or local disk space is wiped in-between app instance restarts.
  • Available store variants: - time window key-value store
  • + time window key-value store, session window key-value store.
    // Creating an in-memory key-value store:
     // here, we create a `KeyValueStore<String, Long>` named "inmemory-counts".
    diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
    index 58505e2b239..e07b5a9acd8 100644
    --- a/docs/streams/upgrade-guide.html
    +++ b/docs/streams/upgrade-guide.html
    @@ -73,7 +73,7 @@
         

    Streams API changes in 2.3.0

    Version 2.3.0 adds the Suppress operator to the kafka-streams-scala Ktable API.

    - As of 2.3.0 Streams now offers an in-memory version of the window store, in addition to the persistent one based on RocksDB. The new public interface inMemoryWindowStore() is added to Stores that provides a built-in in-memory window store. + As of 2.3.0 Streams now offers an in-memory version of the window and the session store, in addition to the persistent ones based on RocksDB. The new public interfaces inMemoryWindowStore() and inMemorySessionStore() are added to Stores and provide the built-in in-memory window or session store.

    diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java index 4f897e39c67..faaa751489a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java @@ -46,7 +46,7 @@ public interface SessionStore extends StateStore, ReadOnlySessionStore, AGG> findSessions(final K key, long earliestSessionEndTime, final long latestSessionStartTime); + KeyValueIterator, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime); /** * Fetch any sessions in the given range of keys and the sessions end is ≥ earliestSessionEndTime and the sessions @@ -61,7 +61,7 @@ public interface SessionStore extends StateStore, ReadOnlySessionStore, AGG> findSessions(final K keyFrom, final K keyTo, long earliestSessionEndTime, final long latestSessionStartTime); + KeyValueIterator, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime); /** * Get the value of key from a single session. @@ -72,7 +72,7 @@ public interface SessionStore extends StateStore, ReadOnlySessionStore get() { + return new InMemorySessionStore(name, retentionPeriod, metricsScope()); + } + + @Override + public String metricsScope() { + return "in-memory-session-state"; + } + + // In-memory store is not *really* segmented, so just say it is 1 (for ordering consistency with caching enabled) + @Override + public long segmentIntervalMs() { + return 1; + } + + @Override + public long retentionPeriod() { + return retentionPeriod; + } +} + diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java new file mode 100644 index 00000000000..c39dd5854fe --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.SessionStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InMemorySessionStore implements SessionStore { + + private static final Logger LOG = LoggerFactory.getLogger(InMemorySessionStore.class); + + private final String name; + private final String metricScope; + private Sensor expiredRecordSensor; + private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + + private final long retentionPeriod; + + private final ConcurrentNavigableMap>> endTimeMap = new ConcurrentSkipListMap<>(); + private final Set openIterators = ConcurrentHashMap.newKeySet(); + + private volatile boolean open = false; + + InMemorySessionStore(final String name, + final long retentionPeriod, + final String metricScope) { + this.name = name; + this.retentionPeriod = retentionPeriod; + this.metricScope = metricScope; + } + + @Override + public String name() { + return name; + } + + @Override + public void init(final ProcessorContext context, final StateStore root) { + final StreamsMetricsImpl metrics = ((InternalProcessorContext) context).metrics(); + final String taskName = context.taskId().toString(); + expiredRecordSensor = metrics.storeLevelSensor( + taskName, + name(), + EXPIRED_WINDOW_RECORD_DROP, + Sensor.RecordingLevel.INFO + ); + addInvocationRateAndCount( + expiredRecordSensor, + "stream-" + metricScope + "-metrics", + metrics.tagMap("task-id", taskName, metricScope + "-id", name()), + EXPIRED_WINDOW_RECORD_DROP + ); + + if (root != null) { + context.register(root, (key, value) -> put(SessionKeySchema.from(Bytes.wrap(key)), value)); + } + open = true; + } + + @Override + public void put(final Windowed sessionKey, final byte[] aggregate) { + removeExpiredSegments(); + + final long windowEndTimestamp = sessionKey.window().end(); + observedStreamTime = Math.max(observedStreamTime, windowEndTimestamp); + + if (windowEndTimestamp <= observedStreamTime - retentionPeriod) { + expiredRecordSensor.record(); + LOG.debug("Skipping record for expired segment."); + } else { + if (aggregate != null) { + endTimeMap.computeIfAbsent(windowEndTimestamp, t -> new ConcurrentSkipListMap<>()); + final ConcurrentNavigableMap> keyMap = endTimeMap.get(windowEndTimestamp); + keyMap.computeIfAbsent(sessionKey.key(), t -> new ConcurrentSkipListMap<>()); + keyMap.get(sessionKey.key()).put(sessionKey.window().start(), aggregate); + } else { + remove(sessionKey); + } + } + } + + @Override + public void remove(final Windowed sessionKey) { + final ConcurrentNavigableMap> keyMap = endTimeMap.get(sessionKey.window().end()); + final ConcurrentNavigableMap startTimeMap = keyMap.get(sessionKey.key()); + startTimeMap.remove(sessionKey.window().start()); + + if (startTimeMap.isEmpty()) { + keyMap.remove(sessionKey.key()); + if (keyMap.isEmpty()) { + endTimeMap.remove(sessionKey.window().end()); + } + } + } + + @Override + public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { + removeExpiredSegments(); + + // Only need to search if the record hasn't expired yet + if (endTime > observedStreamTime - retentionPeriod) { + final ConcurrentNavigableMap> keyMap = endTimeMap.get(endTime); + if (keyMap != null) { + final ConcurrentNavigableMap startTimeMap = keyMap.get(key); + if (startTimeMap != null) { + return startTimeMap.get(startTime); + } + } + } + return null; + } + + @Deprecated + @Override + public KeyValueIterator, byte[]> findSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + removeExpiredSegments(); + + return registerNewIterator(key, + key, + latestSessionStartTime, + endTimeMap.tailMap(earliestSessionEndTime, true).entrySet().iterator()); + } + + @Deprecated + @Override + public KeyValueIterator, byte[]> findSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + removeExpiredSegments(); + + if (keyFrom.compareTo(keyTo) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + + return registerNewIterator(keyFrom, + keyTo, + latestSessionStartTime, + endTimeMap.tailMap(earliestSessionEndTime, true).entrySet().iterator()); + } + + @Override + public KeyValueIterator, byte[]> fetch(final Bytes key) { + removeExpiredSegments(); + + return registerNewIterator(key, key, Long.MAX_VALUE, endTimeMap.entrySet().iterator()); + } + + @Override + public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to) { + removeExpiredSegments(); + + return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.entrySet().iterator()); + } + + @Override + public boolean persistent() { + return false; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public void flush() { + // do-nothing since it is in-memory + } + + @Override + public void close() { + endTimeMap.clear(); + openIterators.clear(); + open = false; + } + + private void removeExpiredSegments() { + long minLiveTime = Math.max(0L, observedStreamTime - retentionPeriod + 1); + + for (final InMemorySessionStoreIterator it : openIterators) { + minLiveTime = Math.min(minLiveTime, it.minTime()); + } + + endTimeMap.headMap(minLiveTime, false).clear(); + } + + private InMemorySessionStoreIterator registerNewIterator(final Bytes keyFrom, + final Bytes keyTo, + final long latestSessionStartTime, + final Iterator>>> endTimeIterator) { + final InMemorySessionStoreIterator iterator = new InMemorySessionStoreIterator(keyFrom, keyTo, latestSessionStartTime, endTimeIterator, it -> openIterators.remove(it)); + openIterators.add(iterator); + return iterator; + } + + interface ClosingCallback { + void deregisterIterator(final InMemorySessionStoreIterator iterator); + } + + private static class InMemorySessionStoreIterator implements KeyValueIterator, byte[]> { + + private final Iterator>>> endTimeIterator; + private Iterator>> keyIterator; + private Iterator> recordIterator; + + private KeyValue, byte[]> next; + private Bytes currentKey; + private long currentEndTime; + + private final Bytes keyFrom; + private final Bytes keyTo; + private final long latestSessionStartTime; + + private final ClosingCallback callback; + + InMemorySessionStoreIterator(final Bytes keyFrom, + final Bytes keyTo, + final long latestSessionStartTime, + final Iterator>>> endTimeIterator, + final ClosingCallback callback) { + this.keyFrom = keyFrom; + this.keyTo = keyTo; + this.latestSessionStartTime = latestSessionStartTime; + + this.endTimeIterator = endTimeIterator; + this.callback = callback; + setAllIterators(); + } + + @Override + public boolean hasNext() { + if (next != null) { + return true; + } else if (recordIterator == null) { + return false; + } else { + next = getNext(); + return next != null; + } + } + + @Override + public Windowed peekNextKey() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return next.key; + } + + @Override + public KeyValue, byte[]> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + final KeyValue, byte[]> ret = next; + next = null; + return ret; + } + + @Override + public void close() { + callback.deregisterIterator(this); + } + + Long minTime() { + return currentEndTime; + } + + // getNext is only called when either recordIterator or segmentIterator has a next + // Note this does not guarantee a next record exists as the next segments may not contain any keys in range + private KeyValue, byte[]> getNext() { + if (!recordIterator.hasNext()) { + getNextIterators(); + } + + if (recordIterator == null) { + return null; + } + + final Map.Entry nextRecord = recordIterator.next(); + final SessionWindow sessionWindow = new SessionWindow(nextRecord.getKey(), currentEndTime); + final Windowed windowedKey = new Windowed<>(currentKey, sessionWindow); + + return new KeyValue<>(windowedKey, nextRecord.getValue()); + } + + // Called when the inner two (key and starttime) iterators are empty to roll to the next endTimestamp + // Rolls all three iterators forward until recordIterator has a next entry + // Sets recordIterator to null if there are no records to return + private void setAllIterators() { + while (endTimeIterator.hasNext()) { + final Entry>> nextEndTimeEntry = endTimeIterator.next(); + currentEndTime = nextEndTimeEntry.getKey(); + keyIterator = nextEndTimeEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator(); + + if (setInnerIterators()) { + return; + } + } + recordIterator = null; + } + + // Rolls the inner two iterators (key and record) forward until recordIterators has a next entry + // Returns false if no more records are found (for the current end time) + private boolean setInnerIterators() { + while (keyIterator.hasNext()) { + final Entry> nextKeyEntry = keyIterator.next(); + currentKey = nextKeyEntry.getKey(); + + if (latestSessionStartTime == Long.MAX_VALUE) { + recordIterator = nextKeyEntry.getValue().entrySet().iterator(); + } else { + recordIterator = nextKeyEntry.getValue().headMap(latestSessionStartTime, true).entrySet().iterator(); + } + + if (recordIterator.hasNext()) { + return true; + } + } + return false; + } + + // Called when the current recordIterator has no entries left to roll it to the next valid entry + // When there are no more records to return, recordIterator will be set to null + private void getNextIterators() { + if (setInnerIterators()) { + return; + } + + setAllIterators(); + } + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java new file mode 100644 index 00000000000..bbe8d217365 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static java.time.Duration.ofMillis; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.test.StreamsTestUtils.toList; +import static org.apache.kafka.test.StreamsTestUtils.valuesToList; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import java.util.Map; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.TestUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class InMemorySessionStoreTest { + + private static final String STORE_NAME = "InMemorySessionStore"; + private static final long RETENTION_PERIOD = 10_000L; + + private SessionStore sessionStore; + private InternalMockProcessorContext context; + + private final List> changeLog = new ArrayList<>(); + + private final Producer producer = new MockProducer<>(true, + Serdes.ByteArray().serializer(), + Serdes.ByteArray().serializer()); + + private final RecordCollector recordCollector = new RecordCollectorImpl( + STORE_NAME, + new LogContext(STORE_NAME), + new DefaultProductionExceptionHandler(), + new Metrics().sensor("skipped-records")) { + + @Override + public void send(final String topic, + final K1 key, + final V1 value, + final Headers headers, + final Integer partition, + final Long timestamp, + final Serializer keySerializer, + final Serializer valueSerializer) { + changeLog.add(new KeyValue<>( + keySerializer.serialize(topic, headers, key), + valueSerializer.serialize(topic, headers, value)) + ); + } + }; + + private SessionStore buildSessionStore(final long retentionPeriod) { + return Stores.sessionStoreBuilder( + Stores.inMemorySessionStore( + STORE_NAME, + ofMillis(retentionPeriod)), + Serdes.String(), + Serdes.Long()).build(); + } + + @Before + public void before() { + context = new InternalMockProcessorContext( + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.Long(), + recordCollector, + new ThreadCache( + new LogContext("testCache"), + 0, + new MockStreamsMetrics(new Metrics()))); + + sessionStore = buildSessionStore(RETENTION_PERIOD); + + sessionStore.init(context, sessionStore); + recordCollector.init(producer); + } + + @After + public void after() { + sessionStore.close(); + } + + @Test + public void shouldPutAndFindSessionsInRange() { + final String key = "a"; + final Windowed a1 = new Windowed<>(key, new SessionWindow(10, 10L)); + final Windowed a2 = new Windowed<>(key, new SessionWindow(500L, 1000L)); + sessionStore.put(a1, 1L); + sessionStore.put(a2, 2L); + sessionStore.put(new Windowed<>(key, new SessionWindow(1500L, 2000L)), 1L); + sessionStore.put(new Windowed<>(key, new SessionWindow(2500L, 3000L)), 2L); + + final List, Long>> expected = + Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L)); + + try (final KeyValueIterator, Long> values = + sessionStore.findSessions(key, 0, 1000L) + ) { + assertEquals(expected, toList(values)); + } + + final List, Long>> expected2 = Collections.singletonList(KeyValue.pair(a2, 2L)); + + try (final KeyValueIterator, Long> values2 = + sessionStore.findSessions(key, 400L, 600L) + ) { + assertEquals(expected2, toList(values2)); + } + } + + @Test + public void shouldFetchAllSessionsWithSameRecordKey() { + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L)); + + for (final KeyValue, Long> kv : expected) { + sessionStore.put(kv.key, kv.value); + } + + // add one that shouldn't appear in the results + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L); + + try (final KeyValueIterator, Long> values = sessionStore.fetch("a")) { + assertEquals(expected, toList(values)); + } + } + + @Test + public void shouldFetchAllSessionsWithinKeyRange() { + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L), + KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L), + KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L), + KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L)); + + for (final KeyValue, Long> kv : expected) { + sessionStore.put(kv.key, kv.value); + } + + // add some that shouldn't appear in the results + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + sessionStore.put(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L); + + try (final KeyValueIterator, Long> values = sessionStore.fetch("aa", "bb")) { + assertEquals(expected, toList(values)); + } + } + + @Test + public void shouldFetchExactSession() { + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 4)), 1L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 3)), 2L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 4)), 3L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(1, 4)), 4L); + sessionStore.put(new Windowed<>("aaa", new SessionWindow(0, 4)), 5L); + + final long result = sessionStore.fetchSession("aa", 0, 4); + assertEquals(3L, result); + } + + @Test + public void shouldFindValuesWithinMergingSessionWindowRange() { + final String key = "a"; + sessionStore.put(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L); + sessionStore.put(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L); + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L), + KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L)); + + try (final KeyValueIterator, Long> results = + sessionStore.findSessions(key, -1, 1000L)) { + assertEquals(expected, toList(results)); + } + } + + @Test + public void shouldRemove() { + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 1000)), 1L); + sessionStore.put(new Windowed<>("a", new SessionWindow(1500, 2500)), 2L); + + sessionStore.remove(new Windowed<>("a", new SessionWindow(0, 1000))); + + try (final KeyValueIterator, Long> results = + sessionStore.findSessions("a", 0L, 1000L)) { + assertFalse(results.hasNext()); + } + + try (final KeyValueIterator, Long> results = + sessionStore.findSessions("a", 1500L, 2500L)) { + assertTrue(results.hasNext()); + } + } + + @Test + public void shouldRemoveOnNullAggValue() { + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 1000)), 1L); + sessionStore.put(new Windowed<>("a", new SessionWindow(1500, 2500)), 2L); + + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 1000)), null); + + try (final KeyValueIterator, Long> results = + sessionStore.findSessions("a", 0L, 1000L)) { + assertFalse(results.hasNext()); + } + + try (final KeyValueIterator, Long> results = + sessionStore.findSessions("a", 1500L, 2500L)) { + assertTrue(results.hasNext()); + } + } + + @Test + public void shouldFindSessionsToMerge() { + final Windowed session1 = new Windowed<>("a", new SessionWindow(0, 100)); + final Windowed session2 = new Windowed<>("a", new SessionWindow(101, 200)); + final Windowed session3 = new Windowed<>("a", new SessionWindow(201, 300)); + final Windowed session4 = new Windowed<>("a", new SessionWindow(301, 400)); + final Windowed session5 = new Windowed<>("a", new SessionWindow(401, 500)); + sessionStore.put(session1, 1L); + sessionStore.put(session2, 2L); + sessionStore.put(session3, 3L); + sessionStore.put(session4, 4L); + sessionStore.put(session5, 5L); + + try (final KeyValueIterator, Long> results = + sessionStore.findSessions("a", 150, 300) + ) { + assertEquals(session2, results.next().key); + assertEquals(session3, results.next().key); + assertFalse(results.hasNext()); + } + } + + @Test + public void shouldFetchExactKeys() { + sessionStore = buildSessionStore(0x7a00000000000000L); + sessionStore.init(context, sessionStore); + + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L); + sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L); + sessionStore.put(new Windowed<>("a", new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L); + + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions("a", 0, Long.MAX_VALUE) + ) { + assertThat(valuesToList(iterator), equalTo(Arrays.asList(1L, 3L, 5L))); + } + + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions("aa", 0, Long.MAX_VALUE) + ) { + assertThat(valuesToList(iterator), equalTo(Arrays.asList(2L, 4L))); + } + + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions("a", "aa", 0, Long.MAX_VALUE) + ) { + assertThat(valuesToList(iterator), equalTo(Arrays.asList(1L, 2L, 3L, 4L, 5L))); + } + + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions("a", "aa", 10, 0) + ) { + assertThat(valuesToList(iterator), equalTo(Collections.singletonList(2L))); + } + } + + @Test + public void testIteratorPeek() { + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L); + sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L); + + final KeyValueIterator, Long> iterator = sessionStore.findSessions("a", 0L, 20); + + assertEquals(iterator.peekNextKey(), new Windowed<>("a", new SessionWindow(0L, 0L))); + assertEquals(iterator.peekNextKey(), iterator.next().key); + assertEquals(iterator.peekNextKey(), iterator.next().key); + assertFalse(iterator.hasNext()); + } + + @Test + public void shouldRemoveExpired() { + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L); + sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L); + + // Advance stream time to expire the first record + sessionStore.put(new Windowed<>("aa", new SessionWindow(10, RETENTION_PERIOD)), 4L); + + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE) + ) { + assertThat(valuesToList(iterator), equalTo(Arrays.asList(2L, 3L, 4L))); + } + } + + @Test + public void shouldRestore() { + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L)); + + for (final KeyValue, Long> kv : expected) { + sessionStore.put(kv.key, kv.value); + } + + try (final KeyValueIterator, Long> values = sessionStore.fetch("a")) { + assertEquals(expected, toList(values)); + } + + sessionStore.close(); + + try (final KeyValueIterator, Long> values = sessionStore.fetch("a")) { + assertEquals(Collections.emptyList(), toList(values)); + } + + context.restore(STORE_NAME, changeLog); + + try (final KeyValueIterator, Long> values = sessionStore.fetch("a")) { + assertEquals(expected, toList(values)); + } + } + + @Test + public void shouldReturnSameResultsForSingleKeyFindSessionsAndEqualKeyRangeFindSessions() { + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 1)), 0L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(2, 3)), 1L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(4, 5)), 2L); + sessionStore.put(new Windowed<>("aaa", new SessionWindow(6, 7)), 3L); + + final KeyValueIterator, Long> singleKeyIterator = sessionStore.findSessions("aa", 0L, 10L); + final KeyValueIterator, Long> keyRangeIterator = sessionStore.findSessions("aa", "aa", 0L, 10L); + + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + + @Test + public void shouldLogAndMeasureExpiredRecords() { + LogCaptureAppender.setClassLoggerToDebug(InMemorySessionStore.class); + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + + // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired + sessionStore.put(new Windowed<>("initial record", new SessionWindow(0, RETENTION_PERIOD)), 0L); + + // Try inserting a record with timestamp 0 -- should be dropped + sessionStore.put(new Windowed<>("late record", new SessionWindow(0, 0)), 0L); + sessionStore.put(new Windowed<>("another on-time record", new SessionWindow(0, RETENTION_PERIOD)), 0L); + + LogCaptureAppender.unregister(appender); + + final Map metrics = context.metrics().metrics(); + + final Metric dropTotal = metrics.get(new MetricName( + "expired-window-record-drop-total", + "stream-in-memory-session-state-metrics", + "The total number of occurrence of expired-window-record-drop operations.", + mkMap( + mkEntry("client-id", "mock"), + mkEntry("task-id", "0_0"), + mkEntry("in-memory-session-state-id", STORE_NAME) + ) + )); + + final Metric dropRate = metrics.get(new MetricName( + "expired-window-record-drop-rate", + "stream-in-memory-session-state-metrics", + "The average number of occurrence of expired-window-record-drop operation per second.", + mkMap( + mkEntry("client-id", "mock"), + mkEntry("task-id", "0_0"), + mkEntry("in-memory-session-state-id", STORE_NAME) + ) + )); + + assertEquals(1.0, dropTotal.metricValue()); + assertNotEquals(0.0, dropRate.metricValue()); + final List messages = appender.getMessages(); + assertThat(messages, hasItem("Skipping record for expired segment.")); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() { + sessionStore.findSessions(null, 1L, 2L); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() { + sessionStore.findSessions(null, "anyKeyTo", 1L, 2L); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() { + sessionStore.findSessions("anyKeyFrom", null, 1L, 2L); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerExceptionOnFetchNullFromKey() { + sessionStore.fetch(null, "anyToKey"); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerExceptionOnFetchNullToKey() { + sessionStore.fetch("anyFromKey", null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerExceptionOnFetchNullKey() { + sessionStore.fetch(null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerExceptionOnRemoveNullKey() { + sessionStore.remove(null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerExceptionOnPutNullKey() { + sessionStore.put(null, 1L); + } + + @Test + public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + LogCaptureAppender.setClassLoggerToDebug(InMemorySessionStore.class); + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + final String keyFrom = Serdes.String().deserializer().deserialize("", Serdes.Integer().serializer().serialize("", -1)); + final String keyTo = Serdes.String().deserializer().deserialize("", Serdes.Integer().serializer().serialize("", 1)); + + final KeyValueIterator, Long> iterator = sessionStore.findSessions(keyFrom, keyTo, 0L, 10L); + assertFalse(iterator.hasNext()); + + final List messages = appender.getMessages(); + assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers")); + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index 80ea4bad89b..41abdad3021 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -128,6 +128,39 @@ public class RocksDBSessionStoreTest { } } + @Test + public void shouldFetchAllSessionsWithinKeyRange() { + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L), + KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L), + KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L), + KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L)); + + for (final KeyValue, Long> kv : expected) { + sessionStore.put(kv.key, kv.value); + } + + // add some that shouldn't appear in the results + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + sessionStore.put(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L); + + try (final KeyValueIterator, Long> values = sessionStore.fetch("aa", "bb")) { + assertEquals(expected, toList(values)); + } + } + + @Test + public void shouldFetchExactSession() { + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 4)), 1L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 3)), 2L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 4)), 3L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(1, 4)), 4L); + sessionStore.put(new Windowed<>("aaa", new SessionWindow(0, 4)), 5L); + + final long result = sessionStore.fetchSession("aa", 0, 4); + assertEquals(3L, result); + } + @Test public void shouldFindValuesWithinMergingSessionWindowRange() { final String key = "a"; @@ -162,6 +195,24 @@ public class RocksDBSessionStoreTest { } } + @Test + public void shouldRemoveOnNullAggValue() { + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 1000)), 1L); + sessionStore.put(new Windowed<>("a", new SessionWindow(1500, 2500)), 2L); + + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 1000)), null); + + try (final KeyValueIterator, Long> results = + sessionStore.findSessions("a", 0L, 1000L)) { + assertFalse(results.hasNext()); + } + + try (final KeyValueIterator, Long> results = + sessionStore.findSessions("a", 1500L, 2500L)) { + assertTrue(results.hasNext()); + } + } + @Test public void shouldFindSessionsToMerge() { final Windowed session1 = new Windowed<>("a", new SessionWindow(0, 100));