Browse Source

KAFKA-1997; Hopefully last follow-up fix to get messageHandlerArgs right

pull/51/head
Jiangjie Qin 10 years ago committed by Guozhang Wang
parent
commit
82789e7519
  1. 17
      core/src/main/scala/kafka/tools/MirrorMaker.scala

17
core/src/main/scala/kafka/tools/MirrorMaker.scala

@ -33,7 +33,7 @@ import kafka.metrics.KafkaMetricsGroup @@ -33,7 +33,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.serializer.DefaultDecoder
import kafka.utils.{CommandLineUtils, Logging, Utils}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord, RecordMetadata}
/**
* The mirror maker has the following architecture:
@ -46,6 +46,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordM @@ -46,6 +46,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordM
* acks=all
* retries=max integer
* block.on.buffer.full=true
* max.in.flight.requests.per.connection=1
* 2. Consumer Settings
* auto.commit.enable=false
* 3. Mirror Maker Setting:
@ -57,7 +58,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { @@ -57,7 +58,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
private var producer: MirrorMakerProducer = null
private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
// Track the messages unacked for consumer rebalance
// Track the messages not successfully sent by mirror maker.
private var numDroppedMessages: AtomicInteger = new AtomicInteger(0)
private var messageHandler: MirrorMakerMessageHandler = null
private var offsetCommitIntervalMs = 0
@ -83,7 +84,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { @@ -83,7 +84,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
.describedAs("config file")
.ofType(classOf[String])
// Please see note about MaxInflightRequests
val producerConfigOpt = parser.accepts("producer.config",
"Embedded producer config.")
.withRequiredArg()
@ -179,9 +179,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { @@ -179,9 +179,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
// create producer
val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
// Defaults to no data loss settings.
maybeSetDefaultProperty(producerProps, "retries", Int.MaxValue.toString)
maybeSetDefaultProperty(producerProps, "block.on.buffer.full", "true")
maybeSetDefaultProperty(producerProps, "acks", "all")
maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
maybeSetDefaultProperty(producerProps, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
producer = new MirrorMakerProducer(producerProps)
// Create consumer connector
@ -253,7 +254,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { @@ -253,7 +254,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
messageHandler = {
if (customMessageHandlerClass != null) {
if (messageHandlerArgs != null)
Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, rebalanceListenerArgs)
Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
else
Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
} else {
@ -409,11 +410,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { @@ -409,11 +410,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
// If abort.on.send.failure is set, stop the mirror maker. Otherwise log skipped message and move on.
if (abortOnSendFailure)
exitingOnSendFailure = true
numDroppedMessages.incrementAndGet()
}
}
}
private class InternalRebalanceListener(connector: ZookeeperConsumerConnector,
customRebalanceListener: Option[ConsumerRebalanceListener])
extends ConsumerRebalanceListener {

Loading…
Cancel
Save