From 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 18 Mar 2015 14:58:11 -0700 Subject: [PATCH] KAFKA-1997; Hopefully last follow-up fix to get messageHandlerArgs right --- .../main/scala/kafka/tools/MirrorMaker.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 11acc3103e4..4f3c4c872e1 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -33,7 +33,7 @@ import kafka.metrics.KafkaMetricsGroup import kafka.serializer.DefaultDecoder import kafka.utils.{CommandLineUtils, Logging, Utils} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} +import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord, RecordMetadata} /** * The mirror maker has the following architecture: @@ -46,6 +46,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordM * acks=all * retries=max integer * block.on.buffer.full=true + * max.in.flight.requests.per.connection=1 * 2. Consumer Settings * auto.commit.enable=false * 3. Mirror Maker Setting: @@ -57,7 +58,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { private var producer: MirrorMakerProducer = null private var mirrorMakerThreads: Seq[MirrorMakerThread] = null private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false) - // Track the messages unacked for consumer rebalance + // Track the messages not successfully sent by mirror maker. private var numDroppedMessages: AtomicInteger = new AtomicInteger(0) private var messageHandler: MirrorMakerMessageHandler = null private var offsetCommitIntervalMs = 0 @@ -83,7 +84,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { .describedAs("config file") .ofType(classOf[String]) - // Please see note about MaxInflightRequests val producerConfigOpt = parser.accepts("producer.config", "Embedded producer config.") .withRequiredArg() @@ -179,9 +179,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // create producer val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) // Defaults to no data loss settings. - maybeSetDefaultProperty(producerProps, "retries", Int.MaxValue.toString) - maybeSetDefaultProperty(producerProps, "block.on.buffer.full", "true") - maybeSetDefaultProperty(producerProps, "acks", "all") + maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString) + maybeSetDefaultProperty(producerProps, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") + maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all") + maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") producer = new MirrorMakerProducer(producerProps) // Create consumer connector @@ -253,7 +254,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { messageHandler = { if (customMessageHandlerClass != null) { if (messageHandlerArgs != null) - Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, rebalanceListenerArgs) + Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs) else Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass) } else { @@ -409,11 +410,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // If abort.on.send.failure is set, stop the mirror maker. Otherwise log skipped message and move on. if (abortOnSendFailure) exitingOnSendFailure = true + numDroppedMessages.incrementAndGet() } } } - private class InternalRebalanceListener(connector: ZookeeperConsumerConnector, customRebalanceListener: Option[ConsumerRebalanceListener]) extends ConsumerRebalanceListener {