Browse Source

KAFKA-7502: Cleanup KTable materialization logic in a single place (#6174)

This is a draft cleanup for KAFKA-7502. Here are the details:

* Make KTableKTableJoinNode abstract, and define its child classes ([NonMaterialized,Materialized]KTableKTableJoinNode) instead: now, all materialization-related routines are separated into the other classes.

* KTableKTableJoinNodeBuilder#build now instantiates [NonMaterialized,Materialized]KTableKTableJoinNode classes instead of KTableKTableJoinNode.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
pull/6449/head
Lee Dongjin 6 years ago committed by Guozhang Wang
parent
commit
4ca8e40e2f
  1. 106
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
  2. 17
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
  3. 172
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java

106
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

@ -43,6 +43,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed; @@ -43,6 +43,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -465,28 +466,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -465,28 +466,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final boolean rightOuter) {
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
final String internalQueryableName = materializedInternal == null ? null : materializedInternal.storeName();
final String joinMergeName = builder.newProcessorName(MERGE_NAME);
return buildJoin(
(AbstractStream<K, VO>) other,
joiner,
leftOuter,
rightOuter,
joinMergeName,
internalQueryableName,
materializedInternal
);
}
private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
final boolean leftOuter,
final boolean rightOuter,
final String joinMergeName,
final String internalQueryableName,
final MaterializedInternal<K, R, KeyValueStore<Bytes, byte[]>> materializedInternal) {
final Set<String> allSourceNodes = ensureJoinableWith(other);
final String joinMergeName = builder.newProcessorName(MERGE_NAME);
final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, VO>) other);
if (leftOuter) {
enableSendingOldValues();
@ -495,57 +477,67 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -495,57 +477,67 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
((KTableImpl) other).enableSendingOldValues();
}
final String joinThisName = builder.newProcessorName(JOINTHIS_NAME);
final String joinOtherName = builder.newProcessorName(JOINOTHER_NAME);
final KTableKTableAbstractJoin<K, R, V, V1> joinThis;
final KTableKTableAbstractJoin<K, R, V1, V> joinOther;
final KTableKTableAbstractJoin<K, VR, V, VO> joinThis;
final KTableKTableAbstractJoin<K, VR, VO, V> joinOther;
if (!leftOuter) { // inner
joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
joinOther = new KTableKTableInnerJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl<K, ?, VO>) other, joiner);
joinOther = new KTableKTableInnerJoin<>((KTableImpl<K, ?, VO>) other, this, reverseJoiner(joiner));
} else if (!rightOuter) { // left
joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, VO>) other, joiner);
joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, VO>) other, this, reverseJoiner(joiner));
} else { // outer
joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, VO>) other, joiner);
joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, VO>) other, this, reverseJoiner(joiner));
}
final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(joinThis, joinOther, internalQueryableName);
final String joinThisName = builder.newProcessorName(JOINTHIS_NAME);
final String joinOtherName = builder.newProcessorName(JOINOTHER_NAME);
final ProcessorParameters<K, Change<V>> joinThisProcessorParameters = new ProcessorParameters<>(joinThis, joinThisName);
final ProcessorParameters<K, Change<VO>> joinOtherProcessorParameters = new ProcessorParameters<>(joinOther, joinOtherName);
final KTableKTableJoinNode.KTableKTableJoinNodeBuilder<K, V, V1, R> kTableJoinNodeBuilder = KTableKTableJoinNode.kTableKTableJoinNodeBuilder();
final Serde<K> keySerde;
final Serde<VR> valueSerde;
final String queryableStoreName;
final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
// only materialize if specified in Materialized
if (materializedInternal != null) {
kTableJoinNodeBuilder.withMaterializedInternal(materializedInternal);
keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
valueSerde = materializedInternal.valueSerde();
queryableStoreName = materializedInternal.storeName();
storeBuilder = new KeyValueStoreMaterializer<>(materializedInternal).materialize();
} else {
keySerde = this.keySerde;
valueSerde = null;
queryableStoreName = null;
storeBuilder = null;
}
kTableJoinNodeBuilder.withNodeName(joinMergeName);
final ProcessorParameters<K, Change<V>> joinThisProcessorParameters = new ProcessorParameters<>(joinThis, joinThisName);
final ProcessorParameters<K, Change<V1>> joinOtherProcessorParameters = new ProcessorParameters<>(joinOther, joinOtherName);
final ProcessorParameters<K, Change<R>> joinMergeProcessorParameters = new ProcessorParameters<>(joinMerge, joinMergeName);
kTableJoinNodeBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParameters)
.withJoinOtherProcessorParameters(joinOtherProcessorParameters)
.withJoinThisProcessorParameters(joinThisProcessorParameters)
.withJoinThisStoreNames(valueGetterSupplier().storeNames())
.withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames())
.withOtherJoinSideNodeName(((KTableImpl) other).name)
.withThisJoinSideNodeName(name);
final KTableKTableJoinNode<K, V, V1, R> kTableKTableJoinNode = kTableJoinNodeBuilder.build();
final KTableKTableJoinNode<K, V, VO, VR> kTableKTableJoinNode =
KTableKTableJoinNode.<K, V, VO, VR>kTableKTableJoinNodeBuilder()
.withNodeName(joinMergeName)
.withJoinThisProcessorParameters(joinThisProcessorParameters)
.withJoinOtherProcessorParameters(joinOtherProcessorParameters)
.withThisJoinSideNodeName(name)
.withOtherJoinSideNodeName(((KTableImpl) other).name)
.withJoinThisStoreNames(valueGetterSupplier().storeNames())
.withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames())
.withKeySerde(keySerde)
.withValueSerde(valueSerde)
.withQueryableStoreName(queryableStoreName)
.withStoreBuilder(storeBuilder)
.build();
builder.addGraphNode(this.streamsGraphNode, kTableKTableJoinNode);
// we can inherit parent key serde if user do not provide specific overrides
return new KTableImpl<K, Change<R>, R>(
joinMergeName,
materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
materializedInternal != null ? materializedInternal.valueSerde() : null,
return new KTableImpl<K, Change<VR>, VR>(
kTableKTableJoinNode.nodeName(),
kTableKTableJoinNode.keySerde(),
kTableKTableJoinNode.valueSerde(),
allSourceNodes,
internalQueryableName,
joinMerge,
kTableKTableJoinNode.queryableStoreName(),
kTableKTableJoinNode.joinMerger(),
kTableKTableJoinNode,
builder
);

17
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java

@ -25,7 +25,7 @@ import java.util.Collections; @@ -25,7 +25,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
private final KTableProcessorSupplier<K, ?, V> parent1;
private final KTableProcessorSupplier<K, ?, V> parent2;
@ -40,6 +40,10 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> { @@ -40,6 +40,10 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
this.queryableName = queryableName;
}
/**
 * Returns the queryable name this merger was constructed with.
 *
 * @return the value of the {@code queryableName} field; may be {@code null}
 *         (the two-argument {@code of} factory passes {@code null})
 */
public String getQueryableName() {
return queryableName;
}
@Override
public Processor<K, Change<V>> get() {
return new KTableKTableJoinMergeProcessor();
@ -78,6 +82,17 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> { @@ -78,6 +82,17 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
sendOldValues = true;
}
/**
 * Creates a merger for the two given parent suppliers without a queryable name.
 * Delegates to the three-argument overload with {@code null} as the queryable name.
 *
 * @param parent1 the first parent processor supplier
 * @param parent2 the second parent processor supplier
 * @return a new {@code KTableKTableJoinMerger} whose queryable name is {@code null}
 */
public static <K, V> KTableKTableJoinMerger<K, V> of(final KTableProcessorSupplier<K, ?, V> parent1,
final KTableProcessorSupplier<K, ?, V> parent2) {
return of(parent1, parent2, null);
}
/**
 * Static factory that wraps the {@code KTableKTableJoinMerger} constructor.
 *
 * @param parent1       the first parent processor supplier
 * @param parent2       the second parent processor supplier
 * @param queryableName the queryable name to record on the merger; callers in this
 *                      change pass {@code null} when no store is materialized
 * @return a new {@code KTableKTableJoinMerger} built from the three arguments
 */
public static <K, V> KTableKTableJoinMerger<K, V> of(final KTableProcessorSupplier<K, ?, V> parent1,
final KTableProcessorSupplier<K, ?, V> parent2,
final String queryableName) {
return new KTableKTableJoinMerger<>(parent1, parent2, queryableName);
}
private class KTableKTableJoinMergeProcessor extends AbstractProcessor<K, Change<V>> {
private KeyValueStore<K, V> store;
private TupleForwarder<K, V> tupleForwarder;

172
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java

@ -17,11 +17,10 @@ @@ -17,11 +17,10 @@
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger;
import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@ -33,32 +32,64 @@ import java.util.Arrays; @@ -33,32 +32,64 @@ import java.util.Arrays;
*/
public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, Change<V1>, Change<V2>, Change<VR>> {
private final Serde<K> keySerde;
private final Serde<VR> valueSerde;
private final String[] joinThisStoreNames;
private final String[] joinOtherStoreNames;
private final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal;
private final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
KTableKTableJoinNode(final String nodeName,
final ValueJoiner<? super Change<V1>, ? super Change<V2>, ? extends Change<VR>> valueJoiner,
final ProcessorParameters<K, Change<V1>> joinThisProcessorParameters,
final ProcessorParameters<K, Change<V2>> joinOtherProcessorParameters,
final ProcessorParameters<K, Change<VR>> joinMergeProcessorParameters,
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
final String thisJoinSide,
final String otherJoinSide,
final Serde<K> keySerde,
final Serde<VR> valueSerde,
final String[] joinThisStoreNames,
final String[] joinOtherStoreNames) {
final String[] joinOtherStoreNames,
final StoreBuilder<KeyValueStore<K, VR>> storeBuilder) {
super(nodeName,
valueJoiner,
joinThisProcessorParameters,
joinOtherProcessorParameters,
joinMergeProcessorParameters,
thisJoinSide,
otherJoinSide);
null,
joinThisProcessorParameters,
joinOtherProcessorParameters,
joinMergeProcessorParameters,
thisJoinSide,
otherJoinSide);
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.joinThisStoreNames = joinThisStoreNames;
this.joinOtherStoreNames = joinOtherStoreNames;
this.materializedInternal = materializedInternal;
this.storeBuilder = storeBuilder;
}
/**
 * Returns the key serde this node was constructed with.
 *
 * @return the {@code keySerde} field as passed to the constructor
 */
public Serde<K> keySerde() {
return keySerde;
}
/**
 * Returns the value serde of the join result this node was constructed with.
 *
 * @return the {@code valueSerde} field as passed to the constructor; may be {@code null}
 *         (KTableImpl passes {@code null} when no Materialized is supplied)
 */
public Serde<VR> valueSerde() {
return valueSerde;
}
/**
 * Returns the store names of the "this" side of the join.
 * The internal array itself is returned, not a copy.
 *
 * @return the {@code joinThisStoreNames} array passed to the constructor
 */
public String[] joinThisStoreNames() {
return joinThisStoreNames;
}
/**
 * Returns the store names of the "other" side of the join.
 * The internal array itself is returned, not a copy.
 *
 * @return the {@code joinOtherStoreNames} array passed to the constructor
 */
public String[] joinOtherStoreNames() {
return joinOtherStoreNames;
}
/**
 * Returns the queryable store name recorded on this node's join merger.
 *
 * The merge processor supplier is cast to a wildcard-parameterized
 * {@code KTableKTableJoinMerger<?, ?>} rather than the raw type: only the
 * non-generic {@code getQueryableName()} is needed, so the cast stays checked
 * and no raw-type warning is produced.
 *
 * @return the merger's queryable name; may be {@code null} when the join
 *         result is not materialized
 */
public String queryableStoreName() {
    final KTableKTableJoinMerger<?, ?> merger =
        (KTableKTableJoinMerger<?, ?>) mergeProcessorParameters().processorSupplier();
    return merger.getQueryableName();
}
/**
 * The supplier which provides processor with KTable-KTable join merge functionality.
 *
 * The merge processor supplier is obtained from {@code mergeProcessorParameters()} and
 * cast to {@code KTableKTableJoinMerger<K, VR>}; the cast is unchecked. Nodes created via
 * {@code KTableKTableJoinNodeBuilder#build} always carry a supplier produced by
 * {@code KTableKTableJoinMerger.of(...)}.
 *
 * @return the merge processor supplier viewed as a {@code KTableKTableJoinMerger<K, VR>}
 */
public KTableKTableJoinMerger<K, VR> joinMerger() {
return (KTableKTableJoinMerger<K, VR>) mergeProcessorParameters().processorSupplier();
}
@Override
@ -68,26 +99,24 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K @@ -68,26 +99,24 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
final String mergeProcessorName = mergeProcessorParameters().processorName();
topologyBuilder.addProcessor(thisProcessorName,
thisProcessorParameters().processorSupplier(),
thisJoinSideNodeName());
thisProcessorParameters().processorSupplier(),
thisJoinSideNodeName());
topologyBuilder.addProcessor(otherProcessorName,
otherProcessorParameters().processorSupplier(),
otherJoinSideNodeName());
otherProcessorParameters().processorSupplier(),
otherJoinSideNodeName());
topologyBuilder.addProcessor(mergeProcessorName,
mergeProcessorParameters().processorSupplier(),
thisProcessorName,
otherProcessorName);
mergeProcessorParameters().processorSupplier(),
thisProcessorName,
otherProcessorName);
topologyBuilder.connectProcessorAndStateStores(thisProcessorName,
joinOtherStoreNames);
joinOtherStoreNames);
topologyBuilder.connectProcessorAndStateStores(otherProcessorName,
joinThisStoreNames);
joinThisStoreNames);
if (materializedInternal != null) {
final StoreBuilder<KeyValueStore<K, VR>> storeBuilder =
new KeyValueStoreMaterializer<>(materializedInternal).materialize();
if (storeBuilder != null) {
topologyBuilder.addStateStore(storeBuilder, mergeProcessorName);
}
}
@ -95,10 +124,9 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K @@ -95,10 +124,9 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
@Override
public String toString() {
return "KTableKTableJoinNode{" +
"joinThisStoreNames=" + Arrays.toString(joinThisStoreNames) +
", joinOtherStoreNames=" + Arrays.toString(joinOtherStoreNames) +
", materializedInternal=" + materializedInternal +
"} " + super.toString();
"joinThisStoreNames=" + Arrays.toString(joinThisStoreNames()) +
", joinOtherStoreNames=" + Arrays.toString(joinOtherStoreNames()) +
"} " + super.toString();
}
public static <K, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> kTableKTableJoinNodeBuilder() {
@ -106,23 +134,23 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K @@ -106,23 +134,23 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
}
public static final class KTableKTableJoinNodeBuilder<K, V1, V2, VR> {
private String nodeName;
private String[] joinThisStoreNames;
private ProcessorParameters<K, Change<V1>> joinThisProcessorParameters;
private String[] joinOtherStoreNames;
private MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal;
private ProcessorParameters<K, Change<V2>> joinOtherProcessorParameters;
private ProcessorParameters<K, Change<VR>> joinMergeProcessorParameters;
private ValueJoiner<? super Change<V1>, ? super Change<V2>, ? extends Change<VR>> valueJoiner;
private String thisJoinSide;
private String otherJoinSide;
private Serde<K> keySerde;
private Serde<VR> valueSerde;
private String[] joinThisStoreNames;
private String[] joinOtherStoreNames;
private String queryableStoreName;
private StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
private KTableKTableJoinNodeBuilder() {
}
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinThisStoreNames(final String[] joinThisStoreNames) {
this.joinThisStoreNames = joinThisStoreNames;
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withNodeName(final String nodeName) {
this.nodeName = nodeName;
return this;
}
@ -131,59 +159,69 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K @@ -131,59 +159,69 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
return this;
}
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withNodeName(final String nodeName) {
this.nodeName = nodeName;
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinOtherProcessorParameters(final ProcessorParameters<K, Change<V2>> joinOtherProcessorParameters) {
this.joinOtherProcessorParameters = joinOtherProcessorParameters;
return this;
}
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinOtherStoreNames(final String[] joinOtherStoreNames) {
this.joinOtherStoreNames = joinOtherStoreNames;
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withThisJoinSideNodeName(final String thisJoinSide) {
this.thisJoinSide = thisJoinSide;
return this;
}
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinOtherProcessorParameters(final ProcessorParameters<K, Change<V2>> joinOtherProcessorParameters) {
this.joinOtherProcessorParameters = joinOtherProcessorParameters;
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withOtherJoinSideNodeName(final String otherJoinSide) {
this.otherJoinSide = otherJoinSide;
return this;
}
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinMergeProcessorParameters(final ProcessorParameters<K, Change<VR>> joinMergeProcessorParameters) {
this.joinMergeProcessorParameters = joinMergeProcessorParameters;
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withKeySerde(final Serde<K> keySerde) {
this.keySerde = keySerde;
return this;
}
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withValueJoiner(final ValueJoiner<? super Change<V1>, ? super Change<V2>, ? extends Change<VR>> valueJoiner) {
this.valueJoiner = valueJoiner;
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withValueSerde(final Serde<VR> valueSerde) {
this.valueSerde = valueSerde;
return this;
}
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withThisJoinSideNodeName(final String thisJoinSide) {
this.thisJoinSide = thisJoinSide;
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinThisStoreNames(final String[] joinThisStoreNames) {
this.joinThisStoreNames = joinThisStoreNames;
return this;
}
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withOtherJoinSideNodeName(final String otherJoinSide) {
this.otherJoinSide = otherJoinSide;
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinOtherStoreNames(final String[] joinOtherStoreNames) {
this.joinOtherStoreNames = joinOtherStoreNames;
return this;
}
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withMaterializedInternal(
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal) {
this.materializedInternal = materializedInternal;
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withQueryableStoreName(final String queryableStoreName) {
this.queryableStoreName = queryableStoreName;
return this;
}
public KTableKTableJoinNode<K, V1, V2, VR> build() {
/**
 * Sets the store builder handed to the node's constructor by {@code build()}.
 *
 * @param storeBuilder the store builder for the join result, or {@code null} when the
 *                     result is not materialized (KTableImpl passes {@code null} in that case)
 * @return this builder, for call chaining
 */
public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withStoreBuilder(final StoreBuilder<KeyValueStore<K, VR>> storeBuilder) {
this.storeBuilder = storeBuilder;
return this;
}
@SuppressWarnings("unchecked")
public KTableKTableJoinNode<K, V1, V2, VR> build() {
return new KTableKTableJoinNode<>(nodeName,
valueJoiner,
joinThisProcessorParameters,
joinOtherProcessorParameters,
joinMergeProcessorParameters,
materializedInternal,
thisJoinSide,
otherJoinSide,
joinThisStoreNames,
joinOtherStoreNames);
joinThisProcessorParameters,
joinOtherProcessorParameters,
new ProcessorParameters<>(
KTableKTableJoinMerger.of(
(KTableProcessorSupplier<K, V1, VR>) (joinThisProcessorParameters.processorSupplier()),
(KTableProcessorSupplier<K, V2, VR>) (joinOtherProcessorParameters.processorSupplier()),
queryableStoreName),
nodeName),
thisJoinSide,
otherJoinSide,
keySerde,
valueSerde,
joinThisStoreNames,
joinOtherStoreNames,
storeBuilder);
}
}
}

Loading…
Cancel
Save