Browse Source

ConsoleProducer does not exit correctly; kafka-701; patched by Maxime Brugidou; reviewed by Jun Rao

0.8.0-beta1-candidate1
Jun Rao 12 years ago
parent
commit
53818bb7ee
  1. 2
      config/consumer.properties
  2. 2
      config/server.properties
  3. 34
      core/src/main/scala/kafka/producer/ConsoleProducer.scala

2
config/consumer.properties

@ -20,7 +20,7 @@ @@ -20,7 +20,7 @@
zk.connect=127.0.0.1:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
#consumer group id
group.id=test-consumer-group

2
config/server.properties

@ -105,7 +105,7 @@ log.cleanup.interval.mins=1 @@ -105,7 +105,7 @@ log.cleanup.interval.mins=1
zk.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.connection.timeout.ms=1000000
# metrics reporter properties
kafka.metrics.polling.interval.secs=5

34
core/src/main/scala/kafka/producer/ConsoleProducer.scala

@ -128,7 +128,7 @@ object ConsoleProducer { @@ -128,7 +128,7 @@ object ConsoleProducer {
props.put("batch.num.messages", batchSize.toString)
props.put("queue.buffering.max.ms", sendTimeout.toString)
props.put("queue.buffering.max.messages", queueSize.toString)
props.put("queue.enqueueTimeout.ms", queueEnqueueTimeoutMs.toString)
props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString)
props.put("request.required.acks", requestRequiredAcks.toString)
props.put("request.timeout.ms", requestTimeoutMs.toString)
props.put("key.serializer.class", keyEncoderClass)
@ -137,20 +137,26 @@ object ConsoleProducer { @@ -137,20 +137,26 @@ object ConsoleProducer {
val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
reader.init(System.in, cmdLineProps)
val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
try {
val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
producer.close()
}
})
var message: KeyedMessage[AnyRef, AnyRef] = null
do {
message = reader.readMessage()
if(message != null)
producer.send(message)
} while(message != null)
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
producer.close()
}
})
var message: KeyedMessage[AnyRef, AnyRef] = null
do {
message = reader.readMessage()
if(message != null)
producer.send(message)
} while(message != null)
} catch {
case e: Exception =>
e.printStackTrace
System.exit(1)
}
System.exit(0)
}

Loading…
Cancel
Save