diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index e3d197eb120..09cf9015992 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1873,30 +1873,30 @@ public class InternalTopologyBuilder { for (final TopicPartition topicPartition : partitions) { assignedTopics.add(topicPartition.topic()); } - updateSubscribedTopics(assignedTopics, logPrefix); + + final Collection existingTopics = subscriptionUpdates(); + + if (!existingTopics.equals(assignedTopics)) { + assignedTopics.addAll(existingTopics); + updateSubscribedTopics(assignedTopics, logPrefix); + } } } synchronized void addSubscribedTopicsFromMetadata(final Set topics, final String logPrefix) { - if (usesPatternSubscription()) { + if (usesPatternSubscription() && !subscriptionUpdates().equals(topics)) { updateSubscribedTopics(topics, logPrefix); } } private void updateSubscribedTopics(final Set topics, final String logPrefix) { - final Collection existingTopics = subscriptionUpdates(); + subscriptionUpdates.clear(); + subscriptionUpdates.addAll(topics); - if (!existingTopics.equals(topics)) { - topics.addAll(existingTopics); + log.debug("{}found {} topics possibly matching subscription", logPrefix, topics.size()); - subscriptionUpdates.clear(); - subscriptionUpdates.addAll(topics); - - log.debug("{}found {} topics possibly matching subscription", logPrefix, topics.size()); - - setRegexMatchedTopicsToSourceNodes(); - setRegexMatchedTopicToStateStore(); - } + setRegexMatchedTopicsToSourceNodes(); + setRegexMatchedTopicToStateStore(); } // following functions are for test only diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 49b673ba98a..30226a97cbc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -266,8 +266,8 @@ public class TaskManager { for (final Task task : tasks.values()) { if (remainingPartitions.containsAll(task.inputPartitions())) { revokedTasks.add(task.id()); - remainingPartitions.removeAll(task.inputPartitions()); } + remainingPartitions.removeAll(task.inputPartitions()); } if (!remainingPartitions.isEmpty()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 49efb2dbdb3..cddc53189c0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -211,7 +211,6 @@ public class RegexSourceIntegrationTest { super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); } }; - } });