diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index 689547a3f9a..9abf52bff1f 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -28,7 +28,11 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { var topic:String = null var serializerClass:String = null var brokerList:String = null - + var producerType:String = null + var compressionCodec:String = null + var enqueueTimeout:String = null + var queueSize:String = null + private var producer: Producer[String, String] = null def getTopic:String = topic @@ -40,6 +44,18 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { def getSerializerClass:String = serializerClass def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass } + def getProducerType:String = producerType + def setProducerType(producerType:String) { this.producerType = producerType } + + def getCompressionCodec:String = compressionCodec + def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec } + + def getEnqueueTimeout:String = enqueueTimeout + def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout } + + def getQueueSize:String = queueSize + def setQueueSize(queueSize:String) { this.queueSize = queueSize } + override def activateOptions() { // check for config parameter validity val props = new Properties() @@ -54,6 +70,11 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { LogLog.debug("Using default encoder - kafka.serializer.StringEncoder") } props.put("serializer.class", serializerClass) + //These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified + if(producerType != null) props.put("producer.type", producerType) + if(compressionCodec != null) props.put("compression.codec", compressionCodec) + if(enqueueTimeout != null) props.put("queue.enqueueTimeout.ms", enqueueTimeout) + if(queueSize != null) props.put("queue.size", queueSize) val config : ProducerConfig = new ProducerConfig(props) producer = new Producer[String, String](config) LogLog.debug("Kafka producer connected to " + config.brokerList)