|
|
|
@ -28,7 +28,11 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
@@ -28,7 +28,11 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
|
|
|
|
|
var topic:String = null |
|
|
|
|
var serializerClass:String = null |
|
|
|
|
var brokerList:String = null |
|
|
|
|
|
|
|
|
|
var producerType:String = null |
|
|
|
|
var compressionCodec:String = null |
|
|
|
|
var enqueueTimeout:String = null |
|
|
|
|
var queueSize:String = null |
|
|
|
|
|
|
|
|
|
private var producer: Producer[String, String] = null |
|
|
|
|
|
|
|
|
|
def getTopic:String = topic |
|
|
|
@ -40,6 +44,18 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
@@ -40,6 +44,18 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
|
|
|
|
|
def getSerializerClass:String = serializerClass |
|
|
|
|
def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass } |
|
|
|
|
|
|
|
|
|
def getProducerType:String = producerType |
|
|
|
|
def setProducerType(producerType:String) { this.producerType = producerType } |
|
|
|
|
|
|
|
|
|
def getCompressionCodec:String = compressionCodec |
|
|
|
|
def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec } |
|
|
|
|
|
|
|
|
|
def getEnqueueTimeout:String = enqueueTimeout |
|
|
|
|
def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout } |
|
|
|
|
|
|
|
|
|
def getQueueSize:String = queueSize |
|
|
|
|
def setQueueSize(queueSize:String) { this.queueSize = queueSize } |
|
|
|
|
|
|
|
|
|
override def activateOptions() { |
|
|
|
|
// check for config parameter validity |
|
|
|
|
val props = new Properties() |
|
|
|
@ -54,6 +70,11 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
@@ -54,6 +70,11 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
|
|
|
|
|
LogLog.debug("Using default encoder - kafka.serializer.StringEncoder") |
|
|
|
|
} |
|
|
|
|
props.put("serializer.class", serializerClass) |
|
|
|
|
//These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified |
|
|
|
|
if(producerType != null) props.put("producer.type", producerType) |
|
|
|
|
if(compressionCodec != null) props.put("compression.codec", compressionCodec) |
|
|
|
|
if(enqueueTimeout != null) props.put("queue.enqueueTimeout.ms", enqueueTimeout) |
|
|
|
|
if(queueSize != null) props.put("queue.size", queueSize) |
|
|
|
|
val config : ProducerConfig = new ProducerConfig(props) |
|
|
|
|
producer = new Producer[String, String](config) |
|
|
|
|
LogLog.debug("Kafka producer connected to " + config.brokerList) |
|
|
|
|