Browse Source

KAFKA-10777: Add additional configuration to control MirrorMaker 2 internal topics naming convention - KIP-690 (#11220)

Reviewers: Mickael Maison <mickael.maison@gmail.com>
pull/11396/head
Omnia G H Ibrahim 3 years ago committed by GitHub
parent
commit
400d39bb0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
  2. 2
      connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
  3. 12
      connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
  4. 3
      connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
  5. 34
      connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
  6. 9
      connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
  7. 7
      connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java

28
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java

@ -70,4 +70,32 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable @@ -70,4 +70,32 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
return topic.substring(source.length() + separator.length());
}
}
private String internalSuffix() {
return separator + "internal";
}
private String checkpointsTopicSuffix() {
return separator + "checkpoints" + internalSuffix();
}
@Override
public String offsetSyncsTopic(String clusterAlias) {
return "mm2-offset-syncs" + separator + clusterAlias + internalSuffix();
}
@Override
public String checkpointsTopic(String clusterAlias) {
return clusterAlias + checkpointsTopicSuffix();
}
@Override
public boolean isCheckpointsTopic(String topic) {
return topic.endsWith(checkpointsTopicSuffix());
}
@Override
public boolean isMM2InternalTopic(String topic) {
return topic.endsWith(internalSuffix());
}
}

2
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java

@ -87,6 +87,6 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy { @@ -87,6 +87,6 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
}
private boolean looksLikeHeartbeat(String topic) {
return topic != null && topic.endsWith(MirrorClientConfig.HEARTBEATS_TOPIC);
return topic != null && topic.endsWith(heartbeatsTopic());
}
}

12
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java

@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.types.SchemaException; @@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.types.SchemaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Set;
import java.util.HashSet;
@ -156,7 +156,7 @@ public class MirrorClient implements AutoCloseable { @@ -156,7 +156,7 @@ public class MirrorClient implements AutoCloseable {
try {
// checkpoint topics are not "remote topics", as they are not replicated. So we don't need
// to use ReplicationPolicy to create the checkpoint topic here.
String checkpointTopic = remoteClusterAlias + MirrorClientConfig.CHECKPOINTS_TOPIC_SUFFIX;
String checkpointTopic = replicationPolicy.checkpointsTopic(remoteClusterAlias);
List<TopicPartition> checkpointAssignment =
Collections.singletonList(new TopicPartition(checkpointTopic, 0));
consumer.assign(checkpointAssignment);
@ -209,17 +209,15 @@ public class MirrorClient implements AutoCloseable { @@ -209,17 +209,15 @@ public class MirrorClient implements AutoCloseable {
}
visited.add(source);
topic = replicationPolicy.upstreamTopic(topic);
}
}
}
boolean isHeartbeatTopic(String topic) {
// heartbeats are replicated, so we must use ReplicationPolicy here
return MirrorClientConfig.HEARTBEATS_TOPIC.equals(replicationPolicy.originalTopic(topic));
return replicationPolicy.isHeartbeatsTopic(topic);
}
boolean isCheckpointTopic(String topic) {
// checkpoints are not replicated, so we don't need to use ReplicationPolicy here
return topic.endsWith(MirrorClientConfig.CHECKPOINTS_TOPIC_SUFFIX);
return replicationPolicy.isCheckpointsTopic(topic);
}
boolean isRemoteTopic(String topic) {

3
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java

@ -57,9 +57,6 @@ public class MirrorClientConfig extends AbstractConfig { @@ -57,9 +57,6 @@ public class MirrorClientConfig extends AbstractConfig {
public static final String CONSUMER_CLIENT_PREFIX = "consumer.";
public static final String PRODUCER_CLIENT_PREFIX = "producer.";
static final String CHECKPOINTS_TOPIC_SUFFIX = ".checkpoints.internal"; // internal so not replicated
static final String HEARTBEATS_TOPIC = "heartbeats";
MirrorClientConfig(Map<?, ?> props) {
super(CONFIG_DEF, props, true);
}

34
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java

@ -52,9 +52,39 @@ public interface ReplicationPolicy { @@ -52,9 +52,39 @@ public interface ReplicationPolicy {
}
}
/** Returns heartbeats topic name.*/
default String heartbeatsTopic() {
return "heartbeats";
}
/** Returns the offset-syncs topic for given cluster alias. */
default String offsetSyncsTopic(String clusterAlias) {
return "mm2-offset-syncs." + clusterAlias + ".internal";
}
/** Returns the name checkpoint topic for given cluster alias. */
default String checkpointsTopic(String clusterAlias) {
return clusterAlias + ".checkpoints.internal";
}
/** check if topic is a heartbeat topic, e.g heartbeats, us-west.heartbeats. */
default boolean isHeartbeatsTopic(String topic) {
return heartbeatsTopic().equals(originalTopic(topic));
}
/** check if topic is a checkpoint topic. */
default boolean isCheckpointsTopic(String topic) {
return topic.endsWith(".checkpoints.internal");
}
/** Check topic is one of MM2 internal topic, this is used to make sure the topic doesn't need to be replicated.*/
default boolean isMM2InternalTopic(String topic) {
return topic.endsWith(".internal");
}
/** Internal topics are never replicated. */
default boolean isInternalTopic(String topic) {
return topic.endsWith(".internal") || topic.endsWith("-internal") || topic.startsWith("__")
|| topic.startsWith(".");
boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".") || topic.endsWith("-internal");
return isMM2InternalTopic(topic) || isKafkaInternalTopic;
}
}

9
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java

@ -343,8 +343,7 @@ public class MirrorConnectorConfig extends AbstractConfig { @@ -343,8 +343,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
String otherClusterAlias = SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation())
? targetClusterAlias()
: sourceClusterAlias();
// ".internal" suffix ensures this doesn't get replicated
return "mm2-offset-syncs." + otherClusterAlias + ".internal";
return replicationPolicy().offsetSyncsTopic(otherClusterAlias);
}
String offsetSyncsTopicLocation() {
@ -370,7 +369,7 @@ public class MirrorConnectorConfig extends AbstractConfig { @@ -370,7 +369,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
}
String heartbeatsTopic() {
return MirrorClientConfig.HEARTBEATS_TOPIC;
return replicationPolicy().heartbeatsTopic();
}
// e.g. source1.heartbeats
@ -379,9 +378,7 @@ public class MirrorConnectorConfig extends AbstractConfig { @@ -379,9 +378,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
}
String checkpointsTopic() {
// Checkpoint topics are not "remote topics", as they are not replicated, so we don't
// need to use ReplicationPolicy here.
return sourceClusterAlias() + MirrorClientConfig.CHECKPOINTS_TOPIC_SUFFIX;
return replicationPolicy().checkpointsTopic(sourceClusterAlias());
}
long maxOffsetLag() {

7
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java

@ -471,7 +471,7 @@ public class MirrorSourceConnector extends SourceConnector { @@ -471,7 +471,7 @@ public class MirrorSourceConnector extends SourceConnector {
}
boolean shouldReplicateTopic(String topic) {
return (topicFilter.shouldReplicateTopic(topic) || isHeartbeatTopic(topic))
return (topicFilter.shouldReplicateTopic(topic) || replicationPolicy.isHeartbeatsTopic(topic))
&& !replicationPolicy.isInternalTopic(topic) && !isCycle(topic);
}
@ -501,11 +501,6 @@ public class MirrorSourceConnector extends SourceConnector { @@ -501,11 +501,6 @@ public class MirrorSourceConnector extends SourceConnector {
}
}
// e.g. heartbeats, us-west.heartbeats
boolean isHeartbeatTopic(String topic) {
return MirrorClientConfig.HEARTBEATS_TOPIC.equals(replicationPolicy.originalTopic(topic));
}
String formatRemoteTopic(String topic) {
return replicationPolicy.formatRemoteTopic(sourceAndTarget.source(), topic);
}

Loading…
Cancel
Save