Browse Source

MINOR: Clarify usage of stateful processor node (#5740)

In recent PRs, we have been confused about the proper usage of
StatefulProcessorNode (#5731 , #5737 )

This change disambiguates it.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
pull/5707/merge
John Roesler 6 years ago committed by Guozhang Wang
parent
commit
1bc620d1af
  1. 23
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
  2. 21
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
  3. 5
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
  4. 1
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
  5. 73
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
  6. 26
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java

23
streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java

@ -64,12 +64,12 @@ class GroupedStreamAggregateBuilder<K, V> { @@ -64,12 +64,12 @@ class GroupedStreamAggregateBuilder<K, V> {
this.userName = groupedInternal.name();
}
<KR, T> KTable<KR, T> build(final String functionName,
<KR, VR> KTable<KR, VR> build(final String functionName,
final StoreBuilder<? extends StateStore> storeBuilder,
final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier,
final KStreamAggProcessorSupplier<K, KR, V, VR> aggregateSupplier,
final boolean isQueryable,
final Serde<KR> keySerde,
final Serde<T> valSerde) {
final Serde<VR> valSerde) {
final String aggFunctionName = builder.newProcessorName(functionName);
@ -84,17 +84,14 @@ class GroupedStreamAggregateBuilder<K, V> { @@ -84,17 +84,14 @@ class GroupedStreamAggregateBuilder<K, V> {
builder.addGraphNode(parentNode, repartitionNode);
parentNode = repartitionNode;
}
final StatefulProcessorNode.StatefulProcessorNodeBuilder<K, V> statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder();
final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName);
statefulProcessorNodeBuilder
.withProcessorParameters(processorParameters)
.withNodeName(aggFunctionName)
.withRepartitionRequired(repartitionRequired)
.withStoreBuilder(storeBuilder);
final StatefulProcessorNode<K, V> statefulProcessorNode = statefulProcessorNodeBuilder.build();
final StatefulProcessorNode<K, V> statefulProcessorNode =
new StatefulProcessorNode<>(
aggFunctionName,
new ProcessorParameters<>(aggregateSupplier, aggFunctionName),
storeBuilder,
repartitionRequired
);
builder.addGraphNode(parentNode, statefulProcessorNode);

21
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java

@ -79,9 +79,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr @@ -79,9 +79,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
// the passed in StreamsGraphNode must be the parent of the repartition node
builder.addGraphNode(this.streamsGraphNode, repartitionNode);
final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode(materialized,
final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>(
funcName,
aggregateSupplier);
new ProcessorParameters<>(aggregateSupplier, funcName),
new KeyValueStoreMaterializer<>(materialized).materialize(),
false
);
// now the repartition node must be the parent of the StateProcessorNode
builder.addGraphNode(repartitionNode, statefulProcessorNode);
@ -98,20 +101,6 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr @@ -98,20 +101,6 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
builder);
}
private <T> StatefulProcessorNode<K, Change<V>> getStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized,
final String functionName,
final ProcessorSupplier<K, Change<V>> aggregateSupplier) {
final ProcessorParameters<K, Change<V>> aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName);
return StatefulProcessorNode
.<K, Change<V>>statefulProcessorNodeBuilder()
.withNodeName(functionName)
.withProcessorParameters(aggregateFunctionProcessorParams)
.withStoreBuilder(new KeyValueStoreMaterializer<>(materialized).materialize())
.build();
}
private GroupedTableOperationRepartitionNode<K, V> createRepartitionNode(final String sinkName,
final String sourceName,
final String topic) {

5
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
@ -56,6 +55,7 @@ import org.apache.kafka.streams.state.Stores; @@ -56,6 +55,7 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import java.lang.reflect.Array;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@ -456,7 +456,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -456,7 +456,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
name,
new ProcessorParameters<>(new KStreamTransform<>(transformerSupplier), name),
stateStoreNames,
null,
true
);
@ -491,7 +490,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -491,7 +490,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
name,
new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name),
stateStoreNames,
null,
repartitionRequired
);
@ -513,7 +511,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -513,7 +511,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
name,
new ProcessorParameters<>(processorSupplier, name),
stateStoreNames,
null,
repartitionRequired
);

1
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

@ -377,7 +377,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -377,7 +377,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
name,
new ProcessorParameters<>(suppressionSupplier, name),
null,
new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName),
false
);

73
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java

