Browse Source

KAFKA-9298: reuse mapped stream error in joins (#8504)

Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>
pull/8733/head
Bill Bejeck 5 years ago committed by GitHub
parent
commit
cec6202369
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
  2. 152
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
  3. 151
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java

13
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

@ -129,6 +129,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -129,6 +129,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
private final boolean repartitionRequired;
private OptimizableRepartitionNode<K, V> repartitionNode;
KStreamImpl(final String name,
final Serde<K> keySerde,
final Serde<V> valueSerde,
@ -983,6 +985,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -983,6 +985,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valSerde;
final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder =
OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
// we still need to create the repartitioned source each time
// as it increments the counter which
// is needed to maintain topology compatibility
final String repartitionedSourceName = createRepartitionedSource(
builder,
repartitionKeySerde,
@ -991,8 +996,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -991,8 +996,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
null,
optimizableRepartitionNodeBuilder);
final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode);
if (repartitionNode == null || !name.equals(repartitionName)) {
repartitionNode = optimizableRepartitionNodeBuilder.build();
builder.addGraphNode(streamsGraphNode, repartitionNode);
}
return new KStreamImpl<>(
repartitionedSourceName,
@ -1000,7 +1007,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -1000,7 +1007,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
repartitionValueSerde,
Collections.singleton(repartitionedSourceName),
false,
optimizableRepartitionNode,
repartitionNode,
builder);
}

152
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java

