
KAFKA-9076: support consumer sync across clusters in MM 2.0 (#7577)

In order to let Kafka consumer and streams applications migrate from the source to the target cluster
transparently and conveniently, e.g. in the event of a source cluster failure, a background job is introduced
to periodically sync the consumer group offsets from the source to the target cluster, so that when the
consumer and streams applications switch to the target cluster, they resume consuming from where
they left off on the source cluster.
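
As an illustration, enabling the job only requires the new sync.group.offsets.* properties defined in
MirrorConnectorConfig below. A minimal sketch in the style of the integration test at the end of this diff
(the cluster aliases and the hypothetical class name are examples, not part of this commit):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.connect.mirror.MirrorMakerConfig;

    public class EnableGroupOffsetSyncSketch {
        public static void main(String[] args) {
            Map<String, String> mm2Props = new HashMap<>();
            mm2Props.put("clusters", "primary, backup");
            mm2Props.put("primary->backup.enabled", "true");
            // new in this commit: periodically write translated consumer group offsets
            // to the target cluster while no active consumers are connected there
            mm2Props.put("sync.group.offsets.enabled", "true");
            // how often the background job runs; the default is 60 seconds
            mm2Props.put("sync.group.offsets.interval.seconds", "60");
            MirrorMakerConfig mm2Config = new MirrorMakerConfig(mm2Props);
        }
    }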

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ryanne Dolan <ryannedolan@gmail.com>, Thiago Pinto, Srinivas Boga
Author: Ning Zhang, committed by GitHub
Commit: 9c9a79b459
4 changed files:
  1. connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java (146 lines changed)
  2. connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java (36 lines changed)
  3. connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java (61 lines changed)
  4. connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java (153 lines changed)

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java (146 lines changed)

@@ -16,6 +16,10 @@
*/
package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.Schema;
@@ -28,6 +32,8 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
@@ -42,7 +48,8 @@ public class MirrorCheckpointTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(MirrorCheckpointTask.class);
- private AdminClient sourceAdminClient;
+ private Admin sourceAdminClient;
+ private Admin targetAdminClient;
private String sourceClusterAlias;
private String targetClusterAlias;
private String checkpointsTopic;
@@ -54,16 +61,22 @@ public class MirrorCheckpointTask extends SourceTask {
private OffsetSyncStore offsetSyncStore;
private boolean stopping;
private MirrorMetrics metrics;
private Scheduler scheduler;
private Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset;
private Map<String, List<Checkpoint>> checkpointsPerConsumerGroup;
public MirrorCheckpointTask() {}
// for testing
MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias,
- ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore) {
+ ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore,
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset,
+ Map<String, List<Checkpoint>> checkpointsPerConsumerGroup) {
this.sourceClusterAlias = sourceClusterAlias;
this.targetClusterAlias = targetClusterAlias;
this.replicationPolicy = replicationPolicy;
this.offsetSyncStore = offsetSyncStore;
this.idleConsumerGroupsOffset = idleConsumerGroupsOffset;
this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
}
@Override
@@ -80,7 +93,15 @@ public class MirrorCheckpointTask extends SourceTask {
pollTimeout = config.consumerPollTimeout();
offsetSyncStore = new OffsetSyncStore(config);
sourceAdminClient = AdminClient.create(config.sourceAdminConfig());
targetAdminClient = AdminClient.create(config.targetAdminConfig());
metrics = config.metrics();
idleConsumerGroupsOffset = new HashMap<>();
checkpointsPerConsumerGroup = new HashMap<>();
scheduler = new Scheduler(MirrorCheckpointTask.class, config.adminTimeout());
scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
"refreshing idle consumers group offsets at target cluster");
scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
"sync idle consumer group offset from source to target");
}
@Override
@@ -94,7 +115,9 @@ public class MirrorCheckpointTask extends SourceTask {
stopping = true;
Utils.closeQuietly(offsetSyncStore, "offset sync store");
Utils.closeQuietly(sourceAdminClient, "source admin client");
Utils.closeQuietly(targetAdminClient, "target admin client");
Utils.closeQuietly(metrics, "metrics");
Utils.closeQuietly(scheduler, "scheduler");
log.info("Stopping {} took {} ms.", Thread.currentThread().getName(), System.currentTimeMillis() - start);
}
@@ -112,7 +135,7 @@ public class MirrorCheckpointTask extends SourceTask {
}
List<SourceRecord> records = new ArrayList<>();
for (String group : consumerGroups) {
- records.addAll(checkpointsForGroup(group));
+ records.addAll(sourceRecordsForGroup(group));
}
if (records.isEmpty()) {
// WorkerSourceTask expects non-zero batches or null
@@ -126,13 +149,13 @@ public class MirrorCheckpointTask extends SourceTask {
}
}
- private List<SourceRecord> checkpointsForGroup(String group) throws InterruptedException {
+ private List<SourceRecord> sourceRecordsForGroup(String group) throws InterruptedException {
try {
long timestamp = System.currentTimeMillis();
- return listConsumerGroupOffsets(group).entrySet().stream()
-     .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
-     .map(x -> checkpoint(group, x.getKey(), x.getValue()))
-     .filter(x -> x.downstreamOffset() > 0) // ignore offsets we cannot translate accurately
+ List<Checkpoint> checkpoints = checkpointsForGroup(group);
+ checkpointsPerConsumerGroup.put(group, checkpoints);
+ return checkpoints.stream()
+     .map(x -> checkpointRecord(x, timestamp))
.collect(Collectors.toList());
} catch (ExecutionException e) {
@@ -141,6 +164,14 @@ public class MirrorCheckpointTask extends SourceTask {
}
}
private List<Checkpoint> checkpointsForGroup(String group) throws ExecutionException, InterruptedException {
return listConsumerGroupOffsets(group).entrySet().stream()
.filter(x -> shouldCheckpointTopic(x.getKey().topic()))
.map(x -> checkpoint(group, x.getKey(), x.getValue()))
.filter(x -> x.downstreamOffset() > 0) // ignore offsets we cannot translate accurately
.collect(Collectors.toList());
}
private Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group)
throws InterruptedException, ExecutionException {
if (stopping) {
@@ -189,4 +220,101 @@ public class MirrorCheckpointTask extends SourceTask {
Checkpoint.unwrapGroup(record.sourcePartition()),
System.currentTimeMillis() - record.timestamp());
}
private void refreshIdleConsumerGroupOffset() {
Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
.describeConsumerGroups(consumerGroups).describedGroups();
for (String group : consumerGroups) {
try {
ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
// sync the offset to the target cluster only if the state of the current consumer group is:
// (1) idle: the consumer at the target is not actively consuming the mirrored topic
// (2) dead: the consumer group is newly created at the source and does not exist yet at the target
if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
.partitionsToOffsetAndMetadata().get().entrySet().stream().collect(
Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
}
// new consumer upstream has state "DEAD" and will be identified during the offset sync-up
} catch (InterruptedException | ExecutionException e) {
log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
}
}
}
Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
// first, sync offsets for the idle consumers at target
for (Entry<String, Map<TopicPartition, OffsetAndMetadata>> group : getConvertedUpstreamOffset().entrySet()) {
String consumerGroupId = group.getKey();
// for each idle consumer at target, read the checkpoints (converted upstream offset)
// from the pre-populated map
Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = group.getValue();
Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
Map<TopicPartition, OffsetAndMetadata> targetConsumerOffset = idleConsumerGroupsOffset.get(consumerGroupId);
if (targetConsumerOffset == null) {
// this is a new consumer, just sync the offset to target
syncGroupOffset(consumerGroupId, convertedUpstreamOffset);
offsetToSyncAll.put(consumerGroupId, convertedUpstreamOffset);
continue;
}
for (Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) {
TopicPartition topicPartition = convertedEntry.getKey();
OffsetAndMetadata convertedOffset = convertedUpstreamOffset.get(topicPartition);
if (!targetConsumerOffset.containsKey(topicPartition)) {
// if it is a new topicPartition from upstream, just sync the offset to target
offsetToSync.put(topicPartition, convertedOffset);
continue;
}
// if translated offset from upstream is smaller than the current consumer offset
// in the target, skip updating the offset for that partition
long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset();
if (latestDownstreamOffset >= convertedOffset.offset()) {
log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for "
+ "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition);
continue;
}
offsetToSync.put(topicPartition, convertedOffset);
}
if (offsetToSync.size() == 0) {
log.trace("skip syncing the offset for consumer group: {}", consumerGroupId);
continue;
}
syncGroupOffset(consumerGroupId, offsetToSync);
offsetToSyncAll.put(consumerGroupId, offsetToSync);
}
idleConsumerGroupsOffset.clear();
return offsetToSyncAll;
}
void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetadata> offsetToSync) {
if (targetAdminClient != null) {
targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
log.trace("synced the offset for consumer group: {} with {} offset entries",
consumerGroupId, offsetToSync.size());
}
}
Map<String, Map<TopicPartition, OffsetAndMetadata>> getConvertedUpstreamOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();
for (Entry<String, List<Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
String consumerId = entry.getKey();
Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = new HashMap<>();
for (Checkpoint checkpoint : entry.getValue()) {
convertedUpstreamOffset.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
}
result.put(consumerId, convertedUpstreamOffset);
}
return result;
}
}
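
The guard in syncGroupOffset above only advances a group when the translated offset is strictly ahead of
what the idle group has already committed on the target. A condensed, standalone sketch of that comparison
(class name and the concrete offsets are hypothetical; the rule follows the code above):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetSyncRuleSketch {
        public static void main(String[] args) {
            // translated upstream->downstream offset taken from the latest checkpoint
            OffsetAndMetadata convertedOffset = new OffsetAndMetadata(101L);

            // offsets the idle group has already committed on the target cluster
            Map<TopicPartition, OffsetAndMetadata> targetConsumerOffset = new HashMap<>();
            TopicPartition tp = new TopicPartition("primary.test-topic-1", 0);
            targetConsumerOffset.put(tp, new OffsetAndMetadata(100L));

            // same guard as syncGroupOffset above: only sync when the translated
            // offset is strictly ahead of the target's committed offset
            long latestDownstreamOffset = targetConsumerOffset.get(tp).offset();
            if (latestDownstreamOffset >= convertedOffset.offset()) {
                System.out.println("skip " + tp); // the task would log.trace and continue
            } else {
                System.out.println("sync " + tp + " to " + convertedOffset.offset());
            }
        }
    }

This is also why checkpoints that cannot be translated accurately (downstreamOffset() <= 0) are filtered
out earlier in checkpointsForGroup: they never reach the sync step.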

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java (36 lines changed)

@@ -25,6 +25,8 @@ import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import java.util.Map;
import java.util.HashMap;
@@ -69,6 +71,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
protected static final String SYNC_TOPIC_ACLS = "sync.topic.acls";
protected static final String EMIT_HEARTBEATS = "emit.heartbeats";
protected static final String EMIT_CHECKPOINTS = "emit.checkpoints";
protected static final String SYNC_GROUP_OFFSETS = "sync.group.offsets";
public static final String ENABLED = "enabled";
private static final String ENABLED_DOC = "Whether to replicate source->target.";
@@ -171,6 +174,14 @@ public class MirrorConnectorConfig extends AbstractConfig {
private static final String EMIT_CHECKPOINTS_INTERVAL_SECONDS_DOC = "Frequency of checkpoints.";
public static final long EMIT_CHECKPOINTS_INTERVAL_SECONDS_DEFAULT = 60;
public static final String SYNC_GROUP_OFFSETS_ENABLED = SYNC_GROUP_OFFSETS + ENABLED_SUFFIX;
private static final String SYNC_GROUP_OFFSETS_ENABLED_DOC = "Whether to periodically write the translated offsets to the __consumer_offsets topic in the target cluster, as long as no active consumers in that group are connected to the target cluster";
public static final boolean SYNC_GROUP_OFFSETS_ENABLED_DEFAULT = false;
public static final String SYNC_GROUP_OFFSETS_INTERVAL_SECONDS = SYNC_GROUP_OFFSETS + INTERVAL_SECONDS_SUFFIX;
private static final String SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DOC = "Frequency of consumer group offset sync.";
public static final long SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DEFAULT = 60;
public static final String TOPIC_FILTER_CLASS = "topic.filter.class";
private static final String TOPIC_FILTER_CLASS_DOC = "TopicFilter to use. Selects topics to replicate.";
public static final Class<?> TOPIC_FILTER_CLASS_DEFAULT = DefaultTopicFilter.class;
@@ -231,8 +242,8 @@ public class MirrorConnectorConfig extends AbstractConfig {
props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
- props.put("enable.auto.commit", "false");
- props.put("auto.offset.reset", "earliest");
+ props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@@ -402,6 +413,15 @@ public class MirrorConnectorConfig extends AbstractConfig {
return getConfiguredInstance(CONFIG_PROPERTY_FILTER_CLASS, ConfigPropertyFilter.class);
}
Duration syncGroupOffsetsInterval() {
if (getBoolean(SYNC_GROUP_OFFSETS_ENABLED)) {
return Duration.ofSeconds(getLong(SYNC_GROUP_OFFSETS_INTERVAL_SECONDS));
} else {
// negative interval to disable
return Duration.ofMillis(-1);
}
}
protected static final ConfigDef CONNECTOR_CONFIG_DEF = ConnectorConfig.configDef()
.define(
ENABLED,
@@ -552,6 +572,18 @@ public class MirrorConnectorConfig extends AbstractConfig {
EMIT_CHECKPOINTS_INTERVAL_SECONDS_DEFAULT,
ConfigDef.Importance.LOW,
EMIT_CHECKPOINTS_INTERVAL_SECONDS_DOC)
.define(
SYNC_GROUP_OFFSETS_ENABLED,
ConfigDef.Type.BOOLEAN,
SYNC_GROUP_OFFSETS_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
SYNC_GROUP_OFFSETS_ENABLED_DOC)
.define(
SYNC_GROUP_OFFSETS_INTERVAL_SECONDS,
ConfigDef.Type.LONG,
SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DEFAULT,
ConfigDef.Importance.LOW,
SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DOC)
.define(
REPLICATION_POLICY_CLASS,
ConfigDef.Type.CLASS,

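A side note on syncGroupOffsetsInterval() above: when the feature is disabled, the method returns a
negative Duration, which, per the inline comment, the scheduler is expected to treat as "do not schedule".
A small standalone sketch of that mapping (hypothetical class name, same shape as the method above):

    import java.time.Duration;

    public class SyncIntervalSketch {
        // mirrors the shape of MirrorConnectorConfig.syncGroupOffsetsInterval():
        // a negative interval is the "disabled" signal for the scheduler
        static Duration syncGroupOffsetsInterval(boolean enabled, long seconds) {
            return enabled ? Duration.ofSeconds(seconds) : Duration.ofMillis(-1);
        }

        public static void main(String[] args) {
            System.out.println(syncGroupOffsetsInterval(true, 60));               // PT1M
            System.out.println(syncGroupOffsetsInterval(false, 60).isNegative()); // true
        }
    }
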
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java (61 lines changed)

@@ -16,6 +16,11 @@
*/
package org.apache.kafka.connect.mirror;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Collections;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.connect.source.SourceRecord;
@@ -29,7 +34,7 @@ public class MirrorCheckpointTaskTest {
@Test
public void testDownstreamTopicRenaming() {
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
- new DefaultReplicationPolicy(), null);
+ new DefaultReplicationPolicy(), null, Collections.emptyMap(), Collections.emptyMap());
assertEquals(new TopicPartition("source1.topic3", 4),
mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)));
assertEquals(new TopicPartition("topic3", 5),
@@ -42,7 +47,7 @@ public class MirrorCheckpointTaskTest {
public void testCheckpoint() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
- new DefaultReplicationPolicy(), offsetSyncStore);
+ new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
Checkpoint checkpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
@@ -64,4 +69,56 @@ public class MirrorCheckpointTaskTest {
assertEquals(13, checkpoint2.downstreamOffset());
assertEquals(234L, sourceRecord2.timestamp().longValue());
}
@Test
public void testSyncOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
Map<String, List<Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
String consumer1 = "consumer1";
String consumer2 = "consumer2";
String topic1 = "topic1";
String topic2 = "topic2";
// 'c1t1' denotes consumer offsets of all partitions of topic1 for consumer1
Map<TopicPartition, OffsetAndMetadata> c1t1 = new HashMap<>();
// 't1p0' denotes topic1, partition 0
TopicPartition t1p0 = new TopicPartition(topic1, 0);
c1t1.put(t1p0, new OffsetAndMetadata(100));
Map<TopicPartition, OffsetAndMetadata> c2t2 = new HashMap<>();
TopicPartition t2p0 = new TopicPartition(topic2, 0);
c2t2.put(t2p0, new OffsetAndMetadata(50));
idleConsumerGroupsOffset.put(consumer1, c1t1);
idleConsumerGroupsOffset.put(consumer2, c2t2);
// 'cpC1T1P0' denotes 'checkpoint' of topic1, partition 0 for consumer1
Checkpoint cpC1T1P0 = new Checkpoint(consumer1, new TopicPartition(topic1, 0), 200, 101, "metadata");
// 'cpC2T2p0' denotes 'checkpoint' of topic2, partition 0 for consumer2
Checkpoint cpC2T2P0 = new Checkpoint(consumer2, new TopicPartition(topic2, 0), 100, 51, "metadata");
// 'checkpointListC1' denotes 'checkpoint' list for consumer1
List<Checkpoint> checkpointListC1 = new ArrayList<>();
checkpointListC1.add(cpC1T1P0);
// 'checkpointListC2' denotes 'checkpoint' list for consumer2
List<Checkpoint> checkpointListC2 = new ArrayList<>();
checkpointListC2.add(cpC2T2P0);
checkpointsPerConsumerGroup.put(consumer1, checkpointListC1);
checkpointsPerConsumerGroup.put(consumer2, checkpointListC2);
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), null, idleConsumerGroupsOffset, checkpointsPerConsumerGroup);
Map<String, Map<TopicPartition, OffsetAndMetadata>> output = mirrorCheckpointTask.syncGroupOffset();
assertEquals(101, output.get(consumer1).get(t1p0).offset());
assertEquals(51, output.get(consumer2).get(t2p0).offset());
}
}

connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java (153 lines changed)

@@ -18,8 +18,10 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.test.IntegrationTest;
@@ -61,16 +63,17 @@ public class MirrorConnectorsIntegrationTest {
private static final int RECORD_TRANSFER_DURATION_MS = 10_000;
private static final int CHECKPOINT_DURATION_MS = 20_000;
private Time time = Time.SYSTEM;
private Map<String, String> mm2Props;
private MirrorMakerConfig mm2Config;
private EmbeddedConnectCluster primary;
private EmbeddedConnectCluster backup;
@Before
public void setup() throws InterruptedException {
Properties brokerProps = new Properties();
brokerProps.put("auto.create.topics.enable", "false");
- Map<String, String> mm2Props = new HashMap<>();
+ mm2Props = new HashMap<>();
mm2Props.put("clusters", "primary, backup");
mm2Props.put("max.tasks", "10");
mm2Props.put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
@@ -130,19 +133,6 @@
backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
}
- // create consumers before starting the connectors so we don't need to wait for discovery
- Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-     "group.id", "consumer-group-1"), "test-topic-1", "backup.test-topic-1");
- consumer1.poll(Duration.ofMillis(500));
- consumer1.commitSync();
- consumer1.close();
- Consumer<byte[], byte[]> consumer2 = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-     "group.id", "consumer-group-1"), "test-topic-1", "primary.test-topic-1");
- consumer2.poll(Duration.ofMillis(500));
- consumer2.commitSync();
- consumer2.close();
log.info("primary REST service: {}", primary.endpointForResource("connectors"));
log.info("backup REST service: {}", backup.endpointForResource("connectors"));
@@ -152,42 +142,29 @@
// now that the brokers are running, we can finish setting up the Connectors
mm2Props.put("primary.bootstrap.servers", primary.kafka().bootstrapServers());
mm2Props.put("backup.bootstrap.servers", backup.kafka().bootstrapServers());
- mm2Config = new MirrorMakerConfig(mm2Props);
- // we wait for the connector and tasks to come up for each connector, so that when we do the
- // actual testing, we are certain that the tasks are up and running; this will prevent
- // flaky tests where the connector and tasks didn't start up in time for the tests to be run
- Set<String> connectorNames = new HashSet<>(Arrays.asList("MirrorSourceConnector",
-     "MirrorCheckpointConnector", "MirrorHeartbeatConnector"));
- backup.configureConnector("MirrorSourceConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("primary", "backup"),
-     MirrorSourceConnector.class));
- backup.configureConnector("MirrorCheckpointConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("primary", "backup"),
-     MirrorCheckpointConnector.class));
- backup.configureConnector("MirrorHeartbeatConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("primary", "backup"),
-     MirrorHeartbeatConnector.class));
- waitUntilMirrorMakerIsRunning(backup, connectorNames);
- primary.configureConnector("MirrorSourceConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("backup", "primary"),
-     MirrorSourceConnector.class));
- primary.configureConnector("MirrorCheckpointConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("backup", "primary"),
-     MirrorCheckpointConnector.class));
- primary.configureConnector("MirrorHeartbeatConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("backup", "primary"),
-     MirrorHeartbeatConnector.class));
- waitUntilMirrorMakerIsRunning(primary, connectorNames);
}
private void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,
-     Set<String> connNames) throws InterruptedException {
-     for (String connector : connNames) {
+     MirrorMakerConfig mm2Config, String primary, String backup) throws InterruptedException {
+     connectCluster.configureConnector("MirrorSourceConnector", mm2Config.connectorBaseConfig(new SourceAndTarget(primary, backup),
+         MirrorSourceConnector.class));
+     connectCluster.configureConnector("MirrorCheckpointConnector", mm2Config.connectorBaseConfig(new SourceAndTarget(primary, backup),
+         MirrorCheckpointConnector.class));
+     connectCluster.configureConnector("MirrorHeartbeatConnector", mm2Config.connectorBaseConfig(new SourceAndTarget(primary, backup),
+         MirrorHeartbeatConnector.class));
+     // we wait for the connector and tasks to come up for each connector, so that when we do the
+     // actual testing, we are certain that the tasks are up and running; this will prevent
+     // flaky tests where the connector and tasks didn't start up in time for the tests to be run
+     Set<String> connectorNames = new HashSet<>(Arrays.asList("MirrorSourceConnector",
+         "MirrorCheckpointConnector", "MirrorHeartbeatConnector"));
+     for (String connector : connectorNames) {
connectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connector, 1,
"Connector " + connector + " tasks did not start in time on cluster: " + connectCluster);
}
@@ -207,8 +184,30 @@
backup.stop();
}
@Test
public void testReplication() throws InterruptedException {
// create consumers before starting the connectors so we don't need to wait for discovery
Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
"group.id", "consumer-group-1"), "test-topic-1", "backup.test-topic-1");
consumer1.poll(Duration.ofMillis(500));
consumer1.commitSync();
consumer1.close();
Consumer<byte[], byte[]> consumer2 = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
"group.id", "consumer-group-1"), "test-topic-1", "primary.test-topic-1");
consumer2.poll(Duration.ofMillis(500));
consumer2.commitSync();
consumer2.close();
mm2Config = new MirrorMakerConfig(mm2Props);
waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
waitUntilMirrorMakerIsRunning(primary, mm2Config, "backup", "primary");
MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig("primary"));
MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig("backup"));
@@ -246,7 +245,7 @@
new TopicPartition("primary.test-topic-1", 0)));
// Failover consumer group to backup cluster.
- Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
+ consumer1 = backup.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
consumer1.assign(backupOffsets.keySet());
backupOffsets.forEach(consumer1::seek);
consumer1.poll(Duration.ofMillis(500));
@@ -282,7 +281,7 @@
Duration.ofMillis(CHECKPOINT_DURATION_MS));
// Failback consumer group to primary cluster
- Consumer<byte[], byte[]> consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
+ consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
consumer2.assign(primaryOffsets.keySet());
primaryOffsets.forEach(consumer2::seek);
consumer2.poll(Duration.ofMillis(500));
@@ -316,6 +315,68 @@
backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
}
@Test
public void testOneWayReplicationWithAutorOffsetSync1() throws InterruptedException {
// create consumers before starting the connectors so we don't need to wait for discovery
Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
"group.id", "consumer-group-1"), "test-topic-1");
consumer1.poll(Duration.ofMillis(500));
consumer1.commitSync();
consumer1.close();
// enable automated consumer group offset sync
mm2Props.put("sync.group.offsets.enabled", "true");
mm2Props.put("sync.group.offsets.interval.seconds", "1");
// one way replication from primary to backup
mm2Props.put("backup->primary.enabled", "false");
mm2Config = new MirrorMakerConfig(mm2Props);
waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
// sleep 5 seconds to ensure the automated group offset sync is complete
time.sleep(5000);
// create a consumer at backup cluster with same consumer group Id to consume 1 topic
Consumer<byte[], byte[]> consumer = backup.kafka().createConsumerAndSubscribeTo(
Collections.singletonMap("group.id", "consumer-group-1"), "primary.test-topic-1");
ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
// the consumer should poll zero records, because the offsets of the same consumer group
// have already been synced from primary to backup by the background job, so there are no
// more records for the same consumer group to consume from the replicated topic at backup
assertEquals("consumer record size is not zero", 0, records.count());
// now create a new topic in primary cluster
primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
backup.kafka().createTopic("primary.test-topic-2", 1);
// produce some records to the new topic in primary cluster
for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
primary.kafka().produce("test-topic-2", i % NUM_PARTITIONS, "key", "message-1-" + i);
}
// create a consumer at primary cluster to consume the new topic
consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
"group.id", "consumer-group-1"), "test-topic-2");
consumer1.poll(Duration.ofMillis(500));
consumer1.commitSync();
consumer1.close();
// sleep 5 seconds to ensure the automated group offset sync is complete
time.sleep(5000);
// create a consumer at backup cluster with same consumer group Id to consume old and new topic
consumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
"group.id", "consumer-group-1"), "primary.test-topic-1", "primary.test-topic-2");
records = consumer.poll(Duration.ofMillis(500));
// similar reasoning as above, no more records to consume by the same consumer group at backup cluster
assertEquals("consumer record size is not zero", 0, records.count());
consumer.close();
}
private void deleteAllTopics(EmbeddedKafkaCluster cluster) {
Admin client = cluster.createAdminClient();
try {