@ -31,16 +31,35 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> { @@ -31,16 +31,35 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
private final StoreBuilder<? extends StateStore> storeBuilder;
/**
* Create a node representing a stateful procssor, where the named store has already been registered.
*/
public StatefulProcessorNode(final String nodeName,
final ProcessorParameters<K, V> processorParameters,
final String[] storeNames,
final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder,
final boolean repartitionRequired) {
super(nodeName,
processorParameters,
repartitionRequired);
this.storeNames = storeNames;
this.storeBuilder = null;
}
/**
* Create a node representing a stateful procssor,
* where the store needs to be built and registered as part of building this node.
*/
public StatefulProcessorNode(final String nodeName,
final ProcessorParameters<K, V> processorParameters,
final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder,
final boolean repartitionRequired) {
super(nodeName,
processorParameters,
repartitionRequired);
this.storeNames = null;
this.storeBuilder = materializedKTableStoreBuilder;
}
@ -68,56 +87,4 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> { @@ -68,56 +87,4 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
topologyBuilder.addStateStore(storeBuilder, processorName);
}
}
public static <K, V> StatefulProcessorNodeBuilder<K, V> statefulProcessorNodeBuilder() {
return new StatefulProcessorNodeBuilder<>();
}
public static final class StatefulProcessorNodeBuilder<K, V> {
private ProcessorParameters<K, V> processorSupplier;
private String nodeName;
private boolean repartitionRequired;
private String[] storeNames;
private StoreBuilder<? extends StateStore> storeBuilder;
private StatefulProcessorNodeBuilder() {
}
public StatefulProcessorNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
this.processorSupplier = processorParameters;
return this;
}
public StatefulProcessorNodeBuilder<K, V> withNodeName(final String nodeName) {
this.nodeName = nodeName;
return this;
}
public StatefulProcessorNodeBuilder<K, V> withStoreNames(final String[] storeNames) {
this.storeNames = storeNames;
return this;
}
public StatefulProcessorNodeBuilder<K, V> withRepartitionRequired(final boolean repartitionRequired) {
this.repartitionRequired = repartitionRequired;
return this;
}
public StatefulProcessorNodeBuilder<K, V> withStoreBuilder(final StoreBuilder<? extends StateStore> storeBuilder) {
this.storeBuilder = storeBuilder;
return this;
}
public StatefulProcessorNode<K, V> build() {
return new StatefulProcessorNode<>(
nodeName,
processorSupplier,
storeNames,
storeBuilder,
repartitionRequired
);
}
}
}

26
streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java

@ -24,6 +24,8 @@ import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate; @@ -24,6 +24,8 @@ import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.junit.Test;
import static java.time.Duration.ofMillis;
@ -61,8 +63,7 @@ public class GraphGraceSearchUtilTest { @@ -61,8 +63,7 @@ public class GraphGraceSearchUtilTest {
},
"dummy"
),
null,
null,
(StoreBuilder<? extends StateStore>) null,
false
);
@ -91,8 +92,7 @@ public class GraphGraceSearchUtilTest { @@ -91,8 +92,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
null,
null,
(StoreBuilder<? extends StateStore>) null,
false
);
@ -116,8 +116,7 @@ public class GraphGraceSearchUtilTest { @@ -116,8 +116,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
null,
null,
(StoreBuilder<? extends StateStore>) null,
false
);
@ -133,8 +132,7 @@ public class GraphGraceSearchUtilTest { @@ -133,8 +132,7 @@ public class GraphGraceSearchUtilTest {
new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>(
windows, "asdf", null, null, null
), "asdf"),
null,
null,
(StoreBuilder<? extends StateStore>) null,
false
);
@ -153,8 +151,7 @@ public class GraphGraceSearchUtilTest { @@ -153,8 +151,7 @@ public class GraphGraceSearchUtilTest {
},
"dummy"
),
null,
null,
(StoreBuilder<? extends StateStore>) null,
false
);
graceGrandparent.addChild(statefulParent);
@ -181,8 +178,7 @@ public class GraphGraceSearchUtilTest { @@ -181,8 +178,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
null,
null,
(StoreBuilder<? extends StateStore>) null,
false
);
@ -210,8 +206,7 @@ public class GraphGraceSearchUtilTest { @@ -210,8 +206,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
null,
null,
(StoreBuilder<? extends StateStore>) null,
false
);
@ -226,8 +221,7 @@ public class GraphGraceSearchUtilTest { @@ -226,8 +221,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
null,
null,
(StoreBuilder<? extends StateStore>) null,
false
);

Loading…
Cancel
Save