@ -19,9 +19,11 @@ package org.apache.kafka.streams.kstream.internals; @@ -19,9 +19,11 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.StreamsException;
@ -77,6 +79,38 @@ public class KStreamKStreamJoinTest { @@ -77,6 +79,38 @@ public class KStreamKStreamJoinTest {
shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST);
}
@Test
public void shouldReuseRepartitionTopicWithGeneratedName() {
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-to");
assertEquals(expectedTopologyWithGeneratedRepartitionTopic, builder.build(props).describe().toString());
}
@Test
public void shouldCreateRepartitionTopicsWithUserProvidedName() {
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
final StreamJoined<String, String, String> streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
final Topology topology = builder.build(props);
System.out.println(topology.describe().toString());
assertEquals(expectedTopologyWithUserNamedRepartitionTopics, topology.describe().toString());
}
private void shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtInMetricsVersion) {
final StreamsBuilder builder = new StreamsBuilder();
@ -1556,4 +1590,122 @@ public class KStreamKStreamJoinTest { @@ -1556,4 +1590,122 @@ public class KStreamKStreamJoinTest {
}
private final String expectedTopologyWithUserNamedRepartitionTopics = "Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" +
" --> KSTREAM-MAP-0000000003\n" +
" Processor: KSTREAM-MAP-0000000003 (stores: [])\n" +
" --> second-join-left-repartition-filter, first-join-left-repartition-filter\n" +
" <-- KSTREAM-SOURCE-0000000000\n" +
" Processor: first-join-left-repartition-filter (stores: [])\n" +
" --> first-join-left-repartition-sink\n" +
" <-- KSTREAM-MAP-0000000003\n" +
" Processor: second-join-left-repartition-filter (stores: [])\n" +
" --> second-join-left-repartition-sink\n" +
" <-- KSTREAM-MAP-0000000003\n" +
" Sink: first-join-left-repartition-sink (topic: first-join-left-repartition)\n" +
" <-- first-join-left-repartition-filter\n" +
" Sink: second-join-left-repartition-sink (topic: second-join-left-repartition)\n" +
" <-- second-join-left-repartition-filter\n" +
"\n" +
" Sub-topology: 1\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n" +
" --> first-join-other-windowed\n" +
" Source: first-join-left-repartition-source (topics: [first-join-left-repartition])\n" +
" --> first-join-this-windowed\n" +
" Processor: first-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" +
" --> first-join-other-join\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
" Processor: first-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" +
" --> first-join-this-join\n" +
" <-- first-join-left-repartition-source\n" +
" Processor: first-join-other-join (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" +
" --> first-join-merge\n" +
" <-- first-join-other-windowed\n" +
" Processor: first-join-this-join (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" +
" --> first-join-merge\n" +
" <-- first-join-this-windowed\n" +
" Processor: first-join-merge (stores: [])\n" +
" --> KSTREAM-SINK-0000000012\n" +
" <-- first-join-this-join, first-join-other-join\n" +
" Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n" +
" <-- first-join-merge\n" +
"\n" +
" Sub-topology: 2\n" +
" Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n" +
" --> second-join-other-windowed\n" +
" Source: second-join-left-repartition-source (topics: [second-join-left-repartition])\n" +
" --> second-join-this-windowed\n" +
" Processor: second-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" +
" --> second-join-other-join\n" +
" <-- KSTREAM-SOURCE-0000000002\n" +
" Processor: second-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" +
" --> second-join-this-join\n" +
" <-- second-join-left-repartition-source\n" +
" Processor: second-join-other-join (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" +
" --> second-join-merge\n" +
" <-- second-join-other-windowed\n" +
" Processor: second-join-this-join (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" +
" --> second-join-merge\n" +
" <-- second-join-this-windowed\n" +
" Processor: second-join-merge (stores: [])\n" +
" --> KSTREAM-SINK-0000000021\n" +
" <-- second-join-this-join, second-join-other-join\n" +
" Sink: KSTREAM-SINK-0000000021 (topic: out-two)\n" +
" <-- second-join-merge\n\n";
private final String expectedTopologyWithGeneratedRepartitionTopic = "Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" +
" --> KSTREAM-MAP-0000000003\n" +
" Processor: KSTREAM-MAP-0000000003 (stores: [])\n" +
" --> KSTREAM-FILTER-0000000005\n" +
" <-- KSTREAM-SOURCE-0000000000\n" +
" Processor: KSTREAM-FILTER-0000000005 (stores: [])\n" +
" --> KSTREAM-SINK-0000000004\n" +
" <-- KSTREAM-MAP-0000000003\n" +
" Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-MAP-0000000003-repartition)\n" +
" <-- KSTREAM-FILTER-0000000005\n" +
"\n" +
" Sub-topology: 1\n" +
" Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-MAP-0000000003-repartition])\n" +
" --> KSTREAM-WINDOWED-0000000007, KSTREAM-WINDOWED-0000000016\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n" +
" --> KSTREAM-WINDOWED-0000000008\n" +
" Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n" +
" --> KSTREAM-WINDOWED-0000000017\n" +
" Processor: KSTREAM-WINDOWED-0000000007 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" +
" --> KSTREAM-JOINTHIS-0000000009\n" +
" <-- KSTREAM-SOURCE-0000000006\n" +
" Processor: KSTREAM-WINDOWED-0000000008 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" +
" --> KSTREAM-JOINOTHER-0000000010\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
" Processor: KSTREAM-WINDOWED-0000000016 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" +
" --> KSTREAM-JOINTHIS-0000000018\n" +
" <-- KSTREAM-SOURCE-0000000006\n" +
" Processor: KSTREAM-WINDOWED-0000000017 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" +
" --> KSTREAM-JOINOTHER-0000000019\n" +
" <-- KSTREAM-SOURCE-0000000002\n" +
" Processor: KSTREAM-JOINOTHER-0000000010 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" +
" --> KSTREAM-MERGE-0000000011\n" +
" <-- KSTREAM-WINDOWED-0000000008\n" +
" Processor: KSTREAM-JOINOTHER-0000000019 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" +
" --> KSTREAM-MERGE-0000000020\n" +
" <-- KSTREAM-WINDOWED-0000000017\n" +
" Processor: KSTREAM-JOINTHIS-0000000009 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" +
" --> KSTREAM-MERGE-0000000011\n" +
" <-- KSTREAM-WINDOWED-0000000007\n" +
" Processor: KSTREAM-JOINTHIS-0000000018 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" +
" --> KSTREAM-MERGE-0000000020\n" +
" <-- KSTREAM-WINDOWED-0000000016\n" +
" Processor: KSTREAM-MERGE-0000000011 (stores: [])\n" +
" --> KSTREAM-SINK-0000000012\n" +
" <-- KSTREAM-JOINTHIS-0000000009, KSTREAM-JOINOTHER-0000000010\n" +
" Processor: KSTREAM-MERGE-0000000020 (stores: [])\n" +
" --> KSTREAM-SINK-0000000021\n" +
" <-- KSTREAM-JOINTHIS-0000000018, KSTREAM-JOINOTHER-0000000019\n" +
" Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n" +
" <-- KSTREAM-MERGE-0000000011\n" +
" Sink: KSTREAM-SINK-0000000021 (topic: out-to)\n" +
" <-- KSTREAM-MERGE-0000000020\n\n";
}

151
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java

@ -16,19 +16,35 @@ @@ -16,19 +16,35 @@
*/
package org.apache.kafka.streams.kstream.internals;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@ -37,20 +53,6 @@ import org.junit.After; @@ -37,20 +53,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
public class KStreamKTableJoinTest {
private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
@ -112,6 +114,38 @@ public class KStreamKTableJoinTest { @@ -112,6 +114,38 @@ public class KStreamKTableJoinTest {
}
}
@Test
public void shouldReuseRepartitionTopicWithGeneratedName() {
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> rekeyedStream = streamA.map((k, v) -> new KeyValue<>(v, k));
rekeyedStream.join(tableB, (value1, value2) -> value1 + value2).to("out-one");
rekeyedStream.join(tableC, (value1, value2) -> value1 + value2).to("out-two");
final Topology topology = builder.build(props);
assertEquals(expectedTopologyWithGeneratedRepartitionTopicNames, topology.describe().toString());
}
@Test
public void shouldCreateRepartitionTopicsWithUserProvidedName() {
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> rekeyedStream = streamA.map((k, v) -> new KeyValue<>(v, k));
rekeyedStream.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join")).to("out-one");
rekeyedStream.join(tableC, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "second-join")).to("out-two");
final Topology topology = builder.build(props);
System.out.println(topology.describe().toString());
assertEquals(expectedTopologyWithUserProvidedRepartitionTopicNames, topology.describe().toString());
}
@Test
public void shouldRequireCopartitionedStreams() {
final Collection<Set<String>> copartitionGroups =
@ -260,4 +294,91 @@ public class KStreamKTableJoinTest { @@ -260,4 +294,91 @@ public class KStreamKTableJoinTest {
);
}
}
private final String expectedTopologyWithGeneratedRepartitionTopicNames =
"Topologies:\n"
+ " Sub-topology: 0\n"
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
+ " --> KSTREAM-MAP-0000000007\n"
+ " Processor: KSTREAM-MAP-0000000007 (stores: [])\n"
+ " --> KSTREAM-FILTER-0000000009\n"
+ " <-- KSTREAM-SOURCE-0000000000\n"
+ " Processor: KSTREAM-FILTER-0000000009 (stores: [])\n"
+ " --> KSTREAM-SINK-0000000008\n"
+ " <-- KSTREAM-MAP-0000000007\n"
+ " Sink: KSTREAM-SINK-0000000008 (topic: KSTREAM-MAP-0000000007-repartition)\n"
+ " <-- KSTREAM-FILTER-0000000009\n"
+ "\n"
+ " Sub-topology: 1\n"
+ " Source: KSTREAM-SOURCE-0000000010 (topics: [KSTREAM-MAP-0000000007-repartition])\n"
+ " --> KSTREAM-JOIN-0000000011, KSTREAM-JOIN-0000000016\n"
+ " Processor: KSTREAM-JOIN-0000000011 (stores: [topic2-STATE-STORE-0000000001])\n"
+ " --> KSTREAM-SINK-0000000012\n"
+ " <-- KSTREAM-SOURCE-0000000010\n"
+ " Processor: KSTREAM-JOIN-0000000016 (stores: [topic3-STATE-STORE-0000000004])\n"
+ " --> KSTREAM-SINK-0000000017\n"
+ " <-- KSTREAM-SOURCE-0000000010\n"
+ " Source: KSTREAM-SOURCE-0000000002 (topics: [topic2])\n"
+ " --> KTABLE-SOURCE-0000000003\n"
+ " Source: KSTREAM-SOURCE-0000000005 (topics: [topic3])\n"
+ " --> KTABLE-SOURCE-0000000006\n"
+ " Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n"
+ " <-- KSTREAM-JOIN-0000000011\n"
+ " Sink: KSTREAM-SINK-0000000017 (topic: out-two)\n"
+ " <-- KSTREAM-JOIN-0000000016\n"
+ " Processor: KTABLE-SOURCE-0000000003 (stores: [topic2-STATE-STORE-0000000001])\n"
+ " --> none\n"
+ " <-- KSTREAM-SOURCE-0000000002\n"
+ " Processor: KTABLE-SOURCE-0000000006 (stores: [topic3-STATE-STORE-0000000004])\n"
+ " --> none\n"
+ " <-- KSTREAM-SOURCE-0000000005\n\n";
private final String expectedTopologyWithUserProvidedRepartitionTopicNames =
"Topologies:\n"
+ " Sub-topology: 0\n"
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
+ " --> KSTREAM-MAP-0000000007\n"
+ " Processor: KSTREAM-MAP-0000000007 (stores: [])\n"
+ " --> first-join-repartition-filter, second-join-repartition-filter\n"
+ " <-- KSTREAM-SOURCE-0000000000\n"
+ " Processor: first-join-repartition-filter (stores: [])\n"
+ " --> first-join-repartition-sink\n"
+ " <-- KSTREAM-MAP-0000000007\n"
+ " Processor: second-join-repartition-filter (stores: [])\n"
+ " --> second-join-repartition-sink\n"
+ " <-- KSTREAM-MAP-0000000007\n"
+ " Sink: first-join-repartition-sink (topic: first-join-repartition)\n"
+ " <-- first-join-repartition-filter\n"
+ " Sink: second-join-repartition-sink (topic: second-join-repartition)\n"
+ " <-- second-join-repartition-filter\n"
+ "\n"
+ " Sub-topology: 1\n"
+ " Source: first-join-repartition-source (topics: [first-join-repartition])\n"
+ " --> first-join\n"
+ " Source: KSTREAM-SOURCE-0000000002 (topics: [topic2])\n"
+ " --> KTABLE-SOURCE-0000000003\n"
+ " Processor: first-join (stores: [topic2-STATE-STORE-0000000001])\n"
+ " --> KSTREAM-SINK-0000000012\n"
+ " <-- first-join-repartition-source\n"
+ " Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n"
+ " <-- first-join\n"
+ " Processor: KTABLE-SOURCE-0000000003 (stores: [topic2-STATE-STORE-0000000001])\n"
+ " --> none\n"
+ " <-- KSTREAM-SOURCE-0000000002\n"
+ "\n"
+ " Sub-topology: 2\n"
+ " Source: second-join-repartition-source (topics: [second-join-repartition])\n"
+ " --> second-join\n"
+ " Source: KSTREAM-SOURCE-0000000005 (topics: [topic3])\n"
+ " --> KTABLE-SOURCE-0000000006\n"
+ " Processor: second-join (stores: [topic3-STATE-STORE-0000000004])\n"
+ " --> KSTREAM-SINK-0000000017\n"
+ " <-- second-join-repartition-source\n"
+ " Sink: KSTREAM-SINK-0000000017 (topic: out-two)\n"
+ " <-- second-join\n"
+ " Processor: KTABLE-SOURCE-0000000006 (stores: [topic3-STATE-STORE-0000000004])\n"
+ " --> none\n"
+ " <-- KSTREAM-SOURCE-0000000005\n\n";
}

Loading…
Cancel
Save