Browse Source

Fixing KAFKA-10094 (#8797)

pull/8823/head
Mandar Tillu 5 years ago committed by GitHub
parent
commit
bdae26d047
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java

6
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java

@ -217,11 +217,9 @@ public class MirrorSourceConnector extends SourceConnector { @@ -217,11 +217,9 @@ public class MirrorSourceConnector extends SourceConnector {
.map(x -> new TopicPartition(replicationPolicy.upstreamTopic(x.topic()), x.partition()))
.collect(Collectors.toList());
Set<TopicPartition> newTopicPartitions = new HashSet<>();
newTopicPartitions.addAll(knownSourceTopicPartitions);
Set<TopicPartition> newTopicPartitions = new HashSet<>(knownSourceTopicPartitions);
newTopicPartitions.removeAll(upstreamTargetTopicPartitions);
Set<TopicPartition> deadTopicPartitions = new HashSet<>();
deadTopicPartitions.addAll(upstreamTargetTopicPartitions);
Set<TopicPartition> deadTopicPartitions = new HashSet<>(upstreamTargetTopicPartitions);
deadTopicPartitions.removeAll(knownSourceTopicPartitions);
if (!newTopicPartitions.isEmpty() || !deadTopicPartitions.isEmpty()) {
log.info("Found {} topic-partitions on {}. {} are new. {} were removed. Previously had {}.",

Loading…
Cancel
Save