|
|
|
@ -213,11 +213,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
@@ -213,11 +213,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
|
|
|
|
|
val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt) |
|
|
|
|
val customRebalanceListener = { |
|
|
|
|
if (customRebalanceListenerClass != null) |
|
|
|
|
Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass) |
|
|
|
|
Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)) |
|
|
|
|
else |
|
|
|
|
null |
|
|
|
|
None |
|
|
|
|
} |
|
|
|
|
consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, Some(customRebalanceListener)) |
|
|
|
|
consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, customRebalanceListener) |
|
|
|
|
connector.setConsumerRebalanceListener(consumerRebalanceListener) |
|
|
|
|
|
|
|
|
|
// create producer threads |
|
|
|
|