Browse Source

KAFKA-4269: Update topic subscription when regex pattern specified out of topicGroups method

…d out of topicGroups method. The topicGroups method only called from StreamPartitionAssignor when KafkaStreams object  is the leader, needs to be executed for clients.

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #2005 from bbejeck/KAFKA-4269_multiple_kstream_instances_mult_consumers_npe
pull/2039/merge
Bill Bejeck 8 years ago committed by Guozhang Wang
parent
commit
4c295a7844
  1. 27
      streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
  2. 10
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
  3. 31
      streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java

27
streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java

@ -30,6 +30,8 @@ import org.apache.kafka.streams.processor.internals.SinkNode; @@ -30,6 +30,8 @@ import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@ -101,6 +103,8 @@ public class TopologyBuilder { @@ -101,6 +103,8 @@ public class TopologyBuilder {
private Map<Integer, Set<String>> nodeGroups = null;
private static final Logger log = LoggerFactory.getLogger(TopologyBuilder.class);
private static class StateStoreFactory {
public final Set<String> users;
@ -831,14 +835,6 @@ public class TopologyBuilder { @@ -831,14 +835,6 @@ public class TopologyBuilder {
public synchronized Map<Integer, TopicsInfo> topicGroups() {
Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
if (subscriptionUpdates.hasUpdates()) {
for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
//need to update nodeToSourceTopics with topics matched from given regex
nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
}
}
if (nodeGroups == null)
nodeGroups = makeNodeGroups();
@ -897,6 +893,17 @@ public class TopologyBuilder { @@ -897,6 +893,17 @@ public class TopologyBuilder {
return Collections.unmodifiableMap(topicGroups);
}
private void setRegexMatchedTopicsToSourceNodes() {
if (subscriptionUpdates.hasUpdates()) {
for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
//need to update nodeToSourceTopics with topics matched from given regex
nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
}
}
}
private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier supplier, final String name) {
if (!(supplier instanceof RocksDBWindowStoreSupplier)) {
return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
@ -999,7 +1006,9 @@ public class TopologyBuilder { @@ -999,7 +1006,9 @@ public class TopologyBuilder {
return this.topicPattern;
}
public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates) {
public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates, String threadId) {
log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)", threadId, subscriptionUpdates);
this.subscriptionUpdates = subscriptionUpdates;
setRegexMatchedTopicsToSourceNodes();
}
}

10
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java

@ -178,10 +178,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable @@ -178,10 +178,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
if (streamThread.builder.sourceTopicPattern() != null) {
SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
log.debug("have {} topics matching regex", topics);
log.debug("stream-thread [{}] found {} topics possibly matching regex", streamThread.getName(), topics);
// update the topic groups with the returned subscription set for regex pattern subscriptions
subscriptionUpdates.updateTopics(topics);
streamThread.builder.updateSubscriptions(subscriptionUpdates);
streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName());
}
return new Subscription(new ArrayList<>(topics), data.encode());
@ -669,6 +669,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable @@ -669,6 +669,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
return !updatedTopicSubscriptions.isEmpty();
}
@Override
public String toString() {
return "SubscriptionUpdates{" +
"updatedTopicSubscriptions=" + updatedTopicSubscriptions +
'}';
}
}
}

31
streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java

@ -27,12 +27,14 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode; @@ -27,12 +27,14 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -521,6 +523,7 @@ public class TopologyBuilderTest { @@ -521,6 +523,7 @@ public class TopologyBuilderTest {
assertEquals(1, properties.size());
}
@Test(expected = TopologyBuilderException.class)
public void shouldThroughOnUnassignedStateStoreAccess() {
final String sourceNodeName = "source";
@ -583,4 +586,32 @@ public class TopologyBuilderTest { @@ -583,4 +586,32 @@ public class TopologyBuilderTest {
}
}
@SuppressWarnings("unchecked")
@Test
public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source-1", "topic-foo");
builder.addSource("source-2", Pattern.compile("topic-[A-C]"));
builder.addSource("source-3", Pattern.compile("topic-\\d"));
StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
updatedTopicsField.setAccessible(true);
Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
updatedTopics.add("topic-B");
updatedTopics.add("topic-3");
updatedTopics.add("topic-A");
builder.updateSubscriptions(subscriptionUpdates, null);
builder.setApplicationId("test-id");
Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo"));
assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A"));
assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B"));
assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3"));
}
}

Loading…
Cancel
Save