From 4c295a78446be6eba24ca4a9b7e506657e55c875 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Wed, 19 Oct 2016 21:04:28 -0700 Subject: [PATCH] KAFKA-4269: Update topic subscription when regex pattern specified out of topicGroups method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …d out of topicGroups method. The topicGroups method only called from StreamPartitionAssignor when KafkaStreams object is the leader, needs to be executed for clients. Author: bbejeck Reviewers: Damian Guy , Guozhang Wang Closes #2005 from bbejeck/KAFKA-4269_multiple_kstream_instances_mult_consumers_npe --- .../streams/processor/TopologyBuilder.java | 27 ++++++++++------ .../internals/StreamPartitionAssignor.java | 10 ++++-- .../processor/TopologyBuilderTest.java | 31 +++++++++++++++++++ 3 files changed, 57 insertions(+), 11 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index f5fd5716c81..81f1f63078f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -30,6 +30,8 @@ import org.apache.kafka.streams.processor.internals.SinkNode; import org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates; import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -101,6 +103,8 @@ public class TopologyBuilder { private Map> nodeGroups = null; + private static final Logger log = LoggerFactory.getLogger(TopologyBuilder.class); + private static class StateStoreFactory { public final Set users; @@ -831,14 +835,6 @@ public class TopologyBuilder { public synchronized Map topicGroups() { Map topicGroups = new LinkedHashMap<>(); - if (subscriptionUpdates.hasUpdates()) { - for (Map.Entry stringPatternEntry : nodeToSourcePatterns.entrySet()) { - SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey()); - //need to update nodeToSourceTopics with topics matched from given regex - nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates())); - } - } - if (nodeGroups == null) nodeGroups = makeNodeGroups(); @@ -897,6 +893,17 @@ public class TopologyBuilder { return Collections.unmodifiableMap(topicGroups); } + private void setRegexMatchedTopicsToSourceNodes() { + if (subscriptionUpdates.hasUpdates()) { + for (Map.Entry stringPatternEntry : nodeToSourcePatterns.entrySet()) { + SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey()); + //need to update nodeToSourceTopics with topics matched from given regex + nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates())); + log.debug("nodeToSourceTopics {}", nodeToSourceTopics); + } + } + } + private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier supplier, final String name) { if (!(supplier instanceof RocksDBWindowStoreSupplier)) { return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig()); @@ -999,7 +1006,9 @@ public class TopologyBuilder { return this.topicPattern; } - public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates) { + public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates, String threadId) { + log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)", threadId, subscriptionUpdates); this.subscriptionUpdates = subscriptionUpdates; + setRegexMatchedTopicsToSourceNodes(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 3be9c114ba7..dcba5437bf5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -178,10 +178,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable if (streamThread.builder.sourceTopicPattern() != null) { SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); - log.debug("have {} topics matching regex", topics); + log.debug("stream-thread [{}] found {} topics possibly matching regex", streamThread.getName(), topics); // update the topic groups with the returned subscription set for regex pattern subscriptions subscriptionUpdates.updateTopics(topics); - streamThread.builder.updateSubscriptions(subscriptionUpdates); + streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName()); } return new Subscription(new ArrayList<>(topics), data.encode()); @@ -669,6 +669,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable return !updatedTopicSubscriptions.isEmpty(); } + @Override + public String toString() { + return "SubscriptionUpdates{" + + "updatedTopicSubscriptions=" + updatedTopicSubscriptions + + '}'; + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 3f45967aeb1..d2609377933 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -27,12 +27,14 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.ProcessorTopologyTestDriver; import org.junit.Test; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -521,6 +523,7 @@ public class TopologyBuilderTest { assertEquals(1, properties.size()); } + @Test(expected = TopologyBuilderException.class) public void shouldThroughOnUnassignedStateStoreAccess() { final String sourceNodeName = "source"; @@ -583,4 +586,32 @@ public class TopologyBuilderTest { } } + @SuppressWarnings("unchecked") + @Test + public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception { + final TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source-1", "topic-foo"); + builder.addSource("source-2", Pattern.compile("topic-[A-C]")); + builder.addSource("source-3", Pattern.compile("topic-\\d")); + + StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates(); + Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); + updatedTopicsField.setAccessible(true); + + Set updatedTopics = (Set) updatedTopicsField.get(subscriptionUpdates); + + updatedTopics.add("topic-B"); + updatedTopics.add("topic-3"); + updatedTopics.add("topic-A"); + + builder.updateSubscriptions(subscriptionUpdates, null); + builder.setApplicationId("test-id"); + + Map topicGroups = builder.topicGroups(); + assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo")); + assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A")); + assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B")); + assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3")); + + } }