diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index a11dbc8def1..3ee84ed9cbd 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -213,11 +213,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt) val customRebalanceListener = { if (customRebalanceListenerClass != null) - Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass) + Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)) else - null + None } - consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, Some(customRebalanceListener)) + consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, customRebalanceListener) connector.setConsumerRebalanceListener(consumerRebalanceListener) // create producer threads