diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index f9a0397127e..c3614bb0aed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -1532,6 +1532,10 @@ public class TopologyBuilder { return applicationId + "-" + topic; } + public SubscriptionUpdates subscriptionUpdates() { + return subscriptionUpdates; + } + public synchronized Pattern sourceTopicPattern() { if (this.topicPattern == null) { final List allSourceTopics = new ArrayList<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index abcabf05fa4..9d5d4ccacb9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -253,17 +253,23 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable standbyTasks.removeAll(previousActiveTasks); SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, previousActiveTasks, standbyTasks, this.userEndPoint); - if (streamThread.builder.sourceTopicPattern() != null) { - SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); - log.debug("stream-thread [{}] found {} topics possibly matching regex", streamThread.getName(), topics); - // update the topic groups with the returned subscription set for regex pattern subscriptions - subscriptionUpdates.updateTopics(topics); - streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName()); + if (streamThread.builder.sourceTopicPattern() != null && + !streamThread.builder.subscriptionUpdates().getUpdates().equals(topics)) { + updateSubscribedTopics(topics); } return new Subscription(new ArrayList<>(topics), data.encode()); } + private void updateSubscribedTopics(Set topics) { + SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); + log.debug("stream-thread [{}] found {} topics possibly matching regex", + streamThread.getName(), topics); + // update the topic groups with the returned subscription set for regex pattern subscriptions + subscriptionUpdates.updateTopics(topics); + streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName()); + } + /* * This assigns tasks to consumer clients in the following steps. * @@ -606,6 +612,21 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } } metadataWithInternalTopics = Cluster.empty().withPartitions(topicToPartitionInfo); + + checkForNewTopicAssignments(assignment); + } + + private void checkForNewTopicAssignments(Assignment assignment) { + if (streamThread.builder.sourceTopicPattern() != null) { + final Set assignedTopics = new HashSet<>(); + for (final TopicPartition topicPartition : assignment.partitions()) { + assignedTopics.add(topicPartition.topic()); + } + if (!streamThread.builder.subscriptionUpdates().getUpdates().containsAll(assignedTopics)) { + assignedTopics.addAll(streamThread.builder.subscriptionUpdates().getUpdates()); + updateSubscribedTopics(assignedTopics); + } + } } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 1e73c899a90..a453e497f9a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1456,5 +1456,4 @@ public class StreamThread extends Thread { return firstException; } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 0b60b00a331..c0ce9e7576d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -27,8 +27,8 @@ import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; +import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates; import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; @@ -681,7 +681,7 @@ public class TopologyBuilderTest { builder.addSource("source-2", Pattern.compile("topic-[A-C]")); builder.addSource("source-3", Pattern.compile("topic-\\d")); - StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates(); + SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); updatedTopicsField.setAccessible(true); @@ -766,7 +766,7 @@ public class TopologyBuilderTest { .addProcessor("my-processor", new MockProcessorSupplier(), "ingest") .addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor"); - final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates(); + final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); updatedTopicsField.setAccessible(true); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index 9937ad4d557..17eb50a9021 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; @@ -44,6 +45,7 @@ import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.MockTimestampExtractor; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import java.util.ArrayList; @@ -104,6 +106,12 @@ public class StreamPartitionAssignorTest { private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); private final TopologyBuilder builder = new TopologyBuilder(); private final StreamsConfig config = new StreamsConfig(configProps()); + private final StreamThread mockStreamThread = new StreamThread(builder, config, + mockClientSupplier, "appID", + "clientId", UUID.randomUUID(), + new Metrics(), new MockTime(), + null, 1L); + private final Map configurationMap = new HashMap<>(); private Properties configProps() { return new Properties() { @@ -116,6 +124,13 @@ public class StreamPartitionAssignorTest { }; } + @Before + public void setUp() { + configurationMap.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, mockStreamThread); + configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0); + partitionAssignor.configure(configurationMap); + } + @SuppressWarnings("unchecked") @Test public void testSubscription() throws Exception { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index e255350bcb8..8205c27f6db 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -36,6 +36,9 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; +import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockInternalTopicManager; @@ -52,6 +55,7 @@ import java.io.File; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -1499,6 +1503,93 @@ public class StreamThreadTest { assertFalse(testStreamTask.committed); } + + @Test + @SuppressWarnings("unchecked") + public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() throws Exception { + final TopologyBuilder topologyBuilder = new TopologyBuilder(); + topologyBuilder.addSource("source", Pattern.compile("t.*")); + topologyBuilder.addProcessor("processor", new MockProcessorSupplier(), "source"); + + final StreamThread thread = new StreamThread( + topologyBuilder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + mockTime, + new StreamsMetadataState(topologyBuilder, StreamsMetadataState.UNKNOWN_HOST), + 0); + + final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + final Map configurationMap = new HashMap<>(); + + configurationMap.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread); + configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0); + partitionAssignor.configure(configurationMap); + + thread.setPartitionAssignor(partitionAssignor); + + final Field + nodeToSourceTopicsField = + topologyBuilder.getClass().getDeclaredField("nodeToSourceTopics"); + nodeToSourceTopicsField.setAccessible(true); + final Map> + nodeToSourceTopics = + (Map>) nodeToSourceTopicsField.get(topologyBuilder); + final List topicPartitions = new ArrayList<>(); + + final TopicPartition topicPartition1 = new TopicPartition("topic-1", 0); + final TopicPartition topicPartition2 = new TopicPartition("topic-2", 0); + final TopicPartition topicPartition3 = new TopicPartition("topic-3", 0); + + final TaskId taskId1 = new TaskId(0, 0); + final TaskId taskId2 = new TaskId(0, 0); + final TaskId taskId3 = new TaskId(0, 0); + + List activeTasks = Arrays.asList(taskId1); + + final Map> standbyTasks = new HashMap<>(); + + AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap>()); + + topicPartitions.addAll(Arrays.asList(topicPartition1)); + PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode()); + partitionAssignor.onAssignment(assignment); + + assertTrue(nodeToSourceTopics.get("source").size() == 1); + assertTrue(nodeToSourceTopics.get("source").contains("topic-1")); + + topicPartitions.clear(); + + activeTasks = Arrays.asList(taskId1, taskId2); + info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap>()); + topicPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2)); + assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode()); + partitionAssignor.onAssignment(assignment); + + assertTrue(nodeToSourceTopics.get("source").size() == 2); + assertTrue(nodeToSourceTopics.get("source").contains("topic-1")); + assertTrue(nodeToSourceTopics.get("source").contains("topic-2")); + + topicPartitions.clear(); + + activeTasks = Arrays.asList(taskId1, taskId2, taskId3); + info = new AssignmentInfo(activeTasks, standbyTasks, + new HashMap>()); + topicPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2, topicPartition3)); + assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode()); + partitionAssignor.onAssignment(assignment); + + assertTrue(nodeToSourceTopics.get("source").size() == 3); + assertTrue(nodeToSourceTopics.get("source").contains("topic-1")); + assertTrue(nodeToSourceTopics.get("source").contains("topic-2")); + assertTrue(nodeToSourceTopics.get("source").contains("topic-3")); + + } + private void initPartitionGrouper(final StreamsConfig config, final StreamThread thread, final MockClientSupplier clientSupplier) {