Browse Source

KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)

Reviewers: Chris Egerton <chrise@aiven.io>
pull/14220/head
Omnia G.H Ibrahim 1 year ago committed by GitHub
parent
commit
35e925f353
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
  2. 14
      connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
  3. 53
      connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java
  4. 11
      connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java

20
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java

@ -33,8 +33,12 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable @@ -33,8 +33,12 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
public static final String SEPARATOR_CONFIG = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR;
public static final String SEPARATOR_DEFAULT = ".";
public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG = MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED;
public static final Boolean INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT = true;
private String separator = SEPARATOR_DEFAULT;
private Pattern separatorPattern = Pattern.compile(Pattern.quote(SEPARATOR_DEFAULT));
private boolean isInternalTopicSeparatorEnabled = true;
@Override
public void configure(Map<String, ?> props) {
@ -42,6 +46,13 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable @@ -42,6 +46,13 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
separator = (String) props.get(SEPARATOR_CONFIG);
log.info("Using custom remote topic separator: '{}'", separator);
separatorPattern = Pattern.compile(Pattern.quote(separator));
if (props.containsKey(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG)) {
isInternalTopicSeparatorEnabled = Boolean.parseBoolean(props.get(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG).toString());
if (!isInternalTopicSeparatorEnabled) {
log.warn("Disabling custom topic separator for internal topics; will use '.' instead of '{}'", separator);
}
}
}
}
@ -71,17 +82,20 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable @@ -71,17 +82,20 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
}
}
private String internalSeparator() {
return isInternalTopicSeparatorEnabled ? separator : ".";
}
private String internalSuffix() {
return separator + "internal";
return internalSeparator() + "internal";
}
private String checkpointsTopicSuffix() {
return separator + "checkpoints" + internalSuffix();
return internalSeparator() + "checkpoints" + internalSuffix();
}
@Override
public String offsetSyncsTopic(String clusterAlias) {
return "mm2-offset-syncs" + separator + clusterAlias + internalSuffix();
return "mm2-offset-syncs" + internalSeparator() + clusterAlias + internalSuffix();
}
@Override

14
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java

@ -59,6 +59,14 @@ public class MirrorClientConfig extends AbstractConfig { @@ -59,6 +59,14 @@ public class MirrorClientConfig extends AbstractConfig {
public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
DefaultReplicationPolicy.SEPARATOR_DEFAULT;
public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED = "replication.policy.internal.topic.separator.enabled";
public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC =
"Whether to use replication.policy.separator to control the names of topics used for checkpoints and offset syncs. " +
"By default, custom separators are used in these topic names; however, if upgrading MirrorMaker 2 from older versions " +
"that did not allow for these topic names to be customized, it may be necessary to set this property to 'false' in order " +
"to continue using the same names for those topics.";
public static final Boolean INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT =
DefaultReplicationPolicy.INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT;
public static final String FORWARDING_ADMIN_CLASS = "forwarding.admin.class";
public static final String FORWARDING_ADMIN_CLASS_DOC = "Class which extends ForwardingAdmin to define custom cluster resource management (topics, configs, etc). " +
"The class must have a constructor with signature <code>(Map<String, Object> config)</code> that is used to configure a KafkaAdminClient and may also be used to configure clients for external systems if necessary.";
@ -144,6 +152,12 @@ public class MirrorClientConfig extends AbstractConfig { @@ -144,6 +152,12 @@ public class MirrorClientConfig extends AbstractConfig {
REPLICATION_POLICY_SEPARATOR_DEFAULT,
ConfigDef.Importance.LOW,
REPLICATION_POLICY_SEPARATOR_DOC)
.define(
INTERNAL_TOPIC_SEPARATOR_ENABLED,
ConfigDef.Type.BOOLEAN,
INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC)
.define(
FORWARDING_ADMIN_CLASS,
ConfigDef.Type.CLASS,

53
connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java

@ -17,14 +17,25 @@ @@ -17,14 +17,25 @@
package org.apache.kafka.connect.mirror;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class ReplicationPolicyTest {
private static final DefaultReplicationPolicy DEFAULT_REPLICATION_POLICY = new DefaultReplicationPolicy();
@BeforeEach
public void setUp() {
DEFAULT_REPLICATION_POLICY.configure(Collections.emptyMap());
}
@Test
public void testInternalTopic() {
// starts with '__'
@ -39,4 +50,46 @@ public class ReplicationPolicyTest { @@ -39,4 +50,46 @@ public class ReplicationPolicyTest {
// non-internal topic.
assertFalse(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets_CLUSTER_internal"));
}
@Test
public void offsetSyncsTopic_shouldBeEffectedByInternalTopicSeparatorEnabled() {
Map<String, Object> config = new HashMap<>();
config.put(MirrorClientConfig.REPLICATION_POLICY_SEPARATOR, "__");
config.put(MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED, false);
DEFAULT_REPLICATION_POLICY.configure(config);
assertEquals("mm2-offset-syncs.CLUSTER.internal", DEFAULT_REPLICATION_POLICY.offsetSyncsTopic("CLUSTER"));
config.put(MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED, true);
DEFAULT_REPLICATION_POLICY.configure(config);
assertEquals("mm2-offset-syncs__CLUSTER__internal", DEFAULT_REPLICATION_POLICY.offsetSyncsTopic("CLUSTER"));
}
@Test
public void checkpointsTopic_shouldBeEffectedByInternalTopicSeparatorEnabled() {
Map<String, Object> config = new HashMap<>();
config.put(MirrorClientConfig.REPLICATION_POLICY_SEPARATOR, "__");
config.put(MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED, false);
DEFAULT_REPLICATION_POLICY.configure(config);
assertEquals("CLUSTER.checkpoints.internal", DEFAULT_REPLICATION_POLICY.checkpointsTopic("CLUSTER"));
config.put(MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED, true);
DEFAULT_REPLICATION_POLICY.configure(config);
assertEquals("CLUSTER__checkpoints__internal", DEFAULT_REPLICATION_POLICY.checkpointsTopic("CLUSTER"));
}
@Test
public void heartbeatsTopic_shouldNotBeEffectedByInternalTopicSeparatorConfig() {
Map<String, Object> config = new HashMap<>();
config.put(MirrorClientConfig.REPLICATION_POLICY_SEPARATOR, "__");
config.put(MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED, true);
assertEquals("heartbeats", DEFAULT_REPLICATION_POLICY.heartbeatsTopic());
config.put(MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED, false);
DEFAULT_REPLICATION_POLICY.configure(config);
assertEquals("heartbeats", DEFAULT_REPLICATION_POLICY.heartbeatsTopic());
}
}

11
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java

@ -84,6 +84,11 @@ public abstract class MirrorConnectorConfig extends AbstractConfig { @@ -84,6 +84,11 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
MirrorClientConfig.REPLICATION_POLICY_SEPARATOR_DEFAULT;
private static final String INTERNAL_TOPIC_SEPARATOR_ENABLED = MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED;
private static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC = MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC;
public static final Boolean INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT =
DefaultReplicationPolicy.INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT;
public static final String ADMIN_TASK_TIMEOUT_MILLIS = "admin.timeout.ms";
private static final String ADMIN_TASK_TIMEOUT_MILLIS_DOC = "Timeout for administrative tasks, e.g. detecting new topics.";
public static final long ADMIN_TASK_TIMEOUT_MILLIS_DEFAULT = 60000L;
@ -278,6 +283,12 @@ public abstract class MirrorConnectorConfig extends AbstractConfig { @@ -278,6 +283,12 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
REPLICATION_POLICY_SEPARATOR_DEFAULT,
ConfigDef.Importance.LOW,
REPLICATION_POLICY_SEPARATOR_DOC)
.define(
INTERNAL_TOPIC_SEPARATOR_ENABLED,
ConfigDef.Type.BOOLEAN,
INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC)
.define(
FORWARDING_ADMIN_CLASS,
ConfigDef.Type.CLASS,

Loading…
Cancel
Save