diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index 8e6f990f0b3..d410bce3c6b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -64,12 +64,12 @@ class GroupedStreamAggregateBuilder { this.userName = groupedInternal.name(); } - KTable build(final String functionName, - final StoreBuilder storeBuilder, - final KStreamAggProcessorSupplier aggregateSupplier, - final boolean isQueryable, - final Serde keySerde, - final Serde valSerde) { + KTable build(final String functionName, + final StoreBuilder storeBuilder, + final KStreamAggProcessorSupplier aggregateSupplier, + final boolean isQueryable, + final Serde keySerde, + final Serde valSerde) { final String aggFunctionName = builder.newProcessorName(functionName); @@ -84,17 +84,14 @@ class GroupedStreamAggregateBuilder { builder.addGraphNode(parentNode, repartitionNode); parentNode = repartitionNode; } - final StatefulProcessorNode.StatefulProcessorNodeBuilder statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder(); - final ProcessorParameters processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName); - - statefulProcessorNodeBuilder - .withProcessorParameters(processorParameters) - .withNodeName(aggFunctionName) - .withRepartitionRequired(repartitionRequired) - .withStoreBuilder(storeBuilder); - - final StatefulProcessorNode statefulProcessorNode = statefulProcessorNodeBuilder.build(); + final StatefulProcessorNode statefulProcessorNode = + new StatefulProcessorNode<>( + aggFunctionName, + new ProcessorParameters<>(aggregateSupplier, aggFunctionName), + storeBuilder, + repartitionRequired + ); builder.addGraphNode(parentNode, statefulProcessorNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index 013028dc3ae..29a52b1b37a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -79,9 +79,12 @@ public class KGroupedTableImpl extends AbstractStream implements KGr // the passed in StreamsGraphNode must be the parent of the repartition node builder.addGraphNode(this.streamsGraphNode, repartitionNode); - final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode(materialized, - funcName, - aggregateSupplier); + final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>( + funcName, + new ProcessorParameters<>(aggregateSupplier, funcName), + new KeyValueStoreMaterializer<>(materialized).materialize(), + false + ); // now the repartition node must be the parent of the StateProcessorNode builder.addGraphNode(repartitionNode, statefulProcessorNode); @@ -98,20 +101,6 @@ public class KGroupedTableImpl extends AbstractStream implements KGr builder); } - private StatefulProcessorNode> getStatefulProcessorNode(final MaterializedInternal> materialized, - final String functionName, - final ProcessorSupplier> aggregateSupplier) { - - final ProcessorParameters> aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName); - - return StatefulProcessorNode - .>statefulProcessorNodeBuilder() - .withNodeName(functionName) - .withProcessorParameters(aggregateFunctionProcessorParams) - .withStoreBuilder(new KeyValueStoreMaterializer<>(materialized).materialize()) - .build(); - } - private GroupedTableOperationRepartitionNode createRepartitionNode(final String sinkName, final String sourceName, final String topic) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 49dbbd1a905..f1a875479a7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals; -import java.time.Duration; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.ForeachAction; @@ -56,6 +55,7 @@ import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; import java.lang.reflect.Array; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -456,7 +456,6 @@ public class KStreamImpl extends AbstractStream implements KStream(new KStreamTransform<>(transformerSupplier), name), stateStoreNames, - null, true ); @@ -491,7 +490,6 @@ public class KStreamImpl extends AbstractStream implements KStream(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name), stateStoreNames, - null, repartitionRequired ); @@ -513,7 +511,6 @@ public class KStreamImpl extends AbstractStream implements KStream(processorSupplier, name), stateStoreNames, - null, repartitionRequired ); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 45f55526571..f49d109c0fc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -377,7 +377,6 @@ public class KTableImpl extends AbstractStream implements KTable< final ProcessorGraphNode> node = new StatefulProcessorNode<>( name, new ProcessorParameters<>(suppressionSupplier, name), - null, new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName), false ); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java index 2dc6aadb41d..fe11b268837 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java @@ -31,25 +31,44 @@ public class StatefulProcessorNode extends ProcessorGraphNode { private final StoreBuilder storeBuilder; + /** + * Create a node representing a stateful procssor, where the named store has already been registered. + */ public StatefulProcessorNode(final String nodeName, final ProcessorParameters processorParameters, final String[] storeNames, - final StoreBuilder materializedKTableStoreBuilder, final boolean repartitionRequired) { super(nodeName, - processorParameters, - repartitionRequired); + processorParameters, + repartitionRequired); this.storeNames = storeNames; + this.storeBuilder = null; + } + + + /** + * Create a node representing a stateful procssor, + * where the store needs to be built and registered as part of building this node. + */ + public StatefulProcessorNode(final String nodeName, + final ProcessorParameters processorParameters, + final StoreBuilder materializedKTableStoreBuilder, + final boolean repartitionRequired) { + super(nodeName, + processorParameters, + repartitionRequired); + + this.storeNames = null; this.storeBuilder = materializedKTableStoreBuilder; } @Override public String toString() { return "StatefulProcessorNode{" + - "storeNames=" + Arrays.toString(storeNames) + - ", storeBuilder=" + storeBuilder + - "} " + super.toString(); + "storeNames=" + Arrays.toString(storeNames) + + ", storeBuilder=" + storeBuilder + + "} " + super.toString(); } @Override @@ -68,56 +87,4 @@ public class StatefulProcessorNode extends ProcessorGraphNode { topologyBuilder.addStateStore(storeBuilder, processorName); } } - - public static StatefulProcessorNodeBuilder statefulProcessorNodeBuilder() { - return new StatefulProcessorNodeBuilder<>(); - } - - public static final class StatefulProcessorNodeBuilder { - - private ProcessorParameters processorSupplier; - private String nodeName; - private boolean repartitionRequired; - private String[] storeNames; - private StoreBuilder storeBuilder; - - private StatefulProcessorNodeBuilder() { - } - - public StatefulProcessorNodeBuilder withProcessorParameters(final ProcessorParameters processorParameters) { - this.processorSupplier = processorParameters; - return this; - } - - public StatefulProcessorNodeBuilder withNodeName(final String nodeName) { - this.nodeName = nodeName; - return this; - } - - public StatefulProcessorNodeBuilder withStoreNames(final String[] storeNames) { - this.storeNames = storeNames; - return this; - } - - public StatefulProcessorNodeBuilder withRepartitionRequired(final boolean repartitionRequired) { - this.repartitionRequired = repartitionRequired; - return this; - } - - public StatefulProcessorNodeBuilder withStoreBuilder(final StoreBuilder storeBuilder) { - this.storeBuilder = storeBuilder; - return this; - } - - public StatefulProcessorNode build() { - return new StatefulProcessorNode<>( - nodeName, - processorSupplier, - storeNames, - storeBuilder, - repartitionRequired - ); - - } - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java index 20ce3ffa31e..5e426d95931 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java @@ -24,6 +24,8 @@ import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.junit.Test; import static java.time.Duration.ofMillis; @@ -61,8 +63,7 @@ public class GraphGraceSearchUtilTest { }, "dummy" ), - null, - null, + (StoreBuilder) null, false ); @@ -91,8 +92,7 @@ public class GraphGraceSearchUtilTest { ), "asdf" ), - null, - null, + (StoreBuilder) null, false ); @@ -116,8 +116,7 @@ public class GraphGraceSearchUtilTest { ), "asdf" ), - null, - null, + (StoreBuilder) null, false ); @@ -133,8 +132,7 @@ public class GraphGraceSearchUtilTest { new ProcessorParameters<>(new KStreamSessionWindowAggregate( windows, "asdf", null, null, null ), "asdf"), - null, - null, + (StoreBuilder) null, false ); @@ -153,8 +151,7 @@ public class GraphGraceSearchUtilTest { }, "dummy" ), - null, - null, + (StoreBuilder) null, false ); graceGrandparent.addChild(statefulParent); @@ -181,8 +178,7 @@ public class GraphGraceSearchUtilTest { ), "asdf" ), - null, - null, + (StoreBuilder) null, false ); @@ -210,8 +206,7 @@ public class GraphGraceSearchUtilTest { ), "asdf" ), - null, - null, + (StoreBuilder) null, false ); @@ -226,8 +221,7 @@ public class GraphGraceSearchUtilTest { ), "asdf" ), - null, - null, + (StoreBuilder) null, false );