You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
223 lines
7.4 KiB
223 lines
7.4 KiB
5 years ago
|
|
||
|
# MirrorMaker 2.0
|
||
|
|
||
|
MM2 leverages the Connect framework to replicate topics between Kafka
|
||
|
clusters. MM2 includes several new features, including:
|
||
|
|
||
|
- both topics and consumer groups are replicated
|
||
|
- topic configuration and ACLs are replicated
|
||
|
- cross-cluster offsets are synchronized
|
||
|
- partitioning is preserved
|
||
|
|
||
|
## Replication flows
|
||
|
|
||
|
MM2 replicates topics and consumer groups from upstream source clusters
|
||
|
to downstream target clusters. These directional flows are notated
|
||
|
`A->B`.
|
||
|
|
||
|
It's possible to create complex replication topologies based on these
|
||
|
`source->target` flows, including:
|
||
|
|
||
|
- *fan-out*, e.g. `K->A, K->B, K->C`
|
||
|
- *aggregation*, e.g. `A->K, B->K, C->K`
|
||
|
- *active/active*, e.g. `A->B, B->A`
|
||
|
|
||
|
Each replication flow can be configured independently, e.g. to replicate
|
||
|
specific topics or groups:
|
||
|
|
||
|
A->B.topics = topic-1, topic-2
|
||
|
A->B.groups = group-1, group-2
|
||
|
|
||
|
By default, all topics and consumer groups are replicated (except
|
||
|
blacklisted ones), across all enabled replication flows. Each
|
||
|
replication flow must be explicitly enabled to begin replication:
|
||
|
|
||
|
A->B.enabled = true
|
||
|
B->A.enabled = true
|
||
|
|
||
|
## Starting an MM2 process
|
||
|
|
||
|
You can run any number of MM2 processes as needed. Any MM2 processes
|
||
|
which are configured to replicate the same Kafka clusters will find each
|
||
|
other, share configuration, load balance, etc.
|
||
|
|
||
|
To start an MM2 process, first specify Kafka cluster information in a
|
||
|
configuration file as follows:
|
||
|
|
||
|
# mm2.properties
|
||
|
clusters = us-west, us-east
|
||
|
us-west.bootstrap.servers = host1:9092
|
||
|
us-east.bootstrap.servers = host2:9092
|
||
|
|
||
|
You can list any number of clusters this way.
|
||
|
|
||
|
Optionally, you can override default MirrorMaker properties:
|
||
|
|
||
|
topics = .*
|
||
|
groups = group1, group2
|
||
|
emit.checkpoints.interval.seconds = 10
|
||
|
|
||
|
These will apply to all replication flows. You can also override default
|
||
|
properties for specific clusters or replication flows:
|
||
|
|
||
|
# configure a specific cluster
|
||
|
us-west.offset.storage.topic = mm2-offsets
|
||
|
|
||
|
# configure a specific source->target replication flow
|
||
|
us-west->us-east.emit.heartbeats = false
|
||
|
|
||
|
Next, enable individual replication flows as follows:
|
||
|
|
||
|
us-west->us-east.enabled = true # disabled by default
|
||
|
|
||
|
Finally, launch one or more MirrorMaker processes with the `connect-mirror-maker.sh`
|
||
|
script:
|
||
|
|
||
|
$ ./bin/connect-mirror-maker.sh mm2.properties
|
||
|
|
||
|
## Multicluster environments
|
||
|
|
||
|
MM2 supports replication between multiple Kafka clusters, whether in the
|
||
|
same data center or across multiple data centers. A single MM2 cluster
|
||
|
can span multiple data centers, but it is recommended to keep MM2's producers
|
||
|
as close as possible to their target clusters. To do so, specify a subset
|
||
|
of clusters for each MM2 node as follows:
|
||
|
|
||
|
# in west DC:
|
||
|
$ ./bin/connect-mirror-maker.sh mm2.properties --clusters west-1 west-2
|
||
|
|
||
|
This signals to the node that the given clusters are nearby, and prevents the
|
||
|
node from sending records or configuration to clusters in other data centers.
|
||
|
|
||
|
### Example
|
||
|
|
||
|
Say there are three data centers (west, east, north) with two Kafka
|
||
|
clusters in each data center (west-1, west-2 etc). We can configure MM2
|
||
|
for active/active replication within each data center, as well as cross data
|
||
|
center replication (XDCR) as follows:
|
||
|
|
||
|
# mm2.properties
|
||
|
clusters: west-1, west-2, east-1, east-2, north-1, north-2
|
||
|
|
||
|
west-1.bootstrap.servers = ...
|
||
|
---%<---
|
||
|
|
||
|
# active/active in west
|
||
|
west-1->west-2.enabled = true
|
||
|
west-2->west-1.enabled = true
|
||
|
|
||
|
# active/active in east
|
||
|
east-1->east-2.enabled = true
|
||
|
east-2->east-1.enabled = true
|
||
|
|
||
|
# active/active in north
|
||
|
north-1->north-2.enabled = true
|
||
|
north-2->north-1.enabled = true
|
||
|
|
||
|
# XDCR via west-1, east-1, north-1
|
||
|
west-1->east-1.enabled = true
|
||
|
west-1->north-1.enabled = true
|
||
|
east-1->west-1.enabled = true
|
||
|
east-1->north-1.enabled = true
|
||
|
north-1->west-1.enabled = true
|
||
|
north-1->east-1.enabled = true
|
||
|
|
||
|
Then, launch MM2 in each data center as follows:
|
||
|
|
||
|
# in west:
|
||
|
$ ./bin/connect-mirror-maker.sh mm2.properties --clusters west-1 west-2
|
||
|
|
||
|
# in east:
|
||
|
$ ./bin/connect-mirror-maker.sh mm2.properties --clusters east-1 east-2
|
||
|
|
||
|
# in north:
|
||
|
$ ./bin/connect-mirror-maker.sh mm2.properties --clusters north-1 north-2
|
||
|
|
||
|
With this configuration, records produced to any cluster will be replicated
|
||
|
within the data center, as well as across to other data centers. By providing
|
||
|
the `--clusters` parameter, we ensure that each node only produces records to
|
||
|
nearby clusters.
|
||
|
|
||
|
N.B. that the `--clusters` parameter is not technically required here. MM2 will work fine without it; however, throughput may suffer from "producer lag" between
|
||
|
data centers, and you may incur unnecessary data transfer costs.
|
||
|
|
||
|
## Shared configuration
|
||
|
|
||
|
MM2 processes share configuration via their target Kafka clusters.
|
||
|
For example, the following two processes would be racy:
|
||
|
|
||
|
# process1:
|
||
|
A->B.enabled = true
|
||
|
A->B.topics = foo
|
||
|
|
||
|
# process2:
|
||
|
A->B.enabled = true
|
||
|
A->B.topics = bar
|
||
|
|
||
|
In this case, the two processes will share configuration via cluster `B`.
|
||
|
Depending on which processes is elected "leader", the result will be
|
||
|
that either `foo` or `bar` is replicated -- but not both. For this reason,
|
||
|
it is important to keep configuration consistent across flows to the same
|
||
|
target cluster. In most cases, your entire organization should use a single
|
||
|
MM2 configuration file.
|
||
|
|
||
|
## Remote topics
|
||
|
|
||
|
MM2 employs a naming convention to ensure that records from different
|
||
|
clusters are not written to the same partition. By default, replicated
|
||
|
topics are renamed based on "source cluster aliases":
|
||
|
|
||
|
topic-1 --> source.topic-1
|
||
|
|
||
|
This can be customized by overriding the `replication.policy.separator`
|
||
|
property (default is a period). If you need more control over how
|
||
|
remote topics are defined, you can implement a custom `ReplicationPolicy`
|
||
|
and override `replication.policy.class` (default is
|
||
|
`DefaultReplicationPolicy`).
|
||
|
|
||
|
## Monitoring an MM2 process
|
||
|
|
||
|
MM2 is built on the Connect framework and inherits all of Connect's metrics, e.g.
|
||
|
`source-record-poll-rate`. In addition, MM2 produces its own metrics under the
|
||
|
`kafka.connect.mirror` metric group. Metrics are tagged with the following properties:
|
||
|
|
||
|
- *target*: alias of target cluster
|
||
|
- *source*: alias of source cluster
|
||
|
- *topic*: remote topic on target cluster
|
||
|
- *partition*: partition being replicated
|
||
|
|
||
|
Metrics are tracked for each *remote* topic. The source cluster can be inferred
|
||
|
from the topic name. For example, replicating `topic1` from `A->B` will yield metrics
|
||
|
like:
|
||
|
|
||
|
- `target=B`
|
||
|
- `topic=A.topic1`
|
||
|
- `partition=1`
|
||
|
|
||
|
The following metrics are emitted:
|
||
|
|
||
|
# MBean: kafka.connect.mirror:type=MirrorSourceConnector,target=([-.w]+),topic=([-.w]+),partition=([0-9]+)
|
||
|
|
||
|
record-count # number of records replicated source -> target
|
||
|
record-age-ms # age of records when they are replicated
|
||
|
record-age-ms-min
|
||
|
record-age-ms-max
|
||
|
record-age-ms-avg
|
||
|
replication-latecny-ms # time it takes records to propagate source->target
|
||
|
replication-latency-ms-min
|
||
|
replication-latency-ms-max
|
||
|
replication-latency-ms-avg
|
||
|
byte-rate # average number of bytes/sec in replicated records
|
||
|
|
||
|
|
||
|
# MBean: kafka.connect.mirror:type=MirrorCheckpointConnector,source=([-.w]+),target=([-.w]+)
|
||
|
|
||
|
checkpoint-latency-ms # time it takes to replicate consumer offsets
|
||
|
checkpoint-latency-ms-min
|
||
|
checkpoint-latency-ms-max
|
||
|
checkpoint-latency-ms-avg
|
||
|
|
||
|
These metrics do not discern between created-at and log-append timestamps.
|
||
|
|
||
|
|