@ -225,6 +225,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
// create producer
// create producer
val producerProps = Utils . loadProps ( options . valueOf ( producerConfigOpt ) )
val producerProps = Utils . loadProps ( options . valueOf ( producerConfigOpt ) )
val sync = producerProps . getProperty ( "producer.type" , "async" ) . equals ( "sync" )
producerProps . remove ( "producer.type" )
// Defaults to no data loss settings .
// Defaults to no data loss settings .
maybeSetDefaultProperty ( producerProps , ProducerConfig . RETRIES_CONFIG , Int . MaxValue . toString )
maybeSetDefaultProperty ( producerProps , ProducerConfig . RETRIES_CONFIG , Int . MaxValue . toString )
maybeSetDefaultProperty ( producerProps , ProducerConfig . MAX_BLOCK_MS_CONFIG , Long . MaxValue . toString )
maybeSetDefaultProperty ( producerProps , ProducerConfig . MAX_BLOCK_MS_CONFIG , Long . MaxValue . toString )
@ -233,7 +235,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
// Always set producer key and value serializer to ByteArraySerializer .
// Always set producer key and value serializer to ByteArraySerializer .
producerProps . setProperty ( ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.ByteArraySerializer" )
producerProps . setProperty ( ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.ByteArraySerializer" )
producerProps . setProperty ( ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.ByteArraySerializer" )
producerProps . setProperty ( ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.ByteArraySerializer" )
producer = new MirrorMakerProducer ( producerProps )
producer = new MirrorMakerProducer ( sync , producerProps )
// Create consumers
// Create consumers
val mirrorMakerConsumers = if ( useOldConsumer ) {
val mirrorMakerConsumers = if ( useOldConsumer ) {
@ -696,9 +698,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
}
}
}
private [ tools ] class MirrorMakerProducer ( val producerProps : Properties ) {
private [ tools ] class MirrorMakerProducer ( val sync : Boolean , val producerProps : Properties ) {
val sync = producerProps . getProperty ( "producer.type" , "async" ) . equals ( "sync" )
val producer = new KafkaProducer [ Array [ Byte ] , Array [ Byte ] ] ( producerProps )
val producer = new KafkaProducer [ Array [ Byte ] , Array [ Byte ] ] ( producerProps )