diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java index 49fe96ba20c..6db7a708541 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl; import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder; import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl; -import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl; +import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; import java.time.Duration; @@ -155,6 +155,6 @@ public interface Suppressed { * @return a suppression configuration */ static Suppressed untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig) { - return new SuppressedImpl<>(timeToWaitForMoreEvents, bufferConfig, null, false); + return new SuppressedInternal<>(timeToWaitForMoreEvents, bufferConfig, null, false); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java index 8a2e619b7b5..9bb83733c82 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.ByteBufferDeserializer; -import org.apache.kafka.common.serialization.ByteBufferSerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; @@ -30,6 +28,17 @@ import static java.util.Objects.requireNonNull; public class FullChangeSerde implements Serde> { private final Serde inner; + @SuppressWarnings("unchecked") + public static FullChangeSerde castOrWrap(final Serde serde) { + if (serde == null) { + return null; + } else if (serde instanceof FullChangeSerde) { + return (FullChangeSerde) serde; + } else { + return new FullChangeSerde((Serde) serde); + } + } + public FullChangeSerde(final Serde inner) { this.inner = requireNonNull(inner); } @@ -47,7 +56,6 @@ public class FullChangeSerde implements Serde> { @Override public Serializer> serializer() { final Serializer innerSerializer = inner.serializer(); - final ByteBufferSerializer byteBufferSerializer = new ByteBufferSerializer(); return new Serializer>() { @Override @@ -65,8 +73,8 @@ public class FullChangeSerde implements Serde> { final byte[] newBytes = data.newValue == null ? null : innerSerializer.serialize(topic, data.newValue); final int newSize = newBytes == null ? -1 : newBytes.length; - final ByteBuffer buffer = ByteBuffer.allocate( - 4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize) + final ByteBuffer buffer = ByteBuffer.wrap( + new byte[4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 
0 : newSize)]
                 );
                 buffer.putInt(oldSize);
                 if (oldBytes != null) {
@@ -76,7 +84,7 @@
                 if (newBytes != null) {
                     buffer.put(newBytes);
                 }
-                return byteBufferSerializer.serialize(null, buffer);
+                return buffer.array();
             }
 
             @Override
@@ -89,7 +97,6 @@
     @Override
     public Deserializer<Change<T>> deserializer() {
         final Deserializer<T> innerDeserializer = inner.deserializer();
-        final ByteBufferDeserializer byteBufferDeserializer = new ByteBufferDeserializer();
         return new Deserializer<Change<T>>() {
             @Override
             public void configure(final Map<String, ?> configs, final boolean isKey) {
@@ -101,7 +108,7 @@
                 if (data == null) {
                     return null;
                 }
-                final ByteBuffer buffer = byteBufferDeserializer.deserialize(null, data);
+                final ByteBuffer buffer = ByteBuffer.wrap(data);
                 final int oldSize = buffer.getInt();
                 final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
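An aside on the serializer above, not part of the patch: a `Change<T>` is packed as two length-prefixed byte arrays, with -1 marking a null old or new value, and the round-trip through `ByteBufferSerializer`/`ByteBufferDeserializer` is dropped in favor of wrapping a plain `byte[]`. A standalone sketch of the same layout (class and variable names are illustrative, not from the patch):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of FullChangeSerde's wire format: [int oldLen][old bytes][int newLen][new bytes],
// where a length of -1 encodes null. Round-trips an (old, new) pair of strings.
public class ChangeFormatDemo {
    public static void main(final String[] args) {
        final byte[] oldBytes = null;                                  // e.g. no prior value
        final byte[] newBytes = "v1".getBytes(StandardCharsets.UTF_8);

        final int oldSize = oldBytes == null ? -1 : oldBytes.length;
        final int newSize = newBytes == null ? -1 : newBytes.length;

        final ByteBuffer buffer = ByteBuffer.wrap(
            new byte[4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize)]
        );
        buffer.putInt(oldSize);
        if (oldBytes != null) buffer.put(oldBytes);
        buffer.putInt(newSize);
        if (newBytes != null) buffer.put(newBytes);

        // decode from the raw array, exactly as the new deserializer does
        final ByteBuffer read = ByteBuffer.wrap(buffer.array());
        final int decodedOldSize = read.getInt();
        final byte[] decodedOld = decodedOldSize == -1 ? null : new byte[decodedOldSize];
        if (decodedOld != null) read.get(decodedOld);
        final int decodedNewSize = read.getInt();
        final byte[] decodedNew = decodedNewSize == -1 ? null : new byte[decodedNewSize];
        if (decodedNew != null) read.get(decodedNew);

        System.out.println("old=" + (decodedOld == null ? null : new String(decodedOld, StandardCharsets.UTF_8))
            + ", new=" + (decodedNew == null ? null : new String(decodedNew, StandardCharsets.UTF_8)));
    }
}
```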
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullTimeWindowedSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullTimeWindowedSerde.java
new file mode 100644
index 00000000000..a69002f9900
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullTimeWindowedSerde.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
+import org.apache.kafka.streams.kstream.Windowed;
+
+class FullTimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>> {
+    FullTimeWindowedSerde(final Serde<T> inner, final long windowSize) {
+        super(
+            new TimeWindowedSerializer<>(inner.serializer()),
+            new TimeWindowedDeserializer<>(inner.deserializer(), windowSize)
+        );
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 53e7a4ba3bd..3ce962b1926 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -40,7 +40,7 @@ import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
 import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
 import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
-import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -357,12 +357,11 @@ public class KTableImpl extends AbstractStream implements KTable<
     public KTable<K, V> suppress(final Suppressed<? super K> suppressed) {
         final String name = builder.newProcessorName(SUPPRESS_NAME);
 
-        // TODO: follow-up pr to forward the k/v serdes
         final ProcessorSupplier<K, Change<V>> suppressionSupplier = () -> new KTableSuppressProcessor<>(
             buildSuppress(suppressed),
-            null,
-            null
+            keySerde,
+            valSerde == null ?
null : new FullChangeSerde<>(valSerde) ); final ProcessorParameters> processorParameters = new ProcessorParameters<>( @@ -388,18 +387,18 @@ public class KTableImpl extends AbstractStream implements KTable< } @SuppressWarnings("unchecked") - private SuppressedImpl buildSuppress(final Suppressed suppress) { + private SuppressedInternal buildSuppress(final Suppressed suppress) { if (suppress instanceof FinalResultsSuppressionBuilder) { final long grace = findAndVerifyWindowGrace(streamsGraphNode); final FinalResultsSuppressionBuilder builder = (FinalResultsSuppressionBuilder) suppress; - final SuppressedImpl finalResultsSuppression = + final SuppressedInternal finalResultsSuppression = builder.buildFinalResultsSuppression(Duration.ofMillis(grace)); - return (SuppressedImpl) finalResultsSuppression; - } else if (suppress instanceof SuppressedImpl) { - return (SuppressedImpl) suppress; + return (SuppressedInternal) finalResultsSuppression; + } else if (suppress instanceof SuppressedInternal) { + return (SuppressedInternal) suppress; } else { throw new IllegalArgumentException("Custom subclasses of Suppressed are not allowed."); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java index 8519671f6b6..fcb9c0240ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java @@ -27,7 +27,6 @@ import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.WindowedSerdes; import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; import org.apache.kafka.streams.state.StoreBuilder; @@ -93,7 +92,7 @@ public class TimeWindowedKStreamImpl extends AbstractStr materialize(materializedInternal), new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), materializedInternal.isQueryable(), - materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null, + materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null, materializedInternal.valueSerde()); } @@ -120,7 +119,7 @@ public class TimeWindowedKStreamImpl extends AbstractStr materialize(materializedInternal), new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator), materializedInternal.isQueryable(), - materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null, + materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null, materializedInternal.valueSerde()); } @@ -149,7 +148,7 @@ public class TimeWindowedKStreamImpl extends AbstractStr materialize(materializedInternal), new KStreamWindowReduce<>(windows, materializedInternal.storeName(), reducer), materializedInternal.isQueryable(), - materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null, + materializedInternal.keySerde() != null ? 
new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null, materializedInternal.valueSerde()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java similarity index 84% rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java index e731dc6f5e1..67d3783867c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java @@ -20,8 +20,8 @@ import org.apache.kafka.streams.kstream.Suppressed; import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; -abstract class BufferConfigImpl> implements Suppressed.BufferConfig { - public abstract long maxKeys(); +abstract class BufferConfigInternal> implements Suppressed.BufferConfig { + public abstract long maxRecords(); public abstract long maxBytes(); @@ -39,12 +39,12 @@ abstract class BufferConfigImpl> implemen @Override public Suppressed.StrictBufferConfig shutDownWhenFull() { - return new StrictBufferConfigImpl(maxKeys(), maxBytes(), SHUT_DOWN); + return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN); } @Override public Suppressed.BufferConfig emitEarlyWhenFull() { - return new EagerBufferConfigImpl(maxKeys(), maxBytes()); + return new EagerBufferConfigImpl(maxRecords(), maxBytes()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java index 0c2c883e18a..161f934f3a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java @@ -20,13 +20,13 @@ import org.apache.kafka.streams.kstream.Suppressed; import java.util.Objects; -public class EagerBufferConfigImpl extends BufferConfigImpl { +public class EagerBufferConfigImpl extends BufferConfigInternal { - private final long maxKeys; + private final long maxRecords; private final long maxBytes; - public EagerBufferConfigImpl(final long maxKeys, final long maxBytes) { - this.maxKeys = maxKeys; + public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) { + this.maxRecords = maxRecords; this.maxBytes = maxBytes; } @@ -37,12 +37,12 @@ public class EagerBufferConfigImpl extends BufferConfigImpl { @Override public Suppressed.BufferConfig withMaxBytes(final long byteLimit) { - return new EagerBufferConfigImpl(maxKeys, byteLimit); + return new EagerBufferConfigImpl(maxRecords, byteLimit); } @Override - public long maxKeys() { - return maxKeys; + public long maxRecords() { + return maxRecords; } @Override @@ -60,17 +60,17 @@ public class EagerBufferConfigImpl extends BufferConfigImpl { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o; - return maxKeys == that.maxKeys && + return maxRecords == that.maxRecords && maxBytes == that.maxBytes; } @Override public int hashCode() { - return Objects.hash(maxKeys, maxBytes); + return 
Objects.hash(maxRecords, maxBytes);
     }
 
     @Override
     public String toString() {
-        return "EagerBufferConfigImpl{maxKeys=" + maxKeys + ", maxBytes=" + maxBytes + '}';
+        return "EagerBufferConfigImpl{maxRecords=" + maxRecords + ", maxBytes=" + maxBytes + '}';
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
index db09307d48c..523ae0602c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals.suppress;
 
 import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
 
 import java.time.Duration;
 import java.util.Objects;
 
@@ -30,11 +29,11 @@ public class FinalResultsSuppressionBuilder implements Suppr
         this.bufferConfig = bufferConfig;
     }
 
-    public SuppressedImpl<K> buildFinalResultsSuppression(final Duration gracePeriod) {
-        return new SuppressedImpl<>(
+    public SuppressedInternal<K> buildFinalResultsSuppression(final Duration gracePeriod) {
+        return new SuppressedInternal<>(
             gracePeriod,
             bufferConfig,
-            (ProcessorContext context, K key) -> key.window().end(),
+            TimeDefinitions.WindowEndTimeDefinition.instance(),
             true
         );
     }
 }
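A note on the `TimeDefinitions.WindowEndTimeDefinition.instance()` change above: one plausible motivation is that `SuppressedInternal` is compared by value (see `SuppressedTest` later in this diff), and a lambda-valued field would defeat `equals()`, since two structurally identical lambdas are still distinct objects with identity equality, while a shared singleton compares equal by reference. A minimal, hypothetical illustration (`WindowEnd` stands in for the real singleton; it is not from the patch):

```java
import java.util.Objects;
import java.util.function.Function;

public class LambdaEqualityDemo {
    enum WindowEnd { INSTANCE } // hypothetical stand-in for WindowEndTimeDefinition.instance()

    public static void main(final String[] args) {
        // Two structurally identical lambdas are still two distinct objects:
        final Function<Long, Long> a = x -> x + 1;
        final Function<Long, Long> b = x -> x + 1;
        System.out.println(a.equals(b)); // false

        // A shared singleton compares equal wherever it is referenced:
        System.out.println(Objects.equals(WindowEnd.INSTANCE, WindowEnd.INSTANCE)); // true
    }
}
```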
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java
new file mode 100644
index 00000000000..677a662f79d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.internals.ContextualRecord;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer {
+    private final Map<Bytes, TimeKey> index = new HashMap<>();
+    private final TreeMap<TimeKey, ContextualRecord> sortedMap = new TreeMap<>();
+    private long memBufferSize = 0L;
+    private long minTimestamp = Long.MAX_VALUE;
+
+    @Override
+    public void evictWhile(final Supplier<Boolean> predicate,
+                           final Consumer<KeyValue<Bytes, ContextualRecord>> callback) {
+        final Iterator<Map.Entry<TimeKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
+
+        if (predicate.get()) {
+            Map.Entry<TimeKey, ContextualRecord> next = null;
+            if (delegate.hasNext()) {
+                next = delegate.next();
+            }
+
+            // predicate being true means we read one record, call the callback, and then remove it
+            while (next != null && predicate.get()) {
+                callback.accept(new KeyValue<>(next.getKey().key(), next.getValue()));
+
+                delegate.remove();
+                index.remove(next.getKey().key());
+
+                memBufferSize = memBufferSize - computeRecordSize(next.getKey().key(), next.getValue());
+
+                // peek at the next record so we can update the minTimestamp
+                if (delegate.hasNext()) {
+                    next = delegate.next();
+                    minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time();
+                } else {
+                    next = null;
+                    minTimestamp = Long.MAX_VALUE;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void put(final long time,
+                    final Bytes key,
+                    final ContextualRecord value) {
+        // non-resetting semantics:
+        // if there was a previous version of the same record,
+        // then insert the new record in the same place in the priority queue
+
+        final TimeKey previousKey = index.get(key);
+        if (previousKey == null) {
+            final TimeKey nextKey = new TimeKey(time, key);
+            index.put(key, nextKey);
+            sortedMap.put(nextKey, value);
+            minTimestamp = Math.min(minTimestamp, time);
+            memBufferSize = memBufferSize + computeRecordSize(key, value);
+        } else {
+            final ContextualRecord removedValue = sortedMap.put(previousKey, value);
+            memBufferSize =
+                memBufferSize
+                    + computeRecordSize(key, value)
+                    - (removedValue == null ?
0 : computeRecordSize(key, removedValue)); + } + } + + @Override + public int numRecords() { + return index.size(); + } + + @Override + public long bufferSize() { + return memBufferSize; + } + + @Override + public long minTimestamp() { + return minTimestamp; + } + + private long computeRecordSize(final Bytes key, final ContextualRecord value) { + long size = 0L; + size += 8; // buffer time + size += key.get().length; + if (value != null) { + size += value.sizeBytes(); + } + return size; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java index 6f0021fbc49..57e5066d09e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java @@ -17,70 +17,117 @@ package org.apache.kafka.streams.kstream.internals.suppress; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; - -import java.time.Duration; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.state.internals.ContextualRecord; import static java.util.Objects.requireNonNull; public class KTableSuppressProcessor implements Processor> { - private final SuppressedImpl suppress; + private final long maxRecords; + private final long maxBytes; + private final long suppressDurationMillis; + private final TimeOrderedKeyValueBuffer buffer; + private final TimeDefinition bufferTimeDefinition; + private final BufferFullStrategy bufferFullStrategy; + private final boolean shouldSuppressTombstones; private InternalProcessorContext internalProcessorContext; - private final Serde keySerde; - private final Serde> valueSerde; + private Serde keySerde; + private Serde> valueSerde; - public KTableSuppressProcessor(final SuppressedImpl suppress, + public KTableSuppressProcessor(final SuppressedInternal suppress, final Serde keySerde, - final Serde> valueSerde) { - this.suppress = requireNonNull(suppress); + final FullChangeSerde valueSerde) { + requireNonNull(suppress); this.keySerde = keySerde; this.valueSerde = valueSerde; + maxRecords = suppress.getBufferConfig().maxRecords(); + maxBytes = suppress.getBufferConfig().maxBytes(); + suppressDurationMillis = suppress.getTimeToWaitForMoreEvents().toMillis(); + buffer = new InMemoryTimeOrderedKeyValueBuffer(); + bufferTimeDefinition = suppress.getTimeDefinition(); + bufferFullStrategy = suppress.getBufferConfig().bufferFullStrategy(); + shouldSuppressTombstones = suppress.shouldSuppressTombstones(); } + @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { internalProcessorContext = (InternalProcessorContext) context; + this.keySerde = keySerde == null ? (Serde) context.keySerde() : keySerde; + this.valueSerde = valueSerde == null ? 
FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde; } @Override public void process(final K key, final Change value) { - if (suppress.getTimeToWaitForMoreEvents() == Duration.ZERO && definedRecordTime(key) <= internalProcessorContext.streamTime()) { - if (shouldForward(value)) { - internalProcessorContext.forward(key, value); - } // else skip - } else { - throw new NotImplementedException(); - } + buffer(key, value); + enforceConstraints(); } - private boolean shouldForward(final Change value) { - return !(value.newValue == null && suppress.suppressTombstones()); - } + private void buffer(final K key, final Change value) { + final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key); + final ProcessorRecordContext recordContext = internalProcessorContext.recordContext(); - private long definedRecordTime(final K key) { - return suppress.getTimeDefinition().time(internalProcessorContext, key); + final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, key)); + final byte[] serializedValue = valueSerde.serializer().serialize(null, value); + + buffer.put(bufferTime, serializedKey, new ContextualRecord(serializedValue, recordContext)); } - @Override - public void close() { + private void enforceConstraints() { + final long streamTime = internalProcessorContext.streamTime(); + final long expiryTime = streamTime - suppressDurationMillis; + + buffer.evictWhile(() -> buffer.minTimestamp() <= expiryTime, this::emit); + + if (overCapacity()) { + switch (bufferFullStrategy) { + case EMIT: + buffer.evictWhile(this::overCapacity, this::emit); + return; + case SHUT_DOWN: + throw new StreamsException(String.format( + "%s buffer exceeded its max capacity. Currently [%d/%d] records and [%d/%d] bytes.", + internalProcessorContext.currentNode().name(), + buffer.numRecords(), maxRecords, + buffer.bufferSize(), maxBytes + )); + } + } } - @Override - public String toString() { - return "KTableSuppressProcessor{" + - "suppress=" + suppress + - ", keySerde=" + keySerde + - ", valueSerde=" + valueSerde + - '}'; + private boolean overCapacity() { + return buffer.numRecords() > maxRecords || buffer.bufferSize() > maxBytes; } - public static class NotImplementedException extends RuntimeException { - NotImplementedException() { - super(); + private void emit(final KeyValue toEmit) { + final Change value = valueSerde.deserializer().deserialize(null, toEmit.value.value()); + if (shouldForward(value)) { + final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext(); + internalProcessorContext.setRecordContext(toEmit.value.recordContext()); + try { + final K key = keySerde.deserializer().deserialize(null, toEmit.key.get()); + internalProcessorContext.forward(key, value); + } finally { + internalProcessorContext.setRecordContext(prevRecordContext); + } } } + + private boolean shouldForward(final Change value) { + return !(value.newValue == null && shouldSuppressTombstones); + } + + @Override + public void close() { + } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java index 0634a748a5b..ef754ec6fc2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java @@ -22,22 +22,22 @@ import 
java.util.Objects; import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; -public class StrictBufferConfigImpl extends BufferConfigImpl implements Suppressed.StrictBufferConfig { +public class StrictBufferConfigImpl extends BufferConfigInternal implements Suppressed.StrictBufferConfig { - private final long maxKeys; + private final long maxRecords; private final long maxBytes; private final BufferFullStrategy bufferFullStrategy; - public StrictBufferConfigImpl(final long maxKeys, + public StrictBufferConfigImpl(final long maxRecords, final long maxBytes, final BufferFullStrategy bufferFullStrategy) { - this.maxKeys = maxKeys; + this.maxRecords = maxRecords; this.maxBytes = maxBytes; this.bufferFullStrategy = bufferFullStrategy; } public StrictBufferConfigImpl() { - this.maxKeys = Long.MAX_VALUE; + this.maxRecords = Long.MAX_VALUE; this.maxBytes = Long.MAX_VALUE; this.bufferFullStrategy = SHUT_DOWN; } @@ -49,12 +49,12 @@ public class StrictBufferConfigImpl extends BufferConfigImpl { + long time(final ProcessorContext context, final K key); + + TimeDefinitionType type(); + } + + public static class RecordTimeDefintion implements TimeDefinition { + private static final RecordTimeDefintion INSTANCE = new RecordTimeDefintion(); + + private RecordTimeDefintion() {} + + @SuppressWarnings("unchecked") + public static RecordTimeDefintion instance() { + return RecordTimeDefintion.INSTANCE; + } + + @Override + public long time(final ProcessorContext context, final K key) { + return context.timestamp(); + } + + @Override + public TimeDefinitionType type() { + return TimeDefinitionType.RECORD_TIME; + } + } + + public static class WindowEndTimeDefinition implements TimeDefinition { + private static final WindowEndTimeDefinition INSTANCE = new WindowEndTimeDefinition(); + + private WindowEndTimeDefinition() {} + + @SuppressWarnings("unchecked") + public static WindowEndTimeDefinition instance() { + return WindowEndTimeDefinition.INSTANCE; + } + + @Override + public long time(final ProcessorContext context, final K key) { + return key.window().end(); + } + + @Override + public TimeDefinitionType type() { + return TimeDefinitionType.WINDOW_END_TIME; + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java new file mode 100644 index 00000000000..d3ad350686a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import java.util.Objects;
+
+class TimeKey implements Comparable<TimeKey> {
+    private final long time;
+    private final Bytes key;
+
+    TimeKey(final long time, final Bytes key) {
+        this.time = time;
+        this.key = key;
+    }
+
+    Bytes key() {
+        return key;
+    }
+
+    long time() {
+        return time;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final TimeKey timeKey = (TimeKey) o;
+        return time == timeKey.time &&
+            Objects.equals(key, timeKey.key);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(time, key);
+    }
+
+    @Override
+    public int compareTo(final TimeKey o) {
+        // ordering is time-major; keys within the same time are ordered by the Bytes comparator
+        final int timeComparison = Long.compare(time, o.time);
+        return timeComparison == 0 ? key.compareTo(o.key) : timeComparison;
+    }
+}
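The `TimeKey` comparator above is what lets `InMemoryTimeOrderedKeyValueBuffer` use a plain `TreeMap` as its priority structure: entries iterate oldest-buffer-time-first, with ties broken by key order, so `evictWhile()` can walk the head of the map and stop at the first entry that is still too new. A standalone sketch of that ordering contract (`SimpleTimeKey` is a hypothetical stand-in for `TimeKey`, using `String` instead of `Bytes`):

```java
import java.util.TreeMap;

public class TimeKeyOrderDemo {
    // hypothetical stand-in for TimeKey, mirroring its compareTo
    static final class SimpleTimeKey implements Comparable<SimpleTimeKey> {
        final long time;
        final String key;

        SimpleTimeKey(final long time, final String key) {
            this.time = time;
            this.key = key;
        }

        @Override
        public int compareTo(final SimpleTimeKey o) {
            final int timeComparison = Long.compare(time, o.time);
            return timeComparison == 0 ? key.compareTo(o.key) : timeComparison;
        }

        @Override
        public String toString() {
            return "(" + time + ", " + key + ")";
        }
    }

    public static void main(final String[] args) {
        final TreeMap<SimpleTimeKey, String> sortedMap = new TreeMap<>();
        sortedMap.put(new SimpleTimeKey(5L, "b"), "v3");
        sortedMap.put(new SimpleTimeKey(3L, "z"), "v1");
        sortedMap.put(new SimpleTimeKey(5L, "a"), "v2");

        // prints (3, z) (5, a) (5, b) -- time-major, key-minor, i.e. eviction order
        sortedMap.keySet().forEach(k -> System.out.print(k + " "));
    }
}
```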
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java
new file mode 100644
index 00000000000..98a4f63c83f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.internals.ContextualRecord;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+interface TimeOrderedKeyValueBuffer {
+    void evictWhile(final Supplier<Boolean> predicate, final Consumer<KeyValue<Bytes, ContextualRecord>> callback);
+
+    void put(final long time, final Bytes key, final ContextualRecord value);
+
+    int numRecords();
+
+    long bufferSize();
+
+    long minTimestamp();
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index dd572649765..cd4657bdcd8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.processor.RecordContext;
 
@@ -78,6 +79,23 @@ public class ProcessorRecordContext implements RecordContext {
         return headers;
     }
 
+    public long sizeBytes() {
+        long size = 0L;
+        size += 8; // value.context.timestamp
+        size += 8; // value.context.offset
+        if (topic != null) {
+            size += topic.toCharArray().length;
+        }
+        size += 4; // partition
+        if (headers != null) {
+            for (final Header header : headers) {
+                size += header.key().toCharArray().length;
+                size += header.value().length;
+            }
+        }
+        return size;
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) return true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index c016f640cb2..a6a24ea098f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -89,7 +89,7 @@ class CachingKeyValueStore extends WrappedStateStore.AbstractStateStore im
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
         final ProcessorRecordContext current = context.recordContext();
         try {
-            context.setRecordContext(entry.recordContext());
+            context.setRecordContext(entry.entry().context());
             if (flushListener != null) {
                 V oldValue = null;
                 if (sendOldValues) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 2da5ab98550..cbcb7490efb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -169,7 +169,7 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
         final Bytes binaryKey = cacheFunction.key(entry.key());
         final ProcessorRecordContext current = context.recordContext();
-        context.setRecordContext(entry.recordContext());
+        context.setRecordContext(entry.entry().context());
         try {
             final Windowed<K> key =
SessionKeySchema.from(binaryKey.get(), serdes.keyDeserializer(), topic); final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key())); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 688e88962a2..f8d9ad590a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -108,7 +108,7 @@ class CachingWindowStore extends WrappedStateStore.AbstractStateStore impl final InternalProcessorContext context) { if (flushListener != null) { final ProcessorRecordContext current = context.recordContext(); - context.setRecordContext(entry.recordContext()); + context.setRecordContext(entry.entry().context()); try { final V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null; flushListener.apply(windowedKey, serdes.valueFrom(entry.newValue()), oldValue); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java new file mode 100644 index 00000000000..89935c09d8f --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; + +import java.util.Arrays; +import java.util.Objects; + +public class ContextualRecord { + private final byte[] value; + private final ProcessorRecordContext recordContext; + + public ContextualRecord(final byte[] value, final ProcessorRecordContext recordContext) { + this.value = value; + this.recordContext = recordContext; + } + + public ProcessorRecordContext recordContext() { + return recordContext; + } + + public byte[] value() { + return value; + } + + public long sizeBytes() { + return (value == null ? 
0 : value.length) + recordContext.sizeBytes(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final ContextualRecord that = (ContextualRecord) o; + return Arrays.equals(value, that.value) && + Objects.equals(recordContext, that.recordContext); + } + + @Override + public int hashCode() { + return Objects.hash(value, recordContext); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java index 0ac0b77dd37..53436358ded 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java @@ -19,18 +19,17 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import java.util.Arrays; import java.util.Objects; /** * A cache entry */ -class LRUCacheEntry extends ProcessorRecordContext { - - private final byte[] value; +class LRUCacheEntry { + private final ContextualRecord record; private final long sizeBytes; private boolean isDirty; + LRUCacheEntry(final byte[] value) { this(value, null, false, -1, -1, -1, ""); } @@ -42,15 +41,16 @@ class LRUCacheEntry extends ProcessorRecordContext { final long timestamp, final int partition, final String topic) { - super(timestamp, offset, partition, topic, headers); - this.value = value; + final ProcessorRecordContext context = new ProcessorRecordContext(timestamp, offset, partition, topic, headers); + + this.record = new ContextualRecord( + value, + context + ); + this.isDirty = isDirty; - this.sizeBytes = (value == null ? 0 : value.length) + - 1 + // isDirty - 8 + // timestamp - 8 + // offset - 4 + // partition - (topic == null ? 
0 : topic.length()); + this.sizeBytes = 1 + // isDirty + record.sizeBytes(); } void markClean() { @@ -66,7 +66,11 @@ class LRUCacheEntry extends ProcessorRecordContext { } byte[] value() { - return value; + return record.value(); + } + + public ProcessorRecordContext context() { + return record.recordContext(); } @Override @@ -74,17 +78,13 @@ class LRUCacheEntry extends ProcessorRecordContext { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final LRUCacheEntry that = (LRUCacheEntry) o; - return timestamp() == that.timestamp() && - offset() == that.offset() && - partition() == that.partition() && - Objects.equals(topic(), that.topic()) && - Objects.equals(headers(), that.headers()) && - Arrays.equals(this.value, that.value()) && - this.isDirty == that.isDirty(); + return sizeBytes == that.sizeBytes && + isDirty() == that.isDirty() && + Objects.equals(record, that.record); } @Override public int hashCode() { - return Objects.hash(timestamp(), offset(), topic(), partition(), headers(), value, isDirty); + return Objects.hash(record, sizeBytes, isDirty()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index 27270e6b51d..941b5221524 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.slf4j.Logger; @@ -332,9 +331,9 @@ public class ThreadCache { static class DirtyEntry { private final Bytes key; private final byte[] newValue; - private final ProcessorRecordContext recordContext; + private final LRUCacheEntry recordContext; - DirtyEntry(final Bytes key, final byte[] newValue, final ProcessorRecordContext recordContext) { + DirtyEntry(final Bytes key, final byte[] newValue, final LRUCacheEntry recordContext) { this.key = key; this.newValue = newValue; this.recordContext = recordContext; @@ -348,7 +347,7 @@ public class ThreadCache { return newValue; } - public ProcessorRecordContext recordContext() { + public LRUCacheEntry entry() { return recordContext; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index af91abaf2b1..a9920e3a6f1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -49,7 +49,6 @@ import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.IntegrationTest; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -75,6 +74,9 @@ import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecord import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded; import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; import static 
org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; @Category({IntegrationTest.class}) public class SuppressionIntegrationTest { @@ -88,7 +90,6 @@ public class SuppressionIntegrationTest { private static final int SCALE_FACTOR = COMMIT_INTERVAL * 2; private static final long TIMEOUT_MS = 30_000L; - @Ignore @Test public void shouldSuppressIntermediateEventsWithEmitAfter() throws InterruptedException { final String testId = "-shouldSuppressIntermediateEventsWithEmitAfter"; @@ -220,10 +221,9 @@ public class SuppressionIntegrationTest { } } - @Ignore @Test public void shouldSuppressIntermediateEventsWithRecordLimit() throws InterruptedException { - final String testId = "-shouldSuppressIntermediateEventsWithKeyLimit"; + final String testId = "-shouldSuppressIntermediateEventsWithRecordLimit"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; final String input = "input" + testId; final String outputSuppressed = "output-suppressed" + testId; @@ -279,7 +279,46 @@ public class SuppressionIntegrationTest { } } - @Ignore + @Test + public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedException { + final String testId = "-shouldShutdownWhenRecordConstraintIsViolated"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + + cleanStateBeforeTest(input, outputRaw, outputSuppressed); + + final StreamsBuilder builder = new StreamsBuilder(); + final KTable valueCounts = buildCountsTable(input, builder); + + valueCounts + .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).shutDownWhenFull())) + .toStream() + .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); + + valueCounts + .toStream() + .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); + + final KafkaStreams driver = getCleanStartedStreams(appId, builder); + try { + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), + new KeyValueTimestamp<>("x", "x", scaledTime(3L)) + ) + ); + verifyErrorShutdown(driver); + } finally { + driver.close(); + cleanStateAfterTest(driver); + } + } + @Test public void shouldSuppressIntermediateEventsWithBytesLimit() throws InterruptedException { final String testId = "-shouldSuppressIntermediateEventsWithBytesLimit"; @@ -339,7 +378,47 @@ public class SuppressionIntegrationTest { } } - @Ignore + @Test + public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedException { + final String testId = "-shouldShutdownWhenBytesConstraintIsViolated"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + + cleanStateBeforeTest(input, outputRaw, outputSuppressed); + + final StreamsBuilder builder = new StreamsBuilder(); + final KTable valueCounts = buildCountsTable(input, builder); + + valueCounts + // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size. 
+ .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).shutDownWhenFull())) + .toStream() + .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); + + valueCounts + .toStream() + .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); + + final KafkaStreams driver = getCleanStartedStreams(appId, builder); + try { + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), + new KeyValueTimestamp<>("x", "x", scaledTime(3L)) + ) + ); + verifyErrorShutdown(driver); + } finally { + driver.close(); + cleanStateAfterTest(driver); + } + } + @Test public void shouldSupportFinalResultsForTimeWindows() throws InterruptedException { final String testId = "-shouldSupportFinalResultsForTimeWindows"; @@ -479,6 +558,11 @@ public class SuppressionIntegrationTest { } } + private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException { + waitForCondition(() -> !driver.state().isRunning(), TIMEOUT_MS, "Streams didn't shut down."); + assertThat(driver.state(), is(KafkaStreams.State.ERROR)); + } + private void verifyOutput(final String topic, final List> expected) { final List> results; try { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java index 7650c59759e..fcb5ba8ef3f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl; import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder; import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl; -import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl; +import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; import org.junit.Test; import static java.lang.Long.MAX_VALUE; @@ -61,31 +61,31 @@ public class SuppressedTest { assertThat( "time alone should be set", untilTimeLimit(ofMillis(2), unbounded()), - is(new SuppressedImpl<>(ofMillis(2), unbounded(), null, false)) + is(new SuppressedInternal<>(ofMillis(2), unbounded(), null, false)) ); assertThat( "time and unbounded buffer should be set", untilTimeLimit(ofMillis(2), unbounded()), - is(new SuppressedImpl<>(ofMillis(2), unbounded(), null, false)) + is(new SuppressedInternal<>(ofMillis(2), unbounded(), null, false)) ); assertThat( "time and keys buffer should be set", untilTimeLimit(ofMillis(2), maxRecords(2)), - is(new SuppressedImpl<>(ofMillis(2), maxRecords(2), null, false)) + is(new SuppressedInternal<>(ofMillis(2), maxRecords(2), null, false)) ); assertThat( "time and size buffer should be set", untilTimeLimit(ofMillis(2), maxBytes(2)), - is(new SuppressedImpl<>(ofMillis(2), maxBytes(2), null, false)) + is(new SuppressedInternal<>(ofMillis(2), maxBytes(2), null, false)) ); assertThat( "all constraints should be set", untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)), - is(new SuppressedImpl<>(ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null, false)) + is(new SuppressedInternal<>(ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null, false)) ); } diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java index d98a15e093b..222e1d63982 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.LongDeserializer; @@ -32,7 +31,6 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -41,7 +39,6 @@ import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.WindowStore; @@ -60,6 +57,7 @@ import java.util.Properties; import static java.time.Duration.ZERO; import static java.time.Duration.ofMillis; import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords; @@ -159,7 +157,7 @@ public class SuppressScenarioTest { } } - @Test(expected = ProcessorStateException.class) + @Test public void shouldSuppressIntermediateEventsWithTimeLimit() { final StreamsBuilder builder = new StreamsBuilder(); final KTable valueCounts = builder @@ -198,11 +196,9 @@ public class SuppressScenarioTest { new KeyValueTimestamp<>("v1", 1L, 2L) ) ); - // note that the current stream time is 2, which causes v1 to age out of the buffer, since - // it has been buffered since time 0 (even though the current version of it in the buffer has timestamp 1) verify( drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), - singletonList(new KeyValueTimestamp<>("v1", 0L, 1L)) + singletonList(new KeyValueTimestamp<>("v1", 1L, 2L)) ); // inserting a dummy "tick" record just to advance stream time driver.pipeInput(recordFactory.create("input", "tick", "tick", 3L)); @@ -225,36 +221,15 @@ public class SuppressScenarioTest { new KeyValueTimestamp<>("tick", 1L, 4L) ) ); + // tick is still buffered, since it was first inserted at time 3, and it is only time 4 right now. 
verify( drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), - singletonList( - new KeyValueTimestamp<>("v1", 1L, 2L) - ) - ); - driver.pipeInput(recordFactory.create("input", "tick", "tick", 5L)); - verify( - drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), - asList( - new KeyValueTimestamp<>("tick", 0L, 5L), - new KeyValueTimestamp<>("tick", 1L, 5L) - ) - ); - // Note that because the punctuate runs before the process call, the tick at time 5 causes - // the previous tick to age out of the buffer, so at this point, we see the prior value emitted - // and the new value is still buffered. - - // Also worth noting is that "tick" ages out because it has been buffered since time 3, even though - // the current timestamp of the buffered record is 4. - verify( - drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), - singletonList( - new KeyValueTimestamp<>("tick", 1L, 4L) - ) + emptyList() ); } } - @Test(expected = ProcessorStateException.class) + @Test public void shouldSuppressIntermediateEventsWithRecordLimit() { final StreamsBuilder builder = new StreamsBuilder(); final KTable valueCounts = builder @@ -320,7 +295,7 @@ public class SuppressScenarioTest { } } - @Test(expected = ProcessorStateException.class) + @Test public void shouldSuppressIntermediateEventsWithBytesLimit() { final StreamsBuilder builder = new StreamsBuilder(); final KTable valueCounts = builder @@ -351,8 +326,7 @@ public class SuppressScenarioTest { try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L)); - final ConsumerRecord consumerRecord = recordFactory.create("input", "k2", "v1", 2L); - driver.pipeInput(consumerRecord); + driver.pipeInput(recordFactory.create("input", "k2", "v1", 2L)); verify( drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), asList( @@ -388,7 +362,7 @@ public class SuppressScenarioTest { } } - @Test(expected = KTableSuppressProcessor.NotImplementedException.class) + @Test public void shouldSupportFinalResultsForTimeWindows() { final StreamsBuilder builder = new StreamsBuilder(); final KTable, Long> valueCounts = builder @@ -442,7 +416,7 @@ public class SuppressScenarioTest { } } - @Test(expected = KTableSuppressProcessor.NotImplementedException.class) + @Test public void shouldSupportFinalResultsForTimeWindowsWithLargeJump() { final StreamsBuilder builder = new StreamsBuilder(); final KTable, Long> valueCounts = builder @@ -501,7 +475,7 @@ public class SuppressScenarioTest { } } - @Test(expected = KTableSuppressProcessor.NotImplementedException.class) + @Test public void shouldSupportFinalResultsForSessionWindows() { final StreamsBuilder builder = new StreamsBuilder(); final KTable, Long> valueCounts = builder diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java index a38d1d58f43..bb7f49ce7a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java @@ -16,15 +16,20 @@ */ package org.apache.kafka.streams.kstream.internals.suppress; +import 
org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.Suppressed; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.TimeWindowedSerializer; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.FullChangeSerde; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.MockProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.test.MockInternalProcessorContext; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; @@ -38,25 +43,23 @@ import static java.time.Duration.ZERO; import static java.time.Duration.ofMillis; import static org.apache.kafka.common.serialization.Serdes.Long; import static org.apache.kafka.common.serialization.Serdes.String; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded; import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; import static org.apache.kafka.streams.kstream.WindowedSerdes.sessionWindowedSerdeFrom; -import static org.apache.kafka.streams.kstream.WindowedSerdes.timeWindowedSerdeFrom; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; @SuppressWarnings("PointlessArithmeticExpression") public class KTableSuppressProcessorTest { private static final long ARBITRARY_LONG = 5L; - private static final long ARBITRARY_TIMESTAMP = 1993L; - private static final Change ARBITRARY_CHANGE = new Change<>(7L, 14L); - private static final TimeWindow ARBITRARY_WINDOW = new TimeWindow(0L, 100L); - @Test public void zeroTimeLimitShouldImmediatelyEmit() { final KTableSuppressProcessor processor = @@ -66,7 +69,7 @@ public class KTableSuppressProcessorTest { processor.init(context); final long timestamp = ARBITRARY_LONG; - context.setTimestamp(timestamp); + context.setRecordMetadata("", 0, 0L, null, timestamp); context.setStreamTime(timestamp); final String key = "hey"; final Change value = ARBITRARY_CHANGE; @@ -83,7 +86,7 @@ public class KTableSuppressProcessorTest { final KTableSuppressProcessor, Long> processor = new KTableSuppressProcessor<>( getImpl(untilTimeLimit(ZERO, unbounded())), - timeWindowedSerdeFrom(String.class), + timeWindowedSerdeFrom(String.class, 100L), new FullChangeSerde<>(Long()) ); @@ -91,9 +94,9 @@ public class KTableSuppressProcessorTest { processor.init(context); final long timestamp = ARBITRARY_LONG; - context.setTimestamp(timestamp); + context.setRecordMetadata("", 0, 0L, null, timestamp); context.setStreamTime(timestamp); - final Windowed key = new Windowed<>("hey", ARBITRARY_WINDOW); + final Windowed key = new Windowed<>("hey", new TimeWindow(0L, 100L)); final Change value = ARBITRARY_CHANGE; processor.process(key, value); @@ -103,7 +106,7 @@ public class KTableSuppressProcessorTest { 
assertThat(capturedForward.timestamp(), is(timestamp)); } - @Test(expected = KTableSuppressProcessor.NotImplementedException.class) + @Test public void intermediateSuppressionShouldBufferAndEmitLater() { final KTableSuppressProcessor processor = new KTableSuppressProcessor<>( @@ -117,13 +120,15 @@ public class KTableSuppressProcessorTest { final long timestamp = 0L; context.setRecordMetadata("topic", 0, 0, null, timestamp); + context.setStreamTime(timestamp); final String key = "hey"; final Change value = new Change<>(null, 1L); processor.process(key, value); assertThat(context.forwarded(), hasSize(0)); - assertThat(context.scheduledPunctuators(), hasSize(1)); - context.scheduledPunctuators().get(0).getPunctuator().punctuate(1); + context.setRecordMetadata("topic", 0, 1, null, 1L); + context.setStreamTime(1L); + processor.process("tick", new Change<>(null, null)); assertThat(context.forwarded(), hasSize(1)); final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); @@ -131,38 +136,49 @@ public class KTableSuppressProcessorTest { assertThat(capturedForward.timestamp(), is(timestamp)); } - - @SuppressWarnings("unchecked") - private SuppressedImpl finalResults(final Duration grace) { - return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace); - } - - - @Test(expected = KTableSuppressProcessor.NotImplementedException.class) + @Test public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() { final KTableSuppressProcessor, Long> processor = new KTableSuppressProcessor<>( finalResults(ofMillis(1L)), - timeWindowedSerdeFrom(String.class), + timeWindowedSerdeFrom(String.class, 1L), new FullChangeSerde<>(Long()) ); final MockInternalProcessorContext context = new MockInternalProcessorContext(); processor.init(context); - final long timestamp = ARBITRARY_TIMESTAMP; - context.setRecordMetadata("topic", 0, 0, null, timestamp); - final Windowed key = new Windowed<>("hey", ARBITRARY_WINDOW); + final long windowStart = 99L; + final long recordTime = 99L; + final long windowEnd = 100L; + context.setRecordMetadata("topic", 0, 0, null, recordTime); + context.setStreamTime(recordTime); + final Windowed key = new Windowed<>("hey", new TimeWindow(windowStart, windowEnd)); final Change value = ARBITRARY_CHANGE; processor.process(key, value); assertThat(context.forwarded(), hasSize(0)); - assertThat(context.scheduledPunctuators(), hasSize(1)); - context.scheduledPunctuators().get(0).getPunctuator().punctuate(timestamp + 1L); + // although the stream time is now 100, we have to wait 1 ms after the window *end* before we + // emit "hey", so we don't emit yet. 
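+ // (With grace = 1 ms, the window [99, 100) only closes once stream time reaches windowEnd + grace = 101.)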
+ final long windowStart2 = 100L; + final long recordTime2 = 100L; + final long windowEnd2 = 101L; + context.setRecordMetadata("topic", 0, 1, null, recordTime2); + context.setStreamTime(recordTime2); + processor.process(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE); + assertThat(context.forwarded(), hasSize(0)); + + // ok, now it's time to emit "hey" + final long windowStart3 = 101L; + final long recordTime3 = 101L; + final long windowEnd3 = 102L; + context.setRecordMetadata("topic", 0, 1, null, recordTime3); + context.setStreamTime(recordTime3); + processor.process(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE); assertThat(context.forwarded(), hasSize(1)); final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value))); - assertThat(capturedForward.timestamp(), is(timestamp)); + assertThat(capturedForward.timestamp(), is(recordTime)); } /** @@ -170,27 +186,32 @@ * it will still buffer events and emit only after the end of the window. * As opposed to emitting immediately the way regular suppression would with a time limit of 0. */ - @Test(expected = KTableSuppressProcessor.NotImplementedException.class) + @Test public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() { final KTableSuppressProcessor, Long> processor = new KTableSuppressProcessor<>( finalResults(ofMillis(0)), - timeWindowedSerdeFrom(String.class), + timeWindowedSerdeFrom(String.class, 100L), new FullChangeSerde<>(Long()) ); final MockInternalProcessorContext context = new MockInternalProcessorContext(); processor.init(context); + // note the record is in the past, but the window end is in the future, so we still have to buffer, + // even though the grace period is 0.
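+ // Emission becomes possible only once stream time reaches windowEnd + grace = 100 + 0 = 100.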
final long timestamp = 5L; - context.setRecordMetadata("", 0, 0L, null, timestamp); + final long streamTime = 99L; final long windowEnd = 100L; + context.setRecordMetadata("", 0, 0L, null, timestamp); + context.setStreamTime(streamTime); final Windowed key = new Windowed<>("hey", new TimeWindow(0, windowEnd)); final Change value = ARBITRARY_CHANGE; processor.process(key, value); assertThat(context.forwarded(), hasSize(0)); - assertThat(context.scheduledPunctuators(), hasSize(1)); - context.scheduledPunctuators().get(0).getPunctuator().punctuate(windowEnd); + context.setRecordMetadata("", 0, 1L, null, windowEnd); + context.setStreamTime(windowEnd); + processor.process(new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE); assertThat(context.forwarded(), hasSize(1)); final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); @@ -202,7 +223,7 @@ public class KTableSuppressProcessorTest { public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() { final KTableSuppressProcessor, Long> processor = new KTableSuppressProcessor<>( finalResults(ofMillis(0)), - timeWindowedSerdeFrom(String.class), + timeWindowedSerdeFrom(String.class, 100L), new FullChangeSerde<>(Long()) ); @@ -210,7 +231,7 @@ public class KTableSuppressProcessorTest { processor.init(context); final long timestamp = 100L; - context.setTimestamp(timestamp); + context.setRecordMetadata("", 0, 0L, null, timestamp); context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new TimeWindow(0, 100L)); final Change value = ARBITRARY_CHANGE; @@ -226,7 +247,7 @@ public class KTableSuppressProcessorTest { public void finalResultsShouldSuppressTombstonesForTimeWindows() { final KTableSuppressProcessor, Long> processor = new KTableSuppressProcessor<>( finalResults(ofMillis(0)), - timeWindowedSerdeFrom(String.class), + timeWindowedSerdeFrom(String.class, 100L), new FullChangeSerde<>(Long()) ); @@ -234,7 +255,7 @@ public class KTableSuppressProcessorTest { processor.init(context); final long timestamp = 100L; - context.setTimestamp(timestamp); + context.setRecordMetadata("", 0, 0L, null, timestamp); context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new TimeWindow(0, 100L)); final Change value = new Change<>(null, ARBITRARY_LONG); @@ -255,7 +276,7 @@ public class KTableSuppressProcessorTest { processor.init(context); final long timestamp = 100L; - context.setTimestamp(timestamp); + context.setRecordMetadata("", 0, 0L, null, timestamp); context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new SessionWindow(0L, 0L)); final Change value = new Change<>(null, ARBITRARY_LONG); @@ -264,12 +285,11 @@ public class KTableSuppressProcessorTest { assertThat(context.forwarded(), hasSize(0)); } - @SuppressWarnings("unchecked") @Test public void suppressShouldNotSuppressTombstonesForTimeWindows() { - final KTableSuppressProcessor, Long> processor = new KTableSuppressProcessor, Long>( - (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)), - timeWindowedSerdeFrom(String.class), + final KTableSuppressProcessor, Long> processor = new KTableSuppressProcessor<>( + getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))), + timeWindowedSerdeFrom(String.class, 100L), new FullChangeSerde<>(Long()) ); @@ -277,7 +297,7 @@ public class KTableSuppressProcessorTest { processor.init(context); final long timestamp = 100L; - context.setTimestamp(timestamp); + context.setRecordMetadata("", 0, 0L, null, timestamp); 
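+ // supply full record metadata (topic, partition, offset, headers, timestamp) rather than just a timestamp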
context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new TimeWindow(0L, 100L)); final Change value = new Change<>(null, ARBITRARY_LONG); @@ -289,11 +309,10 @@ public class KTableSuppressProcessorTest { assertThat(capturedForward.timestamp(), is(timestamp)); } - @SuppressWarnings("unchecked") @Test public void suppressShouldNotSuppressTombstonesForSessionWindows() { - final KTableSuppressProcessor, Long> processor = new KTableSuppressProcessor, Long>( - (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)), + final KTableSuppressProcessor, Long> processor = new KTableSuppressProcessor<>( + getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))), sessionWindowedSerdeFrom(String.class), new FullChangeSerde<>(Long()) ); @@ -302,7 +321,7 @@ public class KTableSuppressProcessorTest { processor.init(context); final long timestamp = 100L; - context.setTimestamp(timestamp); + context.setRecordMetadata("", 0, 0L, null, timestamp); context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new SessionWindow(0L, 0L)); final Change value = new Change<>(null, ARBITRARY_LONG); @@ -314,11 +333,61 @@ public class KTableSuppressProcessorTest { assertThat(capturedForward.timestamp(), is(timestamp)); } - @SuppressWarnings("unchecked") @Test public void suppressShouldNotSuppressTombstonesForKTable() { - final KTableSuppressProcessor processor = new KTableSuppressProcessor( - (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)), + final KTableSuppressProcessor processor = new KTableSuppressProcessor<>( + getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))), + Serdes.String(), + new FullChangeSerde<>(Long()) + ); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + processor.init(context); + + final long timestamp = 100L; + context.setRecordMetadata("", 0, 0L, null, timestamp); + context.setStreamTime(timestamp); + final String key = "hey"; + final Change value = new Change<>(null, ARBITRARY_LONG); + processor.process(key, value); + + assertThat(context.forwarded(), hasSize(1)); + final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); + assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value))); + assertThat(capturedForward.timestamp(), is(timestamp)); + } + + @Test + public void suppressShouldEmitWhenOverRecordCapacity() { + final KTableSuppressProcessor processor = new KTableSuppressProcessor<>( + getImpl(untilTimeLimit(Duration.ofDays(100), maxRecords(1))), + Serdes.String(), + new FullChangeSerde<>(Long()) + ); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + processor.init(context); + + final long timestamp = 100L; + context.setStreamTime(timestamp); + context.setRecordMetadata("", 0, 0L, null, timestamp); + final String key = "hey"; + final Change value = new Change<>(null, ARBITRARY_LONG); + processor.process(key, value); + + context.setRecordMetadata("", 0, 1L, null, timestamp + 1); + processor.process("dummyKey", value); + + assertThat(context.forwarded(), hasSize(1)); + final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); + assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value))); + assertThat(capturedForward.timestamp(), is(timestamp)); + } + + @Test + public void suppressShouldEmitWhenOverByteCapacity() { + final KTableSuppressProcessor processor = new KTableSuppressProcessor<>( + getImpl(untilTimeLimit(Duration.ofDays(100), maxBytes(60L))), Serdes.String(), new 
FullChangeSerde<>(Long()) ); @@ -327,18 +396,82 @@ public class KTableSuppressProcessorTest { processor.init(context); final long timestamp = 100L; - context.setTimestamp(timestamp); context.setStreamTime(timestamp); + context.setRecordMetadata("", 0, 0L, null, timestamp); final String key = "hey"; final Change value = new Change<>(null, ARBITRARY_LONG); processor.process(key, value); + context.setRecordMetadata("", 0, 1L, null, timestamp + 1); + processor.process("dummyKey", value); + assertThat(context.forwarded(), hasSize(1)); final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value))); assertThat(capturedForward.timestamp(), is(timestamp)); } + @Test + public void suppressShouldShutDownWhenOverRecordCapacity() { + final KTableSuppressProcessor processor = new KTableSuppressProcessor<>( + getImpl(untilTimeLimit(Duration.ofDays(100), maxRecords(1).shutDownWhenFull())), + Serdes.String(), + new FullChangeSerde<>(Long()) + ); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + processor.init(context); + + final long timestamp = 100L; + context.setStreamTime(timestamp); + context.setRecordMetadata("", 0, 0L, null, timestamp); + context.setCurrentNode(new ProcessorNode("testNode")); + final String key = "hey"; + final Change value = new Change<>(null, ARBITRARY_LONG); + processor.process(key, value); + + context.setRecordMetadata("", 0, 1L, null, timestamp); + try { + processor.process("dummyKey", value); + fail("expected an exception"); + } catch (final StreamsException e) { + assertThat(e.getMessage(), containsString("buffer exceeded its max capacity")); + } + } + + @Test + public void suppressShouldShutDownWhenOverByteCapacity() { + final KTableSuppressProcessor processor = new KTableSuppressProcessor<>( + getImpl(untilTimeLimit(Duration.ofDays(100), maxBytes(60L).shutDownWhenFull())), + Serdes.String(), + new FullChangeSerde<>(Long()) + ); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + processor.init(context); + + final long timestamp = 100L; + context.setStreamTime(timestamp); + context.setRecordMetadata("", 0, 0L, null, timestamp); + context.setCurrentNode(new ProcessorNode("testNode")); + final String key = "hey"; + final Change value = new Change<>(null, ARBITRARY_LONG); + processor.process(key, value); + + context.setRecordMetadata("", 0, 1L, null, timestamp); + try { + processor.process("dummyKey", value); + fail("expected an exception"); + } catch (final StreamsException e) { + assertThat(e.getMessage(), containsString("buffer exceeded its max capacity")); + } + } + + @SuppressWarnings("unchecked") + private SuppressedInternal finalResults(final Duration grace) { + return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace); + } + private static Matcher> hasSize(final int i) { return new BaseMatcher>() { @Override @@ -359,7 +492,15 @@ public class KTableSuppressProcessorTest { }; } - private static SuppressedImpl getImpl(final Suppressed suppressed) { - return (SuppressedImpl) suppressed; + private static SuppressedInternal getImpl(final Suppressed suppressed) { + return (SuppressedInternal) suppressed; + } + + private Serde> timeWindowedSerdeFrom(final Class rawType, final long windowSize) { + final Serde kSerde = Serdes.serdeFrom(rawType); + return new Serdes.WrapperSerde<>( + new TimeWindowedSerializer<>(kSerde.serializer()), + new 
TimeWindowedDeserializer<>(kSerde.deserializer(), windowSize) + ); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java index 0fdbdf76b3c..71a6ac21c10 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java @@ -190,7 +190,7 @@ public class NamedCacheTest { assertEquals(2, flushed.size()); assertEquals(Bytes.wrap(new byte[] {0}), flushed.get(0).key()); - assertEquals(headers, flushed.get(0).recordContext().headers()); + assertEquals(headers, flushed.get(0).entry().context().headers()); assertArrayEquals(new byte[] {10}, flushed.get(0).newValue()); assertEquals(Bytes.wrap(new byte[] {2}), flushed.get(1).key()); assertArrayEquals(new byte[] {30}, flushed.get(1).newValue());
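
For context, here is roughly how the behavior these tests pin down looks from the DSL. This is a minimal sketch, not part of the patch: it assumes the KTable#suppress(Suppressed) operator that SuppressScenarioTest drives, and the topic names, serdes, and bounds are illustrative.

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Suppressed;

import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;

public class SuppressionSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> table =
            builder.table("input", Consumed.with(Serdes.String(), Serdes.String()));

        // Hold each key's latest update for up to one second of stream time before
        // emitting it downstream. With a plain maxRecords bound (no shutDownWhenFull()),
        // overflowing the buffer evicts and emits the oldest entry early, which is what
        // suppressShouldEmitWhenOverRecordCapacity asserts; adding .shutDownWhenFull()
        // instead raises a StreamsException, as the suppressShouldShutDownWhen* tests assert.
        table.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(1), maxRecords(1000L)))
             .toStream()
             .to("output");

        final Topology topology = builder.build();
    }
}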