Browse Source

KAFKA-8298: Fix possible concurrent modification exception (#6643)

When processing multiple key-changing operations during the optimization phase, a ConcurrentModificationException is possible.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
pull/6660/head
Bill Bejeck 6 years ago committed by GitHub
parent
commit
ba1fc21864
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
  2. 33
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java

8
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java

@ -42,9 +42,11 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects; import java.util.Objects;
import java.util.PriorityQueue; import java.util.PriorityQueue;
import java.util.Properties; import java.util.Properties;
@ -311,8 +313,10 @@ public class InternalStreamsBuilder implements InternalNameProvider {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void maybeOptimizeRepartitionOperations() { private void maybeOptimizeRepartitionOperations() {
maybeUpdateKeyChangingRepartitionNodeMap(); maybeUpdateKeyChangingRepartitionNodeMap();
final Iterator<Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>> entryIterator = keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
for (final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry : keyChangingOperationsToOptimizableRepartitionNodes.entrySet()) { while (entryIterator.hasNext()) {
final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry = entryIterator.next();
final StreamsGraphNode keyChangingNode = entry.getKey(); final StreamsGraphNode keyChangingNode = entry.getKey();
@ -368,7 +372,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
} }
keyChangingNode.addChild(optimizedSingleRepartition); keyChangingNode.addChild(optimizedSingleRepartition);
keyChangingOperationsToOptimizableRepartitionNodes.remove(entry.getKey()); entryIterator.remove();
} }
} }

33
streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java

@ -17,15 +17,21 @@
package org.apache.kafka.streams.kstream.internals.graph; package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueJoiner;
import org.junit.Test; import org.junit.Test;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -67,6 +73,33 @@ public class StreamsGraphTest {
} }
/**
 * Regression test for KAFKA-8298: builds a topology, with optimization enabled, in which
 * one key-changing operation ({@code selectKey}) feeds two separate {@code groupByKey}
 * branches, and the second branch re-keys again via {@code map}.
 *
 * The test contains no assertions and processes no records; it only calls
 * {@code builder.build(properties)} and passes if that call completes without throwing.
 */
@Test
public void shouldBeAbleToProcessNestedMultipleKeyChangingNodes() {
final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Turn on topology optimization; the properties are handed to build() below.
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> inputStream = builder.stream("inputTopic");
// Key-changing operation: the new key is the first five characters of the value.
final KStream<String, String> changedKeyStream = inputStream.selectKey((k, v) -> v.substring(0, 5));
// first repartition
// First branch off the re-keyed stream: a plain count written to "count-topic".
changedKeyStream.groupByKey(Grouped.as("count-repartition"))
.count(Materialized.as("count-store"))
.toStream().to("count-topic", Produced.with(Serdes.String(), Serdes.Long()));
// second repartition
// Second branch off the same re-keyed stream: a count over 5-second windows.
// The trailing map() swaps the windowed key for its inner key (k.key()) before writing;
// NOTE(review): presumably this map() is the "nested" key-changing node the test name
// refers to - confirm against the optimizer code.
changedKeyStream.groupByKey(Grouped.as("windowed-repartition"))
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.as("windowed-count-store"))
.toStream()
.map((k, v) -> KeyValue.pair(k.key(), v)).to("windowed-count", Produced.with(Serdes.String(), Serdes.Long()));
// Building with the optimization config is the behaviour under test; no result is inspected.
builder.build(properties);
}
@Test @Test
public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() { public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {

Loading…
Cancel
Save