diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 2b37f2428f4..becb03db24c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -871,22 +871,13 @@ public class KStreamImpl extends AbstractStream implements KStream> otherWindowStore = createWindowedStateStore(windows, joined.keySerde(), joined.otherValueSerde(), joinOtherName + "-store"); - - final KStreamJoinWindow thisWindowedStream = new KStreamJoinWindow<>( - thisWindowStore.name(), - windows.beforeMs + windows.afterMs + 1, - windows.maintainMs() - ); + final KStreamJoinWindow thisWindowedStream = new KStreamJoinWindow<>(thisWindowStore.name()); final ProcessorParameters thisWindowStreamProcessorParams = new ProcessorParameters(thisWindowedStream, thisWindowStreamName); final ProcessorGraphNode thisWindowedStreamsNode = new ProcessorGraphNode<>(thisWindowStreamName, thisWindowStreamProcessorParams); builder.addGraphNode(thisStreamsGraphNode, thisWindowedStreamsNode); - final KStreamJoinWindow otherWindowedStream = new KStreamJoinWindow<>( - otherWindowStore.name(), - windows.beforeMs + windows.afterMs + 1, - windows.maintainMs() - ); + final KStreamJoinWindow otherWindowedStream = new KStreamJoinWindow<>(otherWindowStore.name()); final ProcessorParameters otherWindowStreamProcessorParams = new ProcessorParameters(otherWindowedStream, otherWindowStreamName); final ProcessorGraphNode otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamName, otherWindowStreamProcessorParams); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java index 895dab4fec1..34756d47cf2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -27,15 +26,8 @@ class KStreamJoinWindow implements ProcessorSupplier { private final String windowName; - /** - * @throws TopologyException if retention period of the join window is less than expected - */ - KStreamJoinWindow(final String windowName, final long windowSizeMs, final long retentionPeriodMs) { + KStreamJoinWindow(final String windowName) { this.windowName = windowName; - - if (windowSizeMs > retentionPeriodMs) - throw new TopologyException("The retention period of the join window " - + windowName + " must be no smaller than its window size."); } @Override @@ -61,7 +53,8 @@ class KStreamJoinWindow implements ProcessorSupplier { // since it will never be considered for join operations if (key != null) { context().forward(key, value); - window.put(key, value); + // Every record basically starts a new window. We're using a window store mostly for the retention. + window.put(key, value, context().timestamp()); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 03eaa070570..3bda28d6572 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -144,7 +144,10 @@ public class Stores { * @param name name of the store (cannot be {@code null}) * @param retentionPeriod length of time to retain data in the store (cannot be negative) * @param numSegments number of db segments (cannot be zero or negative) - * @param windowSize size of the windows (cannot be negative) + * @param windowSize size of the windows that are stored (cannot be negative). Note: the window size + * is not stored with the records, so this value is used to compute the keys that + * the store returns. No effort is made to validate this parameter, so you must be + * careful to set it the same as the windowed keys you're actually storing. * @param retainDuplicates whether or not to retain duplicates. * @return an instance of {@link WindowBytesStoreSupplier} * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead @@ -211,6 +214,11 @@ public class Stores { if (segmentInterval < 1L) { throw new IllegalArgumentException("segmentInterval cannot be zero or negative"); } + if (windowSize > retentionPeriod) { + throw new IllegalArgumentException("The retention period of the window store " + + name + " must be no smaller than its window size. Got size=[" + + windowSize + "], retention=[" + retentionPeriod + "]"); + } return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java index ee4c19a3dcf..1685123ff96 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java @@ -27,8 +27,12 @@ import org.apache.kafka.streams.processor.StateStore; public interface WindowStore extends StateStore, ReadOnlyWindowStore { /** - * Put a key-value pair with the current record time as the timestamp - * into the corresponding window + * Use the current record timestamp as the {@code windowStartTimestamp} and + * delegate to {@link WindowStore#put(Object, Object, long)}. + * + * It's highly recommended to use {@link WindowStore#put(Object, Object, long)} instead, as the record timestamp + * is unlikely to be the correct windowStartTimestamp in general. + * * @param key The key to associate the value to * @param value The value to update, it can be null; * if the serialized bytes are also null it is interpreted as deletes @@ -40,7 +44,8 @@ public interface WindowStore extends StateStore, ReadOnlyWindowStore * Put a key-value pair with the given timestamp into the corresponding window * @param key The key to associate the value to * @param value The value; can be null + * @param windowStartTimestamp The timestamp of the beginning of the window to put the key/value into * @throws NullPointerException If null is used for key. */ - void put(K key, V value, long timestamp); + void put(K key, V value, long windowStartTimestamp); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 4347811576b..688e88962a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -144,12 +144,12 @@ class CachingWindowStore extends WrappedStateStore.AbstractStateStore impl } @Override - public synchronized void put(final Bytes key, final byte[] value, final long timestamp) { + public synchronized void put(final Bytes key, final byte[] value, final long windowStartTimestamp) { // since this function may not access the underlying inner store, we need to validate // if store is open outside as well. validateStoreOpen(); - final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0); + final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, 0); final LRUCacheEntry entry = new LRUCacheEntry( value, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index aa9cbe66ee4..785aacd25c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -77,9 +77,9 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore } @Override - public void put(final Bytes key, final byte[] value, final long timestamp) { - bytesStore.put(key, value, timestamp); - changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, timestamp, maybeUpdateSeqnumForDups()), value); + public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) { + bytesStore.put(key, value, windowStartTimestamp); + changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, maybeUpdateSeqnumForDups()), value); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 5a27ed4a1eb..5162eac8848 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -108,10 +108,10 @@ public class MeteredWindowStore extends WrappedStateStore.AbstractStateSto } @Override - public void put(final K key, final V value, final long timestamp) { + public void put(final K key, final V value, final long windowStartTimestamp) { final long startNs = time.nanoseconds(); try { - inner.put(keyBytes(key), serdes.rawValue(value), timestamp); + inner.put(keyBytes(key), serdes.rawValue(value), windowStartTimestamp); } catch (final ProcessorStateException e) { final String message = String.format(e.getMessage(), key, value); throw new ProcessorStateException(message, e); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 4c0e01fe0a9..d7bb523b049 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -70,10 +70,10 @@ public class RocksDBWindowStore extends WrappedStateStore.AbstractStateSto } @Override - public void put(final K key, final V value, final long timestamp) { + public void put(final K key, final V value, final long windowStartTimestamp) { maybeUpdateSeqnumForDups(); - bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes), serdes.rawValue(value)); + bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, seqnum, serdes), serdes.rawValue(value)); } @Override