Browse Source

KAFKA-9545: Fix subscription bugs from Stream refactoring (#8109)

This PR fixes two bugs related to stream refactoring:

1. The subscribed topics are not updated correctly when topic gets removed from broker.
2. The remainingPartitions computation doesn't account the case when one task has a pattern subscription of multiple topics. Then the input partition change will not be assumed as containsAll

The bugs are exposed from integration test testRegexMatchesTopicsAWhenDeleted and could be used to verify the fix works.

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
pull/8128/head
Boyang Chen 5 years ago committed by GitHub
parent
commit
97c5dc1c13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 26
      streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
  2. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
  3. 1
      streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java

26
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java

@ -1873,30 +1873,30 @@ public class InternalTopologyBuilder {
for (final TopicPartition topicPartition : partitions) { for (final TopicPartition topicPartition : partitions) {
assignedTopics.add(topicPartition.topic()); assignedTopics.add(topicPartition.topic());
} }
updateSubscribedTopics(assignedTopics, logPrefix);
final Collection<String> existingTopics = subscriptionUpdates();
if (!existingTopics.equals(assignedTopics)) {
assignedTopics.addAll(existingTopics);
updateSubscribedTopics(assignedTopics, logPrefix);
}
} }
} }
synchronized void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) { synchronized void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) {
if (usesPatternSubscription()) { if (usesPatternSubscription() && !subscriptionUpdates().equals(topics)) {
updateSubscribedTopics(topics, logPrefix); updateSubscribedTopics(topics, logPrefix);
} }
} }
private void updateSubscribedTopics(final Set<String> topics, final String logPrefix) { private void updateSubscribedTopics(final Set<String> topics, final String logPrefix) {
final Collection<String> existingTopics = subscriptionUpdates(); subscriptionUpdates.clear();
subscriptionUpdates.addAll(topics);
if (!existingTopics.equals(topics)) { log.debug("{}found {} topics possibly matching subscription", logPrefix, topics.size());
topics.addAll(existingTopics);
subscriptionUpdates.clear(); setRegexMatchedTopicsToSourceNodes();
subscriptionUpdates.addAll(topics); setRegexMatchedTopicToStateStore();
log.debug("{}found {} topics possibly matching subscription", logPrefix, topics.size());
setRegexMatchedTopicsToSourceNodes();
setRegexMatchedTopicToStateStore();
}
} }
// following functions are for test only // following functions are for test only

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@ -266,8 +266,8 @@ public class TaskManager {
for (final Task task : tasks.values()) { for (final Task task : tasks.values()) {
if (remainingPartitions.containsAll(task.inputPartitions())) { if (remainingPartitions.containsAll(task.inputPartitions())) {
revokedTasks.add(task.id()); revokedTasks.add(task.id());
remainingPartitions.removeAll(task.inputPartitions());
} }
remainingPartitions.removeAll(task.inputPartitions());
} }
if (!remainingPartitions.isEmpty()) { if (!remainingPartitions.isEmpty()) {

1
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java

@ -211,7 +211,6 @@ public class RegexSourceIntegrationTest {
super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener));
} }
}; };
} }
}); });

Loading…
Cancel
Save