Browse Source

KAFKA-14209 : Rewrite self joins to use single state store 2/3 (#12644)

Implements KIP-862: https://cwiki.apache.org/confluence/x/WSf1D

Reviewers: Guozhang Wang <guozhang@apache.org>,  Austin Heyne <aheyne>, John Roesler <vvcephei@apache.org>
pull/12720/head
Vicky Papavasileiou 2 years ago committed by GitHub
parent
commit
21a15c6b1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
  2. 49
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
  3. 16
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
  4. 23
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
  5. 142
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
  6. 60
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java
  7. 53
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
  8. 38
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/WindowedStreamProcessorNode.java
  9. 15
      streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
  10. 394
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
  11. 337
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoinTest.java

15
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

@@ -239,7 +239,9 @@ public class StreamsConfig extends AbstractConfig {
private static final String CONFIG_ERROR_MSG = "Acceptable values are:"
+ " \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", "
+ "or a comma separated list of specific optimizations: "
+ "(\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\").";
+ "(\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\" + "
+ "\"SINGLE_STORE_SELF_JOIN+\").";
public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka "
@ -270,8 +272,16 @@ public class StreamsConfig extends AbstractConfig { @@ -270,8 +272,16 @@ public class StreamsConfig extends AbstractConfig {
*/
public static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";
/**
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"}
* for enabling the optimization that optimizes inner stream-stream joins into self-joins when
* both arguments are the same stream.
*/
public static final String SINGLE_STORE_SELF_JOIN = "single.store.self.join";
private static final List<String> TOPOLOGY_OPTIMIZATION_CONFIGS = Arrays.asList(
OPTIMIZE, NO_OPTIMIZATION, REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS);
OPTIMIZE, NO_OPTIMIZATION, REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS,
SINGLE_STORE_SELF_JOIN);
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
@ -1698,6 +1708,7 @@ public class StreamsConfig extends AbstractConfig { @@ -1698,6 +1708,7 @@ public class StreamsConfig extends AbstractConfig {
if (configs.contains(OPTIMIZE)) {
verifiedConfigs.add(REUSE_KTABLE_SOURCE_TOPICS);
verifiedConfigs.add(MERGE_REPARTITION_TOPICS);
verifiedConfigs.add(SINGLE_STORE_SELF_JOIN);
} else if (!configs.contains(NO_OPTIMIZATION)) {
verifiedConfigs.addAll(configs);
}

49
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
import java.util.IdentityHashMap;
import java.util.Properties;
import java.util.TreeMap;
import org.apache.kafka.common.serialization.Serde;
@ -33,7 +34,9 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; @@ -33,7 +34,9 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode;
import org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@ -322,6 +325,10 @@ public class InternalStreamsBuilder implements InternalNameProvider { @@ -322,6 +325,10 @@ public class InternalStreamsBuilder implements InternalNameProvider {
LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
mergeRepartitionTopics();
}
if (optimizationConfigs.contains(StreamsConfig.SINGLE_STORE_SELF_JOIN)) {
LOG.debug("Optimizing the Kafka Streams graph for self-joins");
rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
}
}
private void mergeDuplicateSourceNodes() {
@ -375,6 +382,48 @@ public class InternalStreamsBuilder implements InternalNameProvider { @@ -375,6 +382,48 @@ public class InternalStreamsBuilder implements InternalNameProvider {
}
}
/**
 * The self-join rewriting can be applied if the StreamStreamJoinNode has a single parent.
 * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
 * right argument of the join (the "other"). The join node may have multiple siblings but for
 * this rewriting we only care about the ThisKStreamJoinWindow and the OtherKStreamJoinWindow.
 * We iterate over all the siblings to identify these two nodes so that we can remove the
 * latter.
 *
 * @param currentNode the node currently visited by the depth-first traversal
 * @param visited     identity-based set of nodes already visited; guards against
 *                    re-processing nodes reachable through multiple parents
 * @throws IllegalStateException if the two windowed-stream nodes cannot be located or
 *                               are not ordered by build priority as expected
 */
@SuppressWarnings("unchecked")
private void rewriteSingleStoreSelfJoin(
    final GraphNode currentNode, final Map<GraphNode, Boolean> visited) {
    visited.put(currentNode, true);
    if (currentNode instanceof StreamStreamJoinNode && currentNode.parentNodes().size() == 1) {
        final StreamStreamJoinNode joinNode = (StreamStreamJoinNode) currentNode;
        // Locate the two KStreamJoinWindow siblings feeding this join. Only windowed
        // nodes built before the join node itself are candidates.
        final GraphNode parent = joinNode.parentNodes().stream().findFirst().get();
        GraphNode left = null;
        GraphNode right = null;
        for (final GraphNode child : parent.children()) {
            if (child instanceof WindowedStreamProcessorNode && child.buildPriority() < joinNode.buildPriority()) {
                if (child.nodeName().equals(joinNode.getThisWindowedStreamProcessorParameters().processorName())) {
                    left = child;
                } else if (child.nodeName().equals(joinNode.getOtherWindowedStreamProcessorParameters().processorName())) {
                    right = child;
                }
            }
        }
        // Sanity checks: both windowed nodes must exist, and "this" must have been
        // built before "other"; report each failure mode distinctly.
        if (left == null || right == null) {
            throw new IllegalStateException(String.format(
                "Could not find the windowed-stream nodes of the self-join: left=%s, right=%s.", left, right));
        }
        if (left.buildPriority() < right.buildPriority()) {
            // Remove the JoinOtherWindowed node so that only one state store is built.
            parent.removeChild(right);
            joinNode.setSelfJoin();
        } else {
            throw new IllegalStateException(String.format(
                "Expected the left node %s to have smaller build priority than the right node %s.", left, right));
        }
    }
    // Continue the depth-first traversal over unvisited children.
    for (final GraphNode child : currentNode.children()) {
        if (!visited.containsKey(child)) {
            rewriteSingleStoreSelfJoin(child, visited);
        }
    }
}
private void reuseKTableSourceTopics() {
LOG.debug("Marking KTable source nodes to optimize using source topic for changelogs ");
tableSourceNodes.forEach(node -> ((TableSourceNode<?, ?>) node).reuseSourceTopicForChangeLog(true));

16
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java

@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.internals.graph.GraphNode; @@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.apache.kafka.streams.state.StoreBuilder;
@ -143,13 +144,13 @@ class KStreamImplJoin { @@ -143,13 +144,13 @@ class KStreamImplJoin {
final KStreamJoinWindow<K, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindowStore.name());
final ProcessorParameters<K, V1, ?, ?> thisWindowStreamProcessorParams = new ProcessorParameters<>(thisWindowedStream, thisWindowStreamProcessorName);
final ProcessorGraphNode<K, V1> thisWindowedStreamsNode = new ProcessorGraphNode<>(thisWindowStreamProcessorName, thisWindowStreamProcessorParams);
final ProcessorGraphNode<K, V1> thisWindowedStreamsNode = new WindowedStreamProcessorNode<>(thisWindowStore.name(), thisWindowStreamProcessorParams);
builder.addGraphNode(thisGraphNode, thisWindowedStreamsNode);
final KStreamJoinWindow<K, V2> otherWindowedStream = new KStreamJoinWindow<>(otherWindowStore.name());
final ProcessorParameters<K, V2, ?, ?> otherWindowStreamProcessorParams = new ProcessorParameters<>(otherWindowedStream, otherWindowStreamProcessorName);
final ProcessorGraphNode<K, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
final ProcessorGraphNode<K, V2> otherWindowedStreamsNode = new WindowedStreamProcessorNode<>(otherWindowStore.name(), otherWindowStreamProcessorParams);
builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
Optional<StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
@ -181,6 +182,13 @@ class KStreamImplJoin { @@ -181,6 +182,13 @@ class KStreamImplJoin {
sharedTimeTracker
);
final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new KStreamKStreamSelfJoin<>(
thisWindowStore.name(),
internalWindows,
joiner,
sharedTimeTracker
);
final PassThrough<K, VOut> joinMerge = new PassThrough<>();
final StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K, V1, V2, VOut> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
@ -188,6 +196,7 @@ class KStreamImplJoin { @@ -188,6 +196,7 @@ class KStreamImplJoin {
final ProcessorParameters<K, V1, ?, ?> joinThisProcessorParams = new ProcessorParameters<>(joinThis, joinThisName);
final ProcessorParameters<K, V2, ?, ?> joinOtherProcessorParams = new ProcessorParameters<>(joinOther, joinOtherName);
final ProcessorParameters<K, VOut, ?, ?> joinMergeProcessorParams = new ProcessorParameters<>(joinMerge, joinMergeName);
final ProcessorParameters<K, V1, ?, ?> selfJoinProcessorParams = new ProcessorParameters<>(selfJoin, joinMergeName);
joinBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParams)
.withJoinThisProcessorParameters(joinThisProcessorParams)
@ -198,7 +207,8 @@ class KStreamImplJoin { @@ -198,7 +207,8 @@ class KStreamImplJoin {
.withOtherWindowedStreamProcessorParameters(otherWindowStreamProcessorParams)
.withOuterJoinWindowStoreBuilder(outerJoinWindowStore)
.withValueJoiner(joiner)
.withNodeName(joinMergeName);
.withNodeName(joinMergeName)
.withSelfJoinProcessorParameters(selfJoinProcessorParams);
if (internalWindows.spuriousResultFixEnabled()) {
joinBuilder.withSpuriousResultFixEnabled();

23
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java

@ -25,7 +25,6 @@ import org.apache.kafka.streams.processor.api.Processor; @@ -25,7 +25,6 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
@ -122,26 +121,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, @@ -122,26 +121,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
@SuppressWarnings("unchecked")
@Override
public void process(final Record<K, V1> record) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
//
// we also ignore the record if value is null, because in a key-value data model a null-value indicates
// an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
// furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
if (record.key() == null || record.value() == null) {
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null key or value. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null key or value. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
return;
}
@ -157,7 +137,6 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, @@ -157,7 +137,6 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record));
}
try (final WindowStoreIterator<V2> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
while (iter.hasNext()) {
needOuterJoin = false;

142
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java

@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Processor that implements an inner stream-stream self-join using a single window store
 * (KIP-862). Each input record is joined against previously seen records of the same
 * stream, playing the role of both the "this" and the "other" side of the join.
 */
class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);

    private final String windowName;
    private final long joinThisBeforeMs;
    private final long joinThisAfterMs;
    private final long joinOtherBeforeMs;
    private final long joinOtherAfterMs;
    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
    private final TimeTracker sharedTimeTracker;

    KStreamKStreamSelfJoin(
        final String windowName,
        final JoinWindowsInternal windows,
        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
        final TimeTracker sharedTimeTracker) {

        this.windowName = windowName;
        this.joinThisBeforeMs = windows.beforeMs;
        this.joinThisAfterMs = windows.afterMs;
        // The "other" side reads the same store but with the window bounds mirrored:
        // what is "before" for this side is "after" for the other side, and vice versa.
        this.joinOtherBeforeMs = windows.afterMs;
        this.joinOtherAfterMs = windows.beforeMs;
        this.joinerThis = joinerThis;
        this.sharedTimeTracker = sharedTimeTracker;
    }

    @Override
    public Processor<K, V1, K, VOut> get() {
        return new KStreamKStreamSelfJoinProcessor();
    }

    private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
        private WindowStore<K, V2> windowStore;
        private Sensor droppedRecordsSensor;

        @Override
        public void init(final ProcessorContext<K, VOut> context) {
            super.init(context);
            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
            windowStore = context.getStateStore(windowName);
        }

        /**
         * Joins the record against the shared window store twice — once as the "this"
         * side and once as the "other" side — so that the emitted records match the
         * output (including ordering) of an equivalent two-store inner join.
         */
        @SuppressWarnings("unchecked")
        @Override
        public void process(final Record<K, V1> record) {
            // Records with a null key or value cannot participate in a join; drop them.
            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
                return;
            }

            final long inputRecordTimestamp = record.timestamp();
            long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs);
            long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
            boolean emittedJoinWithSelf = false;
            // The record joined with itself; note the unchecked V1 -> V2 cast is safe
            // because both sides of a self-join carry the same value type.
            final Record<K, VOut> selfRecord = record
                .withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value()))
                .withTimestamp(inputRecordTimestamp);
            sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);

            // Join current record with other
            try (final WindowStoreIterator<V2> iter = windowStore.fetch(record.key(), timeFrom, timeTo)) {
                while (iter.hasNext()) {
                    final KeyValue<Long, V2> otherRecord = iter.next();
                    final long otherRecordTimestamp = otherRecord.key;

                    // Join this with other
                    context().forward(
                        record.withValue(joinerThis.apply(
                            record.key(), record.value(), otherRecord.value))
                            .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
                }
            }

            // Needs to be in a different loop to ensure correct ordering of records where
            // correct ordering means it matches the output of an inner join.
            timeFrom = Math.max(0L, inputRecordTimestamp - joinOtherBeforeMs);
            timeTo = Math.max(0L, inputRecordTimestamp + joinOtherAfterMs);
            try (final WindowStoreIterator<V2> iter2 = windowStore.fetch(record.key(), timeFrom, timeTo)) {
                while (iter2.hasNext()) {
                    final KeyValue<Long, V2> otherRecord = iter2.next();
                    final long otherRecordTimestamp = otherRecord.key;
                    final long maxRecordTimestamp = Math.max(inputRecordTimestamp, otherRecordTimestamp);

                    // This is needed so that output records follow timestamp order
                    // Join this with self
                    if (inputRecordTimestamp < maxRecordTimestamp && !emittedJoinWithSelf) {
                        emittedJoinWithSelf = true;
                        context().forward(selfRecord);
                    }

                    // Join other with current record
                    context().forward(
                        record
                            .withValue(joinerThis.apply(record.key(), (V1) otherRecord.value, (V2) record.value()))
                            .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
                }
            }

            // Join this with self (if it was not already emitted in timestamp order above)
            if (!emittedJoinWithSelf) {
                context().forward(selfRecord);
            }
        }
    }
}

60
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.slf4j.Logger;
/**
 * Static helpers shared by the stream-stream join processors.
 */
public final class StreamStreamJoinUtil {

    private StreamStreamJoinUtil() {
        // utility class; not instantiable
    }

    /**
     * Decides whether a record must be skipped before joining. A record is skipped when
     * its key or value is {@code null}; the drop is logged and counted in the
     * dropped-records sensor.
     *
     * @return {@code true} if the record was skipped, {@code false} if it can be joined
     */
    public static <KIn, VIn, KOut, VOut> boolean skipRecord(
        final Record<KIn, VIn> record, final Logger logger,
        final Sensor droppedRecordsSensor,
        final ProcessorContext<KOut, VOut> context) {
        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
        //
        // we also ignore the record if value is null, because in a key-value data model a null-value indicates
        // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
        // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
        // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
        if (record.key() != null && record.value() != null) {
            return false;
        }
        if (context.recordMetadata().isPresent()) {
            final RecordMetadata meta = context.recordMetadata().get();
            logger.warn(
                "Skipping record due to null key or value. topic=[{}] partition=[{}] offset=[{}]",
                meta.topic(), meta.partition(), meta.offset()
            );
        } else {
            logger.warn(
                "Skipping record due to null key or value. Topic, partition, and offset not known."
            );
        }
        droppedRecordsSensor.record();
        return true;
    }
}

53
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java

@ -39,6 +39,8 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K @@ -39,6 +39,8 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
private final Optional<StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStoreBuilder;
private final Joined<K, V1, V2> joined;
private final boolean enableSpuriousResultFix;
private final ProcessorParameters<K, V1, ?, ?> selfJoinProcessorParameters;
private boolean isSelfJoin;
private StreamStreamJoinNode(final String nodeName,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VR> valueJoiner,
@ -51,7 +53,8 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K @@ -51,7 +53,8 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder,
final Optional<StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStoreBuilder,
final Joined<K, V1, V2> joined,
final boolean enableSpuriousResultFix) {
final boolean enableSpuriousResultFix,
final ProcessorParameters<K, V1, ?, ?> selfJoinProcessorParameters) {
super(nodeName,
valueJoiner,
@ -68,6 +71,7 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K @@ -68,6 +71,7 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
this.otherWindowedStreamProcessorParameters = otherWindowedStreamProcessorParameters;
this.outerJoinWindowStoreBuilder = outerJoinWindowStoreBuilder;
this.enableSpuriousResultFix = enableSpuriousResultFix;
this.selfJoinProcessorParameters = selfJoinProcessorParameters;
}
@ -92,17 +96,38 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K @@ -92,17 +96,38 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
final String thisWindowedStreamProcessorName = thisWindowedStreamProcessorParameters.processorName();
final String otherWindowedStreamProcessorName = otherWindowedStreamProcessorParameters.processorName();
topologyBuilder.addProcessor(thisProcessorName, thisProcessorParameters().processorSupplier(), thisWindowedStreamProcessorName);
topologyBuilder.addProcessor(otherProcessorName, otherProcessorParameters().processorSupplier(), otherWindowedStreamProcessorName);
topologyBuilder.addProcessor(mergeProcessorParameters().processorName(), mergeProcessorParameters().processorSupplier(), thisProcessorName, otherProcessorName);
topologyBuilder.addStateStore(thisWindowStoreBuilder, thisWindowedStreamProcessorName, otherProcessorName);
topologyBuilder.addStateStore(otherWindowStoreBuilder, otherWindowedStreamProcessorName, thisProcessorName);
if (enableSpuriousResultFix) {
outerJoinWindowStoreBuilder.ifPresent(builder -> topologyBuilder.addStateStore(builder, thisProcessorName, otherProcessorName));
if (isSelfJoin) {
topologyBuilder.addProcessor(selfJoinProcessorParameters.processorName(), selfJoinProcessorParameters.processorSupplier(), thisWindowedStreamProcessorName);
topologyBuilder.addStateStore(thisWindowStoreBuilder, thisWindowedStreamProcessorName, selfJoinProcessorParameters.processorName());
} else {
topologyBuilder.addProcessor(thisProcessorName, thisProcessorParameters().processorSupplier(), thisWindowedStreamProcessorName);
topologyBuilder.addProcessor(otherProcessorName, otherProcessorParameters().processorSupplier(), otherWindowedStreamProcessorName);
topologyBuilder.addProcessor(mergeProcessorParameters().processorName(), mergeProcessorParameters().processorSupplier(), thisProcessorName, otherProcessorName);
topologyBuilder.addStateStore(thisWindowStoreBuilder, thisWindowedStreamProcessorName, otherProcessorName);
topologyBuilder.addStateStore(otherWindowStoreBuilder, otherWindowedStreamProcessorName, thisProcessorName);
if (enableSpuriousResultFix) {
outerJoinWindowStoreBuilder.ifPresent(builder -> topologyBuilder.addStateStore(builder, thisProcessorName, otherProcessorName));
}
}
}
public void setSelfJoin() {
this.isSelfJoin = true;
}
public boolean getSelfJoin() {
return isSelfJoin;
}
public ProcessorParameters<K, V1, ?, ?> getThisWindowedStreamProcessorParameters() {
return thisWindowedStreamProcessorParameters;
}
public ProcessorParameters<K, V2, ?, ?> getOtherWindowedStreamProcessorParameters() {
return otherWindowedStreamProcessorParameters;
}
public static <K, V1, V2, VR> StreamStreamJoinNodeBuilder<K, V1, V2, VR> streamStreamJoinNodeBuilder() {
return new StreamStreamJoinNodeBuilder<>();
}
@ -121,6 +146,7 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K @@ -121,6 +146,7 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
private Optional<StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStoreBuilder;
private Joined<K, V1, V2> joined;
private boolean enableSpuriousResultFix = false;
private ProcessorParameters<K, V1, ?, ?> selfJoinProcessorParameters;
private StreamStreamJoinNodeBuilder() {
}
@ -186,6 +212,12 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K @@ -186,6 +212,12 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
return this;
}
public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withSelfJoinProcessorParameters(
final ProcessorParameters<K, V1, ?, ?> selfJoinProcessorParameters) {
this.selfJoinProcessorParameters = selfJoinProcessorParameters;
return this;
}
public StreamStreamJoinNode<K, V1, V2, VR> build() {
return new StreamStreamJoinNode<>(nodeName,
@ -199,7 +231,8 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K @@ -199,7 +231,8 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
otherWindowStoreBuilder,
outerJoinWindowStoreBuilder,
joined,
enableSpuriousResultFix);
enableSpuriousResultFix,
selfJoinProcessorParameters);
}

38
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/WindowedStreamProcessorNode.java

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.graph;
/**
 * Graph node representing a KStreamJoinWindow processor, i.e. the processor that writes
 * one side of a stream-stream join into its window store. Tracking the store name here
 * lets graph optimizations (such as the self-join rewrite) identify these nodes.
 */
public class WindowedStreamProcessorNode<K, V> extends ProcessorGraphNode<K, V> {

    // Name of the window store populated by this processor; kept for diagnostics.
    private final String windowStoreName;

    /**
     * Create a node representing a Stream Join Window processor.
     */
    public WindowedStreamProcessorNode(final String windowStoreName,
                                       final ProcessorParameters<K, V, ?, ?> processorParameters) {
        super(processorParameters.processorName(), processorParameters);
        this.windowStoreName = windowStoreName;
    }

    @Override
    public String toString() {
        final StringBuilder description = new StringBuilder("WindowedStreamProcessorNode{");
        description.append("storeName=").append(windowStoreName).append("} ").append(super.toString());
        return description.toString();
    }
}

15
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java

@ -1299,15 +1299,25 @@ public class StreamsConfigTest { @@ -1299,15 +1299,25 @@ public class StreamsConfigTest {
assertTrue(exception.getMessage().contains("Unrecognized config."));
}
@Test
public void shouldEnableSelfJoin() {
final String value = StreamsConfig.SINGLE_STORE_SELF_JOIN;
props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
final StreamsConfig config = new StreamsConfig(props);
assertEquals(config.getString(TOPOLOGY_OPTIMIZATION_CONFIG), StreamsConfig.SINGLE_STORE_SELF_JOIN);
}
@Test
public void shouldAllowMultipleOptimizations() {
final String value = String.join(",",
StreamsConfig.SINGLE_STORE_SELF_JOIN,
StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
StreamsConfig.MERGE_REPARTITION_TOPICS);
props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
final StreamsConfig config = new StreamsConfig(props);
final List<String> configs = Arrays.asList(config.getString(TOPOLOGY_OPTIMIZATION_CONFIG).split(","));
assertEquals(2, configs.size());
assertEquals(3, configs.size());
assertTrue(configs.contains(StreamsConfig.SINGLE_STORE_SELF_JOIN));
assertTrue(configs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS));
assertTrue(configs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS));
}
@ -1315,9 +1325,10 @@ public class StreamsConfigTest { @@ -1315,9 +1325,10 @@ public class StreamsConfigTest {
@Test
public void shouldEnableAllOptimizationsWithOptimizeConfig() {
final Set<String> configs = StreamsConfig.verifyTopologyOptimizationConfigs(StreamsConfig.OPTIMIZE);
assertEquals(2, configs.size());
assertEquals(3, configs.size());
assertTrue(configs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS));
assertTrue(configs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS));
assertTrue(configs.contains(StreamsConfig.SINGLE_STORE_SELF_JOIN));
}
@Test

394
streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java

@ -16,16 +16,23 @@ @@ -16,16 +16,23 @@
*/
package org.apache.kafka.streams.kstream.internals;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -43,12 +50,15 @@ import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.Topology.AutoOffsetReset;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -60,6 +70,7 @@ public class InternalStreamsBuilderTest {
private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>();
private final String storePrefix = "prefix-";
private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store"), builder, storePrefix);
private final Properties props = StreamsTestUtils.getStreamsConfig();
@Test
public void testNewName() {
@ -364,4 +375,387 @@ public class InternalStreamsBuilderTest { @@ -364,4 +375,387 @@ public class InternalStreamsBuilderTest {
assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
}
@Test
public void shouldMarkStreamStreamJoinAsSelfJoinSingleStream() {
    // Given: a stream joined with itself under full optimization.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
    final KStream<String, String> stream = builder.stream(Collections.singleton("t1"), consumed);
    stream.join(stream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    // When:
    builder.buildAndOptimizeTopology(props);
    // Then: the join is rewritten as a self-join backed by a single window store.
    // (Removed an unused local `parent` that also made an unchecked Optional.get() call.)
    final GraphNode join = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
    assertNotNull(join);
    assertTrue(((StreamStreamJoinNode) join).getSelfJoin());
    final AtomicInteger count = new AtomicInteger();
    countJoinWindowNodes(count, builder.root, new HashSet<>());
    assertEquals(count.get(), 1);
}
@Test
public void shouldMarkStreamStreamJoinAsSelfJoinTwoStreams() {
    // Given: two separately-declared streams over the same topic, fully optimized.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
    final KStream<String, String> left = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> right = builder.stream(Collections.singleton("t1"), consumed);
    left.join(right, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    // When:
    builder.buildAndOptimizeTopology(props);
    // Then: the join is marked as a self-join and only one window store remains.
    final GraphNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
    assertNotNull(joinNode);
    assertTrue(((StreamStreamJoinNode) joinNode).getSelfJoin());
    final AtomicInteger windowNodeCount = new AtomicInteger();
    countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
    assertEquals(windowNodeCount.get(), 1);
}
@Test
public void shouldMarkStreamStreamJoinAsSelfJoinMergeTwoStreams() {
    // Given: the merge of two different topics joined with itself.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
    final KStream<String, String> left = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> right = builder.stream(Collections.singleton("t2"), consumed);
    final KStream<String, String> merged = left.merge(right);
    merged.join(merged, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    // When:
    builder.buildAndOptimizeTopology(props);
    // Then: both join inputs are the same merge node, so the rewrite applies.
    final GraphNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
    assertNotNull(joinNode);
    assertTrue(((StreamStreamJoinNode) joinNode).getSelfJoin());
    final AtomicInteger windowNodeCount = new AtomicInteger();
    countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
    assertEquals(windowNodeCount.get(), 1);
}
@Test
public void shouldMarkFirstStreamStreamJoinAsSelfJoin3WayJoin() {
    // Given: a 3-way join where only the first pair shares a source topic.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
    final KStream<String, String> left = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> middle = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> right = builder.stream(Collections.singleton("t3"), consumed);
    left
        .join(middle, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)))
        .join(right, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    // When:
    builder.buildAndOptimizeTopology(props);
    // Then: only the first join becomes a self-join; the second keeps two stores.
    final List<GraphNode> joinNodes = new ArrayList<>();
    getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet<>(), joinNodes);
    assertEquals(joinNodes.size(), 2);
    assertTrue(((StreamStreamJoinNode) joinNodes.get(0)).getSelfJoin());
    assertFalse(((StreamStreamJoinNode) joinNodes.get(1)).getSelfJoin());
    final AtomicInteger windowNodeCount = new AtomicInteger();
    countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
    assertEquals(windowNodeCount.get(), 3);
}
@Test
public void shouldMarkAllStreamStreamJoinsAsSelfJoin() {
    // Given: two independent self-joinable joins whose results are merged.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
    final KStream<String, String> left1 = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> right1 = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> left2 = builder.stream(Collections.singleton("t2"), consumed);
    final KStream<String, String> right2 = builder.stream(Collections.singleton("t2"), consumed);
    final KStream<String, String> firstResult =
        left1.join(right1, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    final KStream<String, String> secondResult =
        left2.join(right2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    firstResult.merge(secondResult);
    // When:
    builder.buildAndOptimizeTopology(props);
    // Then: both joins are rewritten and each uses a single window store.
    final List<GraphNode> joinNodes = new ArrayList<>();
    getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet<>(), joinNodes);
    assertEquals(joinNodes.size(), 2);
    assertTrue(((StreamStreamJoinNode) joinNodes.get(0)).getSelfJoin());
    assertTrue(((StreamStreamJoinNode) joinNodes.get(1)).getSelfJoin());
    final AtomicInteger windowNodeCount = new AtomicInteger();
    countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
    assertEquals(windowNodeCount.get(), 2);
}
@Test
public void shouldMarkFirstStreamStreamJoinAsSelfJoinNwaySameSource() {
    // Given: an N-way join where all three inputs read the same topic.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
    final KStream<String, String> first = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> second = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> third = builder.stream(Collections.singleton("t1"), consumed);
    first
        .join(second, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)))
        .join(third, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    // When:
    builder.buildAndOptimizeTopology(props);
    // Then: only the innermost join is eligible; the outer one is left alone.
    final List<GraphNode> joinNodes = new ArrayList<>();
    getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet<>(), joinNodes);
    assertEquals(joinNodes.size(), 2);
    assertTrue(((StreamStreamJoinNode) joinNodes.get(0)).getSelfJoin());
    assertFalse(((StreamStreamJoinNode) joinNodes.get(1)).getSelfJoin());
    final AtomicInteger windowNodeCount = new AtomicInteger();
    countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
    assertEquals(windowNodeCount.get(), 3);
}
@Test
public void shouldMarkFirstStreamStreamJoinAsSelfJoinNway() {
    // Given: an N-way join where the first two inputs share a topic and the
    // third reads a different one.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
    final KStream<String, String> first = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> second = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> third = builder.stream(Collections.singleton("t2"), consumed);
    first
        .join(second, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)))
        .join(third, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    // When:
    builder.buildAndOptimizeTopology(props);
    // Then: only the first join is a self-join.
    final List<GraphNode> joinNodes = new ArrayList<>();
    getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet<>(), joinNodes);
    assertEquals(joinNodes.size(), 2);
    assertTrue(((StreamStreamJoinNode) joinNodes.get(0)).getSelfJoin());
    assertFalse(((StreamStreamJoinNode) joinNodes.get(1)).getSelfJoin());
    final AtomicInteger windowNodeCount = new AtomicInteger();
    countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
    assertEquals(windowNodeCount.get(), 3);
}
@Test
public void shouldMarkStreamStreamJoinAsSelfJoinTwoStreamsWithNoOpFilter() {
    // Given: a filter whose result is discarded, so it is NOT on the join
    // path and both join inputs still trace back to the same source.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
    final KStream<String, String> left = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> right = builder.stream(Collections.singleton("t1"), consumed);
    left.filter((key, value) -> value != null);
    left.join(right, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    // When:
    builder.buildAndOptimizeTopology(props);
    // Then: the join is still rewritten as a single-store self-join.
    final GraphNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
    assertNotNull(joinNode);
    assertTrue(((StreamStreamJoinNode) joinNode).getSelfJoin());
    final AtomicInteger windowNodeCount = new AtomicInteger();
    countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
    assertEquals(windowNodeCount.get(), 1);
}
@Test
public void shouldMarkStreamStreamJoinAsSelfJoinTwoJoinsSameSource() {
    // Given: two unrelated joins, all four inputs over the same topic.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
    final KStream<String, String> left1 = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> right1 = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> left2 = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> right2 = builder.stream(Collections.singleton("t1"), consumed);
    left1.join(right1, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    left2.join(right2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    // When:
    builder.buildAndOptimizeTopology(props);
    // Then: both joins are rewritten, one window store each.
    final List<GraphNode> joinNodes = new ArrayList<>();
    getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet<>(), joinNodes);
    assertEquals(joinNodes.size(), 2);
    assertTrue(((StreamStreamJoinNode) joinNodes.get(0)).getSelfJoin());
    assertTrue(((StreamStreamJoinNode) joinNodes.get(1)).getSelfJoin());
    final AtomicInteger windowNodeCount = new AtomicInteger();
    countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
    assertEquals(windowNodeCount.get(), 2);
}
/**
 * The join node has two parents and the graph looks like:
 * root ---> source (t1)
 * source ---> filter , windowed-4, join
 * filter ---> windowed-3, join
 */
@Test
public void shouldNotMarkStreamStreamJoinAsSelfJoinTwoStreamsWithFilter() {
    // Given: the filter sits on the join path, so the two join inputs differ.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
    final KStream<String, String> left = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> right = builder.stream(Collections.singleton("t1"), consumed);
    left
        .filter((key, value) -> value != null)
        .join(right, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    // When:
    builder.buildAndOptimizeTopology(props);
    // Then: no rewrite — both window stores remain.
    final GraphNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
    assertNotNull(joinNode);
    assertFalse(((StreamStreamJoinNode) joinNode).getSelfJoin());
    final AtomicInteger windowNodeCount = new AtomicInteger();
    countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
    assertEquals(windowNodeCount.get(), 2);
}
/**
 * The join node has two parents and the graph looks like:
 * root ---> source (t1)
 * source ---> map , windowed-4, join
 * map ---> windowed-3, join
 */
@Test
public void shouldNotMarkStreamStreamJoinAsSelfJoinOneStreamWithMap() {
    // Given: one join input goes through a mapValues, so the inputs differ.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
    final KStream<String, String> source = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> mapped = source.mapValues(v -> v);
    final KStream<String, String> other = builder.stream(Collections.singleton("t1"), consumed);
    mapped.join(other, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    // When:
    builder.buildAndOptimizeTopology(props);
    // Then: no rewrite — both window stores remain.
    final GraphNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
    assertNotNull(joinNode);
    assertFalse(((StreamStreamJoinNode) joinNode).getSelfJoin());
    final AtomicInteger windowNodeCount = new AtomicInteger();
    countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
    assertEquals(windowNodeCount.get(), 2);
}
@Test
public void shouldNotMarkStreamStreamJoinAsSelfJoinMultipleSources() {
    // Given: the two join inputs read different topics.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
    final KStream<String, String> left = builder.stream(Collections.singleton("t1"), consumed);
    final KStream<String, String> right = builder.stream(Collections.singleton("t2"), consumed);
    left.join(right, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    // When:
    builder.buildAndOptimizeTopology(props);
    // Then: no rewrite — both window stores remain.
    final GraphNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
    assertNotNull(joinNode);
    assertFalse(((StreamStreamJoinNode) joinNode).getSelfJoin());
    final AtomicInteger windowNodeCount = new AtomicInteger();
    countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
    assertEquals(windowNodeCount.get(), 2);
}
@Test
public void shouldOptimizeJoinWhenInConfig() {
    // Given: only the SINGLE_STORE_SELF_JOIN optimization is enabled.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.SINGLE_STORE_SELF_JOIN);
    final KStream<String, String> stream = builder.stream(Collections.singleton("t1"), consumed);
    stream.join(stream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    // When:
    builder.buildAndOptimizeTopology(props);
    // Then: the self-join rewrite is applied.
    final GraphNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
    assertNotNull(joinNode);
    assertTrue(((StreamStreamJoinNode) joinNode).getSelfJoin());
    final AtomicInteger windowNodeCount = new AtomicInteger();
    countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
    assertEquals(windowNodeCount.get(), 1);
}
@Test
public void shouldNotOptimizeJoinWhenNotInConfig() {
    // Given: the other two optimizations are enabled, but not the self-join one.
    final String optimizations = String.join(",",
        StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
        StreamsConfig.MERGE_REPARTITION_TOPICS);
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizations);
    final KStream<String, String> stream = builder.stream(Collections.singleton("t1"), consumed);
    stream.join(stream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
    // When:
    builder.buildAndOptimizeTopology(props);
    // Then: the join is untouched and keeps both window stores.
    final GraphNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
    assertNotNull(joinNode);
    assertFalse(((StreamStreamJoinNode) joinNode).getSelfJoin());
    final AtomicInteger windowNodeCount = new AtomicInteger();
    countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
    assertEquals(windowNodeCount.get(), 2);
}
/**
 * Depth-first search for the first node in the topology graph that is an
 * instance of {@code clazz}, starting from {@code currentNode}.
 *
 * @param currentNode the node to start the search from
 * @param clazz       the node type to look for (subclasses also match)
 * @param visited     nodes already traversed; prevents re-visiting shared subgraphs
 * @return the first matching node in DFS order, or {@code null} if none is reachable
 */
private GraphNode getNodeByType(
    final GraphNode currentNode,
    final Class<? extends GraphNode> clazz,
    final Set<GraphNode> visited) {
    // Use isInstance so the requested type and its subclasses match; the
    // previous check (currentNode.getClass().isAssignableFrom(clazz)) was
    // inverted and could also match unrelated supertype nodes.
    if (clazz.isInstance(currentNode)) {
        return currentNode;
    }
    for (final GraphNode child : currentNode.children()) {
        // Recurse only into unvisited children (matching getNodesByType);
        // recursing unconditionally re-traverses shared subgraphs and can
        // loop forever if the graph ever contains a cycle.
        if (!visited.contains(child)) {
            visited.add(child);
            final GraphNode result = getNodeByType(child, clazz, visited);
            if (result != null) {
                return result;
            }
        }
    }
    return null;
}
/**
 * Depth-first collection of every node in the topology graph that is an
 * instance of {@code clazz}, accumulated into {@code result} in visit order.
 *
 * @param currentNode the node to start the traversal from
 * @param clazz       the node type to collect (subclasses also match)
 * @param visited     nodes already traversed; prevents re-visiting shared subgraphs
 * @param result      output list the matching nodes are appended to
 */
private void getNodesByType(
    final GraphNode currentNode,
    final Class<? extends GraphNode> clazz,
    final Set<GraphNode> visited,
    final List<GraphNode> result) {
    // isInstance matches the requested type and its subclasses; the previous
    // check (currentNode.getClass().isAssignableFrom(clazz)) was inverted and
    // could also match unrelated supertype nodes.
    if (clazz.isInstance(currentNode)) {
        result.add(currentNode);
    }
    for (final GraphNode child : currentNode.children()) {
        if (!visited.contains(child)) {
            visited.add(child);
            getNodesByType(child, clazz, visited, result);
        }
    }
}
/**
 * Depth-first tally of the {@code WindowedStreamProcessorNode}s reachable
 * from {@code currentNode}; the running total is accumulated into {@code count}.
 */
private void countJoinWindowNodes(
    final AtomicInteger count,
    final GraphNode currentNode,
    final Set<GraphNode> visited) {
    if (currentNode instanceof WindowedStreamProcessorNode) {
        count.incrementAndGet();
    }
    for (final GraphNode successor : currentNode.children()) {
        // Set.add returns true only on first insertion, so each node is
        // descended into at most once even when subgraphs are shared.
        final boolean firstVisit = visited.add(successor);
        if (firstVisit) {
            countJoinWindowNodes(count, successor, visited);
        }
    }
}
}

337
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoinTest.java

@@ -0,0 +1,337 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
/**
 * Verifies the KIP-862 single-store self-join rewrite: with topology
 * optimization enabled, a stream joined with itself (or with a second stream
 * over the same topic) must produce exactly the same output as the
 * unoptimized inner join. Each test first runs the plain inner-join topology
 * (built WITHOUT optimization props) to capture the expected output, then
 * replays identical input through the optimized self-join topology and
 * compares.
 *
 * NOTE(review): every test reuses one StreamsBuilder for both topologies, so
 * the second build(props) also contains the first sub-topology; the assertions
 * still hold because each driver only feeds its own input topic — confirm the
 * builder reuse is intentional.
 */
public class KStreamKStreamSelfJoinTest {
// Input topic for the optimized self-join topology.
private final String topic1 = "topic1";
// Input topic for the reference (unoptimized) inner-join topology.
private final String topic2 = "topic2";
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
// Self-join expressed with a single KStream object on both sides of join().
@Test
public void shouldMatchInnerJoinWithSelfJoinWithSingleStream() {
props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, StreamsConfig.METRICS_LATEST);
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
final List<KeyValueTimestamp<String, String>> expected;
final StreamsBuilder streamsBuilder = new StreamsBuilder();
// Inner join topology
final MockApiProcessorSupplier<String, String, Void, Void> innerJoinSupplier =
new MockApiProcessorSupplier<>();
final KStream<String, String> stream2 = streamsBuilder.stream(
topic2, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> innerJoin = stream2.join(
stream2,
valueJoiner,
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
innerJoin.process(innerJoinSupplier);
// Built without props: optimization stays off, giving the reference output.
final Topology innerJoinTopology = streamsBuilder.build();
try (final TopologyTestDriver driver = new TopologyTestDriver(innerJoinTopology)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(topic2, new StringSerializer(), new StringSerializer());
final MockApiProcessor<String, String, Void, Void> processor =
innerJoinSupplier.theCapturedProcessor();
inputTopic.pipeInput("A", "1", 1L);
inputTopic.pipeInput("B", "1", 2L);
inputTopic.pipeInput("A", "2", 3L);
inputTopic.pipeInput("B", "2", 4L);
inputTopic.pipeInput("B", "3", 5L);
// Capture the unoptimized join results as the expectation.
expected = processor.processed();
}
// Self join topology
final MockApiProcessorSupplier<String, String, Void, Void> selfJoinSupplier =
new MockApiProcessorSupplier<>();
final KStream<String, String> stream1 = streamsBuilder.stream(
topic1, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> selfJoin = stream1.join(
stream1,
valueJoiner,
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
selfJoin.process(selfJoinSupplier);
// Built WITH props: OPTIMIZE rewrites the join into a single-store self-join.
final Topology selfJoinTopology = streamsBuilder.build(props);
try (final TopologyTestDriver driver = new TopologyTestDriver(selfJoinTopology, props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
final MockApiProcessor<String, String, Void, Void> processor =
selfJoinSupplier.theCapturedProcessor();
inputTopic.pipeInput("A", "1", 1L);
inputTopic.pipeInput("B", "1", 2L);
inputTopic.pipeInput("A", "2", 3L);
inputTopic.pipeInput("B", "2", 4L);
inputTopic.pipeInput("B", "3", 5L);
// Then:
processor.checkAndClearProcessResult(expected.toArray(new KeyValueTimestamp[0]));
}
}
// Same scenario, but the self-join is expressed via two distinct KStream
// objects that read the same topic.
@Test
public void shouldMatchInnerJoinWithSelfJoinWithTwoStreams() {
props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, StreamsConfig.METRICS_LATEST);
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
final List<KeyValueTimestamp<String, String>> expected;
final StreamsBuilder streamsBuilder = new StreamsBuilder();
// Inner join topology
final MockApiProcessorSupplier<String, String, Void, Void> innerJoinSupplier =
new MockApiProcessorSupplier<>();
final KStream<String, String> stream3 = streamsBuilder.stream(
topic2, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream4 = streamsBuilder.stream(
topic2, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> innerJoin = stream3.join(
stream4,
valueJoiner,
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
innerJoin.process(innerJoinSupplier);
final Topology innerJoinTopology = streamsBuilder.build();
try (final TopologyTestDriver driver = new TopologyTestDriver(innerJoinTopology)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(topic2, new StringSerializer(), new StringSerializer());
final MockApiProcessor<String, String, Void, Void> processor =
innerJoinSupplier.theCapturedProcessor();
inputTopic.pipeInput("A", "1", 1L);
inputTopic.pipeInput("B", "1", 2L);
inputTopic.pipeInput("A", "2", 3L);
inputTopic.pipeInput("B", "2", 4L);
inputTopic.pipeInput("B", "3", 5L);
expected = processor.processed();
}
// Self join topology
final MockApiProcessorSupplier<String, String, Void, Void> selfJoinSupplier =
new MockApiProcessorSupplier<>();
final KStream<String, String> stream1 = streamsBuilder.stream(
topic1, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream2 = streamsBuilder.stream(
topic1, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> selfJoin = stream1.join(
stream2,
valueJoiner,
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
selfJoin.process(selfJoinSupplier);
final Topology topology1 = streamsBuilder.build(props);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology1, props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
final MockApiProcessor<String, String, Void, Void> processor =
selfJoinSupplier.theCapturedProcessor();
inputTopic.pipeInput("A", "1", 1L);
inputTopic.pipeInput("B", "1", 2L);
inputTopic.pipeInput("A", "2", 3L);
inputTopic.pipeInput("B", "2", 4L);
inputTopic.pipeInput("B", "3", 5L);
// Then:
processor.checkAndClearProcessResult(expected.toArray(new KeyValueTimestamp[0]));
}
}
// Exercises a window with a grace period (before/after bounds plus grace)
// to confirm the rewrite honors the full window configuration.
@Test
public void shouldMatchInnerJoinWithSelfJoinDifferentBeforeAfterWindows() {
props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, StreamsConfig.METRICS_LATEST);
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
final List<KeyValueTimestamp<String, String>> expected;
final StreamsBuilder streamsBuilder = new StreamsBuilder();
// Inner join topology
final MockApiProcessorSupplier<String, String, Void, Void> innerJoinSupplier =
new MockApiProcessorSupplier<>();
final KStream<String, String> stream3 = streamsBuilder.stream(
topic2, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream4 = streamsBuilder.stream(
topic2, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> innerJoin = stream3.join(
stream4,
valueJoiner,
JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(11), ofSeconds(10)),
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
innerJoin.process(innerJoinSupplier);
final Topology innerJoinTopology = streamsBuilder.build();
try (final TopologyTestDriver driver = new TopologyTestDriver(innerJoinTopology)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(topic2, new StringSerializer(), new StringSerializer());
final MockApiProcessor<String, String, Void, Void> processor =
innerJoinSupplier.theCapturedProcessor();
inputTopic.pipeInput("A", "1", 0L);
inputTopic.pipeInput("A", "2", 11000L);
inputTopic.pipeInput("B", "1", 12000L);
inputTopic.pipeInput("A", "3", 13000L);
inputTopic.pipeInput("A", "4", 15000L);
inputTopic.pipeInput("C", "1", 16000L);
inputTopic.pipeInput("D", "1", 17000L);
inputTopic.pipeInput("A", "5", 30000L);
expected = processor.processed();
}
// Self join topology
final MockApiProcessorSupplier<String, String, Void, Void> selfJoinSupplier =
new MockApiProcessorSupplier<>();
final KStream<String, String> stream1 = streamsBuilder.stream(
topic1, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream2 = streamsBuilder.stream(
topic1, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> selfJoin = stream1.join(
stream2,
valueJoiner,
JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(11), ofSeconds(10)),
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
selfJoin.process(selfJoinSupplier);
final Topology selfJoinTopology = streamsBuilder.build(props);
try (final TopologyTestDriver driver = new TopologyTestDriver(selfJoinTopology, props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
final MockApiProcessor<String, String, Void, Void> processor =
selfJoinSupplier.theCapturedProcessor();
inputTopic.pipeInput("A", "1", 0L);
inputTopic.pipeInput("A", "2", 11000L);
inputTopic.pipeInput("B", "1", 12000L);
inputTopic.pipeInput("A", "3", 13000L);
inputTopic.pipeInput("A", "4", 15000L);
inputTopic.pipeInput("C", "1", 16000L);
inputTopic.pipeInput("D", "1", 17000L);
inputTopic.pipeInput("A", "5", 30000L);
// Then:
processor.checkAndClearProcessResult(expected.toArray(new KeyValueTimestamp[0]));
}
}
// Includes a late/out-of-order record (timestamp 6000 after 30000) to check
// the optimized join drops/keeps records exactly like the unoptimized one.
@Test
public void shouldMatchInnerJoinWithSelfJoinOutOfOrderMessages() {
props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, StreamsConfig.METRICS_LATEST);
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
final List<KeyValueTimestamp<String, String>> expected;
final StreamsBuilder streamsBuilder = new StreamsBuilder();
// Inner join topology
final MockApiProcessorSupplier<String, String, Void, Void> innerJoinSupplier =
new MockApiProcessorSupplier<>();
final KStream<String, String> stream3 = streamsBuilder.stream(
topic2, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream4 = streamsBuilder.stream(
topic2, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> innerJoin = stream3.join(
stream4,
valueJoiner,
JoinWindows.ofTimeDifferenceWithNoGrace(ofSeconds(10)),
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
innerJoin.process(innerJoinSupplier);
final Topology topology2 = streamsBuilder.build();
try (final TopologyTestDriver driver = new TopologyTestDriver(topology2)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(topic2, new StringSerializer(), new StringSerializer());
final MockApiProcessor<String, String, Void, Void> processor =
innerJoinSupplier.theCapturedProcessor();
inputTopic.pipeInput("A", "1", 0L);
inputTopic.pipeInput("A", "2", 9999);
inputTopic.pipeInput("B", "1", 11000L);
inputTopic.pipeInput("A", "3", 13000L);
inputTopic.pipeInput("A", "4", 15000L);
inputTopic.pipeInput("C", "1", 16000L);
inputTopic.pipeInput("D", "1", 17000L);
inputTopic.pipeInput("A", "5", 30000L);
inputTopic.pipeInput("A", "5", 6000);
expected = processor.processed();
}
// Self join topology
final KStream<String, String> stream1 = streamsBuilder.stream(
topic1, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream2 = streamsBuilder.stream(
topic1, Consumed.with(Serdes.String(), Serdes.String()));
final MockApiProcessorSupplier<String, String, Void, Void> selfJoinSupplier =
new MockApiProcessorSupplier<>();
final KStream<String, String> selfJoin = stream1.join(
stream2,
valueJoiner,
JoinWindows.ofTimeDifferenceWithNoGrace(ofSeconds(10)),
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
selfJoin.process(selfJoinSupplier);
final Topology selfJoinTopology = streamsBuilder.build(props);
try (final TopologyTestDriver driver = new TopologyTestDriver(selfJoinTopology, props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
final MockApiProcessor<String, String, Void, Void> processor =
selfJoinSupplier.theCapturedProcessor();
inputTopic.pipeInput("A", "1", 0L);
inputTopic.pipeInput("A", "2", 9999);
inputTopic.pipeInput("B", "1", 11000L);
inputTopic.pipeInput("A", "3", 13000L);
inputTopic.pipeInput("A", "4", 15000L);
inputTopic.pipeInput("C", "1", 16000L);
inputTopic.pipeInput("D", "1", 17000L);
inputTopic.pipeInput("A", "5", 30000L);
inputTopic.pipeInput("A", "5", 6000);
// Then:
processor.checkAndClearProcessResult(expected.toArray(new KeyValueTimestamp[0]));
}
}
}
Loading…
Cancel
Save