Browse Source

KAFKA-7671: Stream-Global Table join should not reset repartition flag (#5959)

This PR fixes an issue reported from a user. When we join a KStream with a GlobalKTable we should not reset the repartition flag as the stream may have previously changed its key, and the resulting stream could be used in an aggregation operation or join with another stream which may require a repartition for correct results.

I've added a test which fails without the fix.

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
pull/5958/merge
Bill Bejeck 6 years ago committed by Guozhang Wang
parent
commit
2c305dc64c
  1. 2
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
  2. 24
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java

2
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

@ -753,7 +753,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -753,7 +753,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
// do not have serde for joined result
return new KStreamImpl<>(name, keySerde, null, sourceNodes, false, streamTableJoinNode, builder);
return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, streamTableJoinNode, builder);
}
@SuppressWarnings("unchecked")

24
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java

@ -61,15 +61,18 @@ import java.util.Collections; @@ -61,15 +61,18 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@SuppressWarnings("unchecked")
@ -437,6 +440,27 @@ public class KStreamImplTest { @@ -437,6 +440,27 @@ public class KStreamImplTest {
}
}
}
@Test
public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() {
final StreamsBuilder builder = new StreamsBuilder();
final GlobalKTable<String, String> globalKTable = builder.globalTable("globalTopic");
final KeyValueMapper<String, String, String> kvMappper = (k, v) -> k + v;
final ValueJoiner<String, String, String> valueJoiner = (v1, v2) -> v1 + v2;
builder.<String, String>stream("topic").selectKey((k, v) -> v)
.join(globalKTable, kvMappper, valueJoiner)
.groupByKey()
.count();
final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
final String topology = builder.build().describe().toString();
final Matcher matcher = repartitionTopicPattern.matcher(topology);
assertTrue(matcher.find());
final String match = matcher.group();
assertThat(match, notNullValue());
assertTrue(match.endsWith("repartition"));
}
@Test
public void testToWithNullValueSerdeDoesntNPE() {

Loading…
Cancel
Save