From 14314e3d687b4c7b77750d27a085b803c934e3e9 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Sat, 4 May 2019 07:44:18 -0700 Subject: [PATCH] [HOT FIX] in-memory store behavior should match rocksDB (#6657) While working on consolidating the various store unit tests I uncovered some minor "bugs" in the in-memory stores (inconsistencies with the behavior as established by the RocksDB stores). open iterators should be properly closed in the case the store is closed fetch/findSessions should always throw NPE if key is null window end time should be truncated at Long.MAX_VALUE rather than throw exception (Verified in-memory stores pass all applicable rocksDB tests now, unified unit tests coming in another PR) Reviewers: Guozhang Wang , Bill Bejeck --- .../state/internals/InMemorySessionStore.java | 25 +++++++ .../state/internals/InMemoryWindowStore.java | 70 +++++++++++++++---- .../state/internals/WindowKeySchema.java | 13 +++- 3 files changed, 91 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index c39dd5854fe..f3b85657278 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; @@ -134,6 +135,8 @@ public class InMemorySessionStore implements SessionStore { public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { removeExpiredSegments(); + Objects.requireNonNull(key, "key cannot be null"); + // Only need to search if the record hasn't expired yet if (endTime > observedStreamTime - retentionPeriod) { final ConcurrentNavigableMap> keyMap = endTimeMap.get(endTime); @@ -152,6 +155,8 @@ public class InMemorySessionStore implements SessionStore { public KeyValueIterator, byte[]> findSessions(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) { + Objects.requireNonNull(key, "key cannot be null"); + removeExpiredSegments(); return registerNewIterator(key, @@ -166,6 +171,9 @@ public class InMemorySessionStore implements SessionStore { final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "from key cannot be null"); + Objects.requireNonNull(keyTo, "to key cannot be null"); + removeExpiredSegments(); if (keyFrom.compareTo(keyTo) > 0) { @@ -183,6 +191,9 @@ public class InMemorySessionStore implements SessionStore { @Override public KeyValueIterator, byte[]> fetch(final Bytes key) { + + Objects.requireNonNull(key, "key cannot be null"); + removeExpiredSegments(); return registerNewIterator(key, key, Long.MAX_VALUE, endTimeMap.entrySet().iterator()); @@ -190,8 +201,13 @@ public class InMemorySessionStore implements SessionStore { @Override public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to) { + + Objects.requireNonNull(from, "from key cannot be null"); + Objects.requireNonNull(to, "to key cannot be null"); + removeExpiredSegments(); + return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.entrySet().iterator()); } @@ -212,6 +228,13 @@ public class InMemorySessionStore implements SessionStore { @Override public void close() { + if (openIterators.size() != 0) { + LOG.warn("Closing {} open iterators for store {}", openIterators.size(), name); + for (final InMemorySessionStoreIterator it : openIterators) { + it.close(); + } + } + endTimeMap.clear(); openIterators.clear(); open = false; @@ -303,6 +326,8 @@ public class InMemorySessionStore implements SessionStore { @Override public void close() { + next = null; + recordIterator = null; callback.deregisterIterator(this); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 797a5d9a2ba..8063410212e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; @@ -135,6 +136,9 @@ public class InMemoryWindowStore implements WindowStore { } else { segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> { kvMap.remove(keyBytes); + if (kvMap.isEmpty()) { + segmentMap.remove(windowStartTimestamp); + } return kvMap; }); } @@ -143,6 +147,9 @@ public class InMemoryWindowStore implements WindowStore { @Override public byte[] fetch(final Bytes key, final long windowStartTimestamp) { + + Objects.requireNonNull(key, "key cannot be null"); + removeExpiredSegments(); if (windowStartTimestamp <= observedStreamTime - retentionPeriod) { @@ -160,6 +167,9 @@ public class InMemoryWindowStore implements WindowStore { @Deprecated @Override public WindowStoreIterator fetch(final Bytes key, final long timeFrom, final long timeTo) { + + Objects.requireNonNull(key, "key cannot be null"); + removeExpiredSegments(); // add one b/c records expire exactly retentionPeriod ms after created @@ -179,6 +189,9 @@ public class InMemoryWindowStore implements WindowStore { final Bytes to, final long timeFrom, final long timeTo) { + Objects.requireNonNull(from, "from key cannot be null"); + Objects.requireNonNull(to, "to key cannot be null"); + removeExpiredSegments(); if (from.compareTo(to) > 0) { @@ -242,6 +255,13 @@ public class InMemoryWindowStore implements WindowStore { @Override public void close() { + if (openIterators.size() != 0) { + LOG.warn("Closing {} open iterators for store {}", openIterators.size(), name); + for (final InMemoryWindowStoreIteratorWrapper it : openIterators) { + it.close(); + } + } + segmentMap.clear(); open = false; } @@ -281,7 +301,7 @@ public class InMemoryWindowStore implements WindowStore { final Bytes keyTo = retainDuplicates ? wrapForDups(key, Integer.MAX_VALUE) : key; final WrappedInMemoryWindowStoreIterator iterator = - new WrappedInMemoryWindowStoreIterator(keyFrom, keyTo, segmentIterator, openIterators::remove); + new WrappedInMemoryWindowStoreIterator(keyFrom, keyTo, segmentIterator, openIterators::remove, retainDuplicates); openIterators.add(iterator); return iterator; @@ -319,15 +339,18 @@ public class InMemoryWindowStore implements WindowStore { private final boolean allKeys; private final Bytes keyFrom; private final Bytes keyTo; + private final boolean retainDuplicates; private final ClosingCallback callback; InMemoryWindowStoreIteratorWrapper(final Bytes keyFrom, final Bytes keyTo, final Iterator>> segmentIterator, - final ClosingCallback callback) { + final ClosingCallback callback, + final boolean retainDuplicates) { this.keyFrom = keyFrom; this.keyTo = keyTo; allKeys = (keyFrom == null) && (keyTo == null); + this.retainDuplicates = retainDuplicates; this.segmentIterator = segmentIterator; this.callback = callback; @@ -343,15 +366,26 @@ public class InMemoryWindowStore implements WindowStore { } next = getNext(); - return next != null; - } + if (next == null) { + return false; + } - public void remove() { - throw new UnsupportedOperationException( - "remove() is not supported in " + getClass().getName()); + if (allKeys || !retainDuplicates) { + return true; + } + + final Bytes key = getKey(next.key); + if (key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0) { + return true; + } else { + next = null; + return hasNext(); + } } public void close() { + next = null; + recordIterator = null; callback.deregisterIterator(this); } @@ -395,8 +429,9 @@ public class InMemoryWindowStore implements WindowStore { WrappedInMemoryWindowStoreIterator(final Bytes keyFrom, final Bytes keyTo, final Iterator>> segmentIterator, - final ClosingCallback callback) { - super(keyFrom, keyTo, segmentIterator, callback); + final ClosingCallback callback, + final boolean retainDuplicates) { + super(keyFrom, keyTo, segmentIterator, callback, retainDuplicates); } @Override @@ -419,13 +454,12 @@ public class InMemoryWindowStore implements WindowStore { } public static WrappedInMemoryWindowStoreIterator emptyIterator() { - return new WrappedInMemoryWindowStoreIterator(null, null, null, it -> { }); + return new WrappedInMemoryWindowStoreIterator(null, null, null, it -> { }, false); } } private static class WrappedWindowedKeyValueIterator extends InMemoryWindowStoreIteratorWrapper implements KeyValueIterator, byte[]> { - private final boolean retainDuplicates; private final long windowSize; WrappedWindowedKeyValueIterator(final Bytes keyFrom, @@ -434,8 +468,7 @@ public class InMemoryWindowStore implements WindowStore { final ClosingCallback callback, final boolean retainDuplicates, final long windowSize) { - super(keyFrom, keyTo, segmentIterator, callback); - this.retainDuplicates = retainDuplicates; + super(keyFrom, keyTo, segmentIterator, callback, retainDuplicates); this.windowSize = windowSize; } @@ -457,8 +490,15 @@ public class InMemoryWindowStore implements WindowStore { } private Windowed getWindowedKey() { - final Bytes key = retainDuplicates ? getKey(super.next.key) : super.next.key; - final TimeWindow timeWindow = new TimeWindow(super.currentTime, super.currentTime + windowSize); + final Bytes key = super.retainDuplicates ? getKey(super.next.key) : super.next.key; + long endTime = super.currentTime + windowSize; + + if (endTime < 0) { + LOG.warn("Warning: window end time was truncated to Long.MAX"); + endTime = Long.MAX_VALUE; + } + + final TimeWindow timeWindow = new TimeWindow(super.currentTime, endTime); return new Windowed<>(key, timeWindow); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java index dd8a2f111fb..9218ccf0a77 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java @@ -26,9 +26,13 @@ import org.apache.kafka.streams.state.StateSerdes; import java.nio.ByteBuffer; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { + private static final Logger LOG = LoggerFactory.getLogger(WindowKeySchema.class); + private static final int SEQNUM_SIZE = 4; private static final int TIMESTAMP_SIZE = 8; private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE; @@ -99,8 +103,13 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { */ static TimeWindow timeWindowForSize(final long startMs, final long windowSize) { - final long endMs = startMs + windowSize; - return new TimeWindow(startMs, endMs < 0 ? Long.MAX_VALUE : endMs); + long endMs = startMs + windowSize; + + if (endMs < 0) { + LOG.warn("Warning: window end time was truncated to Long.MAX"); + endMs = Long.MAX_VALUE; + } + return new TimeWindow(startMs, endMs); } // for pipe serdes