diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 960b0302dcf..e7a7678aa02 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -42,9 +42,11 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.PriorityQueue; import java.util.Properties; @@ -311,8 +313,10 @@ public class InternalStreamsBuilder implements InternalNameProvider { @SuppressWarnings("unchecked") private void maybeOptimizeRepartitionOperations() { maybeUpdateKeyChangingRepartitionNodeMap(); + final Iterator>> entryIterator = keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator(); - for (final Map.Entry> entry : keyChangingOperationsToOptimizableRepartitionNodes.entrySet()) { + while (entryIterator.hasNext()) { + final Map.Entry> entry = entryIterator.next(); final StreamsGraphNode keyChangingNode = entry.getKey(); @@ -368,7 +372,7 @@ public class InternalStreamsBuilder implements InternalNameProvider { } keyChangingNode.addChild(optimizedSingleRepartition); - keyChangingOperationsToOptimizableRepartitionNodes.remove(entry.getKey()); + entryIterator.remove(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java index bd43685a3c5..0fecaa2963e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java @@ -17,15 +17,21 @@ package org.apache.kafka.streams.kstream.internals.graph; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.junit.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -67,6 +73,33 @@ public class StreamsGraphTest { } + @Test + public void shouldBeAbleToProcessNestedMultipleKeyChangingNodes() { + final Properties properties = new Properties(); + properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application"); + properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + + final StreamsBuilder builder = new StreamsBuilder(); + final KStream inputStream = builder.stream("inputTopic"); + + final KStream changedKeyStream = inputStream.selectKey((k, v) -> v.substring(0, 5)); + + // first repartition + changedKeyStream.groupByKey(Grouped.as("count-repartition")) + .count(Materialized.as("count-store")) + .toStream().to("count-topic", Produced.with(Serdes.String(), Serdes.Long())); + + // second repartition + changedKeyStream.groupByKey(Grouped.as("windowed-repartition")) + .windowedBy(TimeWindows.of(Duration.ofSeconds(5))) + .count(Materialized.as("windowed-count-store")) + .toStream() + .map((k, v) -> KeyValue.pair(k.key(), v)).to("windowed-count", Produced.with(Serdes.String(), Serdes.Long())); + + builder.build(properties); + } + @Test public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {