Browse Source

MINOR: clean up window store interface to avoid confusion (#5359)

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
pull/5461/head
John Roesler 6 years ago committed by Matthias J. Sax
parent
commit
b9f1179694
  1. 13
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
  2. 13
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
  3. 10
      streams/src/main/java/org/apache/kafka/streams/state/Stores.java
  4. 11
      streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
  5. 4
      streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
  6. 6
      streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
  7. 4
      streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
  8. 4
      streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java

13
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

@ -871,22 +871,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @@ -871,22 +871,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final StoreBuilder<WindowStore<K1, V2>> otherWindowStore =
createWindowedStateStore(windows, joined.keySerde(), joined.otherValueSerde(), joinOtherName + "-store");
final KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(
thisWindowStore.name(),
windows.beforeMs + windows.afterMs + 1,
windows.maintainMs()
);
final KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindowStore.name());
final ProcessorParameters thisWindowStreamProcessorParams = new ProcessorParameters(thisWindowedStream, thisWindowStreamName);
final ProcessorGraphNode<K1, V1> thisWindowedStreamsNode = new ProcessorGraphNode<>(thisWindowStreamName, thisWindowStreamProcessorParams);
builder.addGraphNode(thisStreamsGraphNode, thisWindowedStreamsNode);
final KStreamJoinWindow<K1, V2> otherWindowedStream = new KStreamJoinWindow<>(
otherWindowStore.name(),
windows.beforeMs + windows.afterMs + 1,
windows.maintainMs()
);
final KStreamJoinWindow<K1, V2> otherWindowedStream = new KStreamJoinWindow<>(otherWindowStore.name());
final ProcessorParameters otherWindowStreamProcessorParams = new ProcessorParameters(otherWindowedStream, otherWindowStreamName);
final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamName, otherWindowStreamProcessorParams);

13
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java

@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
@ -27,15 +26,8 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> { @@ -27,15 +26,8 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
private final String windowName;
/**
* @throws TopologyException if retention period of the join window is less than expected
*/
KStreamJoinWindow(final String windowName, final long windowSizeMs, final long retentionPeriodMs) {
KStreamJoinWindow(final String windowName) {
this.windowName = windowName;
if (windowSizeMs > retentionPeriodMs)
throw new TopologyException("The retention period of the join window "
+ windowName + " must be no smaller than its window size.");
}
@Override
@ -61,7 +53,8 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> { @@ -61,7 +53,8 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
// since it will never be considered for join operations
if (key != null) {
context().forward(key, value);
window.put(key, value);
// Every record basically starts a new window. We're using a window store mostly for the retention.
window.put(key, value, context().timestamp());
}
}
}

10
streams/src/main/java/org/apache/kafka/streams/state/Stores.java

@ -144,7 +144,10 @@ public class Stores { @@ -144,7 +144,10 @@ public class Stores {
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* @param numSegments number of db segments (cannot be zero or negative)
* @param windowSize size of the windows (cannot be negative)
* @param windowSize size of the windows that are stored (cannot be negative). Note: the window size
* is not stored with the records, so this value is used to compute the keys that
* the store returns. No effort is made to validate this parameter, so you must be
* careful to set it the same as the windowed keys you're actually storing.
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
* @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead
@ -211,6 +214,11 @@ public class Stores { @@ -211,6 +214,11 @@ public class Stores {
if (segmentInterval < 1L) {
throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
}
if (windowSize > retentionPeriod) {
throw new IllegalArgumentException("The retention period of the window store "
+ name + " must be no smaller than its window size. Got size=["
+ windowSize + "], retention=[" + retentionPeriod + "]");
}
return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates);
}

11
streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java

@ -27,8 +27,12 @@ import org.apache.kafka.streams.processor.StateStore; @@ -27,8 +27,12 @@ import org.apache.kafka.streams.processor.StateStore;
public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> {
/**
* Put a key-value pair with the current record time as the timestamp
* into the corresponding window
* Use the current record timestamp as the {@code windowStartTimestamp} and
* delegate to {@link WindowStore#put(Object, Object, long)}.
*
* It's highly recommended to use {@link WindowStore#put(Object, Object, long)} instead, as the record timestamp
* is unlikely to be the correct windowStartTimestamp in general.
*
* @param key The key to associate the value to
* @param value The value to update, it can be null;
* if the serialized bytes are also null it is interpreted as deletes
@ -40,7 +44,8 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> @@ -40,7 +44,8 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* Put a key-value pair with the given timestamp into the corresponding window
* @param key The key to associate the value to
* @param value The value; can be null
* @param windowStartTimestamp The timestamp of the beginning of the window to put the key/value into
* @throws NullPointerException If null is used for key.
*/
void put(K key, V value, long timestamp);
void put(K key, V value, long windowStartTimestamp);
}

4
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java

@ -144,12 +144,12 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl @@ -144,12 +144,12 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
}
@Override
public synchronized void put(final Bytes key, final byte[] value, final long timestamp) {
public synchronized void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
// since this function may not access the underlying inner store, we need to validate
// if store is open outside as well.
validateStoreOpen();
final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0);
final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, 0);
final LRUCacheEntry entry =
new LRUCacheEntry(
value,

6
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java

@ -77,9 +77,9 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore @@ -77,9 +77,9 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
}
@Override
public void put(final Bytes key, final byte[] value, final long timestamp) {
bytesStore.put(key, value, timestamp);
changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, timestamp, maybeUpdateSeqnumForDups()), value);
public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
bytesStore.put(key, value, windowStartTimestamp);
changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, maybeUpdateSeqnumForDups()), value);
}
@Override

4
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java

@ -108,10 +108,10 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto @@ -108,10 +108,10 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
}
@Override
public void put(final K key, final V value, final long timestamp) {
public void put(final K key, final V value, final long windowStartTimestamp) {
final long startNs = time.nanoseconds();
try {
inner.put(keyBytes(key), serdes.rawValue(value), timestamp);
inner.put(keyBytes(key), serdes.rawValue(value), windowStartTimestamp);
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key, value);
throw new ProcessorStateException(message, e);

4
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java

@ -70,10 +70,10 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto @@ -70,10 +70,10 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
}
@Override
public void put(final K key, final V value, final long timestamp) {
public void put(final K key, final V value, final long windowStartTimestamp) {
maybeUpdateSeqnumForDups();
bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes), serdes.rawValue(value));
bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, seqnum, serdes), serdes.rawValue(value));
}
@Override

Loading…
Cancel
Save