diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 4936a1f6a08..2282287f56e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -129,6 +129,8 @@ public class KStreamImpl extends AbstractStream implements KStream repartitionNode; + KStreamImpl(final String name, final Serde keySerde, final Serde valueSerde, @@ -983,6 +985,9 @@ public class KStreamImpl extends AbstractStream implements KStream repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valSerde; final OptimizableRepartitionNodeBuilder optimizableRepartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder(); + // we still need to create the repartitioned source each time + // as it increments the counter which + // is needed to maintain topology compatibility final String repartitionedSourceName = createRepartitionedSource( builder, repartitionKeySerde, @@ -991,8 +996,10 @@ public class KStreamImpl extends AbstractStream implements KStream optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build(); - builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode); + if (repartitionNode == null || !name.equals(repartitionName)) { + repartitionNode = optimizableRepartitionNodeBuilder.build(); + builder.addGraphNode(streamsGraphNode, repartitionNode); + } return new KStreamImpl<>( repartitionedSourceName, @@ -1000,7 +1007,7 @@ public class KStreamImpl extends AbstractStream implements KStream stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KStream stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String())); + final KStream stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String())); + final KStream newStream = stream1.map((k, v) -> new KeyValue<>(v, k)); + newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one"); + newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-to"); + assertEquals(expectedTopologyWithGeneratedRepartitionTopic, builder.build(props).describe().toString()); + } + + @Test + public void shouldCreateRepartitionTopicsWithUserProvidedName() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); + final KStream stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KStream stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String())); + final KStream stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String())); + final KStream newStream = stream1.map((k, v) -> new KeyValue<>(v, k)); + final StreamJoined streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()); + newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one"); + newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two"); + final Topology topology = builder.build(props); + System.out.println(topology.describe().toString()); + assertEquals(expectedTopologyWithUserNamedRepartitionTopics, topology.describe().toString()); + } + private void shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtInMetricsVersion) { final StreamsBuilder builder = new StreamsBuilder(); @@ -1556,4 +1590,122 @@ public class KStreamKStreamJoinTest { } + private final String expectedTopologyWithUserNamedRepartitionTopics = "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" + + " --> KSTREAM-MAP-0000000003\n" + + " Processor: KSTREAM-MAP-0000000003 (stores: [])\n" + + " --> second-join-left-repartition-filter, first-join-left-repartition-filter\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: first-join-left-repartition-filter (stores: [])\n" + + " --> first-join-left-repartition-sink\n" + + " <-- KSTREAM-MAP-0000000003\n" + + " Processor: second-join-left-repartition-filter (stores: [])\n" + + " --> second-join-left-repartition-sink\n" + + " <-- KSTREAM-MAP-0000000003\n" + + " Sink: first-join-left-repartition-sink (topic: first-join-left-repartition)\n" + + " <-- first-join-left-repartition-filter\n" + + " Sink: second-join-left-repartition-sink (topic: second-join-left-repartition)\n" + + " <-- second-join-left-repartition-filter\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n" + + " --> first-join-other-windowed\n" + + " Source: first-join-left-repartition-source (topics: [first-join-left-repartition])\n" + + " --> first-join-this-windowed\n" + + " Processor: first-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" + + " --> first-join-other-join\n" + + " <-- KSTREAM-SOURCE-0000000001\n" + + " Processor: first-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" + + " --> first-join-this-join\n" + + " <-- first-join-left-repartition-source\n" + + " Processor: first-join-other-join (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" + + " --> first-join-merge\n" + + " <-- first-join-other-windowed\n" + + " Processor: first-join-this-join (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" + + " --> first-join-merge\n" + + " <-- first-join-this-windowed\n" + + " Processor: first-join-merge (stores: [])\n" + + " --> KSTREAM-SINK-0000000012\n" + + " <-- first-join-this-join, first-join-other-join\n" + + " Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n" + + " <-- first-join-merge\n" + + "\n" + + " Sub-topology: 2\n" + + " Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n" + + " --> second-join-other-windowed\n" + + " Source: second-join-left-repartition-source (topics: [second-join-left-repartition])\n" + + " --> second-join-this-windowed\n" + + " Processor: second-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" + + " --> second-join-other-join\n" + + " <-- KSTREAM-SOURCE-0000000002\n" + + " Processor: second-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" + + " --> second-join-this-join\n" + + " <-- second-join-left-repartition-source\n" + + " Processor: second-join-other-join (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" + + " --> second-join-merge\n" + + " <-- second-join-other-windowed\n" + + " Processor: second-join-this-join (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" + + " --> second-join-merge\n" + + " <-- second-join-this-windowed\n" + + " Processor: second-join-merge (stores: [])\n" + + " --> KSTREAM-SINK-0000000021\n" + + " <-- second-join-this-join, second-join-other-join\n" + + " Sink: KSTREAM-SINK-0000000021 (topic: out-two)\n" + + " <-- second-join-merge\n\n"; + + private final String expectedTopologyWithGeneratedRepartitionTopic = "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" + + " --> KSTREAM-MAP-0000000003\n" + + " Processor: KSTREAM-MAP-0000000003 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000005\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-FILTER-0000000005 (stores: [])\n" + + " --> KSTREAM-SINK-0000000004\n" + + " <-- KSTREAM-MAP-0000000003\n" + + " Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-MAP-0000000003-repartition)\n" + + " <-- KSTREAM-FILTER-0000000005\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-MAP-0000000003-repartition])\n" + + " --> KSTREAM-WINDOWED-0000000007, KSTREAM-WINDOWED-0000000016\n" + + " Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n" + + " --> KSTREAM-WINDOWED-0000000008\n" + + " Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n" + + " --> KSTREAM-WINDOWED-0000000017\n" + + " Processor: KSTREAM-WINDOWED-0000000007 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" + + " --> KSTREAM-JOINTHIS-0000000009\n" + + " <-- KSTREAM-SOURCE-0000000006\n" + + " Processor: KSTREAM-WINDOWED-0000000008 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" + + " --> KSTREAM-JOINOTHER-0000000010\n" + + " <-- KSTREAM-SOURCE-0000000001\n" + + " Processor: KSTREAM-WINDOWED-0000000016 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" + + " --> KSTREAM-JOINTHIS-0000000018\n" + + " <-- KSTREAM-SOURCE-0000000006\n" + + " Processor: KSTREAM-WINDOWED-0000000017 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" + + " --> KSTREAM-JOINOTHER-0000000019\n" + + " <-- KSTREAM-SOURCE-0000000002\n" + + " Processor: KSTREAM-JOINOTHER-0000000010 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" + + " --> KSTREAM-MERGE-0000000011\n" + + " <-- KSTREAM-WINDOWED-0000000008\n" + + " Processor: KSTREAM-JOINOTHER-0000000019 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" + + " --> KSTREAM-MERGE-0000000020\n" + + " <-- KSTREAM-WINDOWED-0000000017\n" + + " Processor: KSTREAM-JOINTHIS-0000000009 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" + + " --> KSTREAM-MERGE-0000000011\n" + + " <-- KSTREAM-WINDOWED-0000000007\n" + + " Processor: KSTREAM-JOINTHIS-0000000018 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" + + " --> KSTREAM-MERGE-0000000020\n" + + " <-- KSTREAM-WINDOWED-0000000016\n" + + " Processor: KSTREAM-MERGE-0000000011 (stores: [])\n" + + " --> KSTREAM-SINK-0000000012\n" + + " <-- KSTREAM-JOINTHIS-0000000009, KSTREAM-JOINOTHER-0000000010\n" + + " Processor: KSTREAM-MERGE-0000000020 (stores: [])\n" + + " --> KSTREAM-SINK-0000000021\n" + + " <-- KSTREAM-JOINTHIS-0000000018, KSTREAM-JOINOTHER-0000000019\n" + + " Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n" + + " <-- KSTREAM-MERGE-0000000011\n" + + " Sink: KSTREAM-SINK-0000000021 (topic: out-to)\n" + + " <-- KSTREAM-MERGE-0000000020\n\n"; } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java index 1d100fefc6e..a5368cde9c3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java @@ -16,19 +16,35 @@ */ package org.apache.kafka.streams.kstream.internals; +import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Properties; +import java.util.Random; +import java.util.Set; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; -import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; @@ -37,20 +53,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.time.Duration; -import java.time.Instant; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.Properties; -import java.util.Random; -import java.util.Set; - -import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; -import static org.hamcrest.CoreMatchers.hasItem; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; - public class KStreamKTableJoinTest { private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0]; @@ -112,6 +114,38 @@ public class KStreamKTableJoinTest { } } + @Test + public void shouldReuseRepartitionTopicWithGeneratedName() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); + final KStream streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String())); + final KTable tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String())); + final KStream rekeyedStream = streamA.map((k, v) -> new KeyValue<>(v, k)); + rekeyedStream.join(tableB, (value1, value2) -> value1 + value2).to("out-one"); + rekeyedStream.join(tableC, (value1, value2) -> value1 + value2).to("out-two"); + final Topology topology = builder.build(props); + assertEquals(expectedTopologyWithGeneratedRepartitionTopicNames, topology.describe().toString()); + } + + @Test + public void shouldCreateRepartitionTopicsWithUserProvidedName() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); + final KStream streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String())); + final KTable tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String())); + final KStream rekeyedStream = streamA.map((k, v) -> new KeyValue<>(v, k)); + + rekeyedStream.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join")).to("out-one"); + rekeyedStream.join(tableC, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "second-join")).to("out-two"); + final Topology topology = builder.build(props); + System.out.println(topology.describe().toString()); + assertEquals(expectedTopologyWithUserProvidedRepartitionTopicNames, topology.describe().toString()); + } + @Test public void shouldRequireCopartitionedStreams() { final Collection> copartitionGroups = @@ -260,4 +294,91 @@ public class KStreamKTableJoinTest { ); } } + + + private final String expectedTopologyWithGeneratedRepartitionTopicNames = + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" + + " --> KSTREAM-MAP-0000000007\n" + + " Processor: KSTREAM-MAP-0000000007 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000009\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-FILTER-0000000009 (stores: [])\n" + + " --> KSTREAM-SINK-0000000008\n" + + " <-- KSTREAM-MAP-0000000007\n" + + " Sink: KSTREAM-SINK-0000000008 (topic: KSTREAM-MAP-0000000007-repartition)\n" + + " <-- KSTREAM-FILTER-0000000009\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000010 (topics: [KSTREAM-MAP-0000000007-repartition])\n" + + " --> KSTREAM-JOIN-0000000011, KSTREAM-JOIN-0000000016\n" + + " Processor: KSTREAM-JOIN-0000000011 (stores: [topic2-STATE-STORE-0000000001])\n" + + " --> KSTREAM-SINK-0000000012\n" + + " <-- KSTREAM-SOURCE-0000000010\n" + + " Processor: KSTREAM-JOIN-0000000016 (stores: [topic3-STATE-STORE-0000000004])\n" + + " --> KSTREAM-SINK-0000000017\n" + + " <-- KSTREAM-SOURCE-0000000010\n" + + " Source: KSTREAM-SOURCE-0000000002 (topics: [topic2])\n" + + " --> KTABLE-SOURCE-0000000003\n" + + " Source: KSTREAM-SOURCE-0000000005 (topics: [topic3])\n" + + " --> KTABLE-SOURCE-0000000006\n" + + " Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n" + + " <-- KSTREAM-JOIN-0000000011\n" + + " Sink: KSTREAM-SINK-0000000017 (topic: out-two)\n" + + " <-- KSTREAM-JOIN-0000000016\n" + + " Processor: KTABLE-SOURCE-0000000003 (stores: [topic2-STATE-STORE-0000000001])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000002\n" + + " Processor: KTABLE-SOURCE-0000000006 (stores: [topic3-STATE-STORE-0000000004])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000005\n\n"; + + + private final String expectedTopologyWithUserProvidedRepartitionTopicNames = + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" + + " --> KSTREAM-MAP-0000000007\n" + + " Processor: KSTREAM-MAP-0000000007 (stores: [])\n" + + " --> first-join-repartition-filter, second-join-repartition-filter\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: first-join-repartition-filter (stores: [])\n" + + " --> first-join-repartition-sink\n" + + " <-- KSTREAM-MAP-0000000007\n" + + " Processor: second-join-repartition-filter (stores: [])\n" + + " --> second-join-repartition-sink\n" + + " <-- KSTREAM-MAP-0000000007\n" + + " Sink: first-join-repartition-sink (topic: first-join-repartition)\n" + + " <-- first-join-repartition-filter\n" + + " Sink: second-join-repartition-sink (topic: second-join-repartition)\n" + + " <-- second-join-repartition-filter\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: first-join-repartition-source (topics: [first-join-repartition])\n" + + " --> first-join\n" + + " Source: KSTREAM-SOURCE-0000000002 (topics: [topic2])\n" + + " --> KTABLE-SOURCE-0000000003\n" + + " Processor: first-join (stores: [topic2-STATE-STORE-0000000001])\n" + + " --> KSTREAM-SINK-0000000012\n" + + " <-- first-join-repartition-source\n" + + " Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n" + + " <-- first-join\n" + + " Processor: KTABLE-SOURCE-0000000003 (stores: [topic2-STATE-STORE-0000000001])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000002\n" + + "\n" + + " Sub-topology: 2\n" + + " Source: second-join-repartition-source (topics: [second-join-repartition])\n" + + " --> second-join\n" + + " Source: KSTREAM-SOURCE-0000000005 (topics: [topic3])\n" + + " --> KTABLE-SOURCE-0000000006\n" + + " Processor: second-join (stores: [topic3-STATE-STORE-0000000004])\n" + + " --> KSTREAM-SINK-0000000017\n" + + " <-- second-join-repartition-source\n" + + " Sink: KSTREAM-SINK-0000000017 (topic: out-two)\n" + + " <-- second-join\n" + + " Processor: KTABLE-SOURCE-0000000006 (stores: [topic3-STATE-STORE-0000000004])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000005\n\n"; }