From 2c305dc64c033ffcfd6e45548752f0702f373032 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Wed, 28 Nov 2018 21:15:26 -0500 Subject: [PATCH] KAFKA-7671: Stream-Global Table join should not reset repartition flag (#5959) This PR fixes an issue reported from a user. When we join a KStream with a GlobalKTable we should not reset the repartition flag as the stream may have previously changed its key, and the resulting stream could be used in an aggregation operation or join with another stream which may require a repartition for correct results. I've added a test which fails without the fix. Reviewers: John Roesler , Matthias J. Sax , Guozhang Wang --- .../kstream/internals/KStreamImpl.java | 2 +- .../kstream/internals/KStreamImplTest.java | 24 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 3b691513004..ed5625e3a17 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -753,7 +753,7 @@ public class KStreamImpl extends AbstractStream implements KStream(name, keySerde, null, sourceNodes, false, streamTableJoinNode, builder); + return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, streamTableJoinNode, builder); } @SuppressWarnings("unchecked") diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 772836f9961..e0c38c656c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -61,15 +61,18 @@ import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; import java.util.regex.Pattern; import static java.time.Duration.ofMillis; import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @SuppressWarnings("unchecked") @@ -437,6 +440,27 @@ public class KStreamImplTest { } } } + + @Test + public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() { + final StreamsBuilder builder = new StreamsBuilder(); + final GlobalKTable globalKTable = builder.globalTable("globalTopic"); + final KeyValueMapper kvMappper = (k, v) -> k + v; + final ValueJoiner valueJoiner = (v1, v2) -> v1 + v2; + builder.stream("topic").selectKey((k, v) -> v) + .join(globalKTable, kvMappper, valueJoiner) + .groupByKey() + .count(); + + final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition"); + final String topology = builder.build().describe().toString(); + final Matcher matcher = repartitionTopicPattern.matcher(topology); + assertTrue(matcher.find()); + final String match = matcher.group(); + assertThat(match, notNullValue()); + assertTrue(match.endsWith("repartition")); + + } @Test public void testToWithNullValueSerdeDoesntNPE() {