From ecd1f19f5aea2d2907d1a5594080062cb5c945f1 Mon Sep 17 00:00:00 2001 From: Neha Narkhede Date: Thu, 17 Jan 2013 21:31:06 -0800 Subject: [PATCH 1/3] KAFKA-709 Default queue.enqueue.timeout.ms to -1; reviewed by Jay Kreps --- .../main/scala/kafka/producer/async/AsyncProducerConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala index 973fa08c9af..dd39de57b03 100644 --- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala @@ -33,7 +33,7 @@ trait AsyncProducerConfig { * -ve: enqueue will block indefinitely if the queue is full * +ve: enqueue will block up to this many milliseconds if the queue is full */ - val queueEnqueueTimeoutMs = props.getInt("queue.enqueue.timeout.ms", 0) + val queueEnqueueTimeoutMs = props.getInt("queue.enqueue.timeout.ms", -1) /** the number of messages batched at the producer */ val batchNumMessages = props.getInt("batch.num.messages", 200) From 1e011bf42efce996a553d6c0a2d7e51b45622b6b Mon Sep 17 00:00:00 2001 From: Joe Stein Date: Sun, 20 Jan 2013 22:37:56 -0500 Subject: [PATCH 2/3] KAFKA-709 change in default blocking tripped test checking queue is full so updated test in case someone changes the new default to something else --- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index beb63a4de95..fb0666fd0b8 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -65,6 +65,7 @@ class AsyncProducerTest extends JUnit3Suite { props.put("producer.type", "async") props.put("queue.buffering.max.messages", "10") props.put("batch.num.messages", "1") + props.put("queue.enqueue.timeout.ms", "0") val config = new ProducerConfig(props) val produceData = getProduceData(12) From 96dc298438994fb5c93daa8c544bf8581a0eeb69 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Sun, 20 Jan 2013 20:52:54 -0800 Subject: [PATCH 3/3] Some arguments are always set to default in ProducerPerformance; patched by John Fung; committed by Jun Rao; kafka-710 --- perf/src/main/scala/kafka/perf/PerfConfig.scala | 2 +- .../scala/kafka/perf/ProducerPerformance.scala | 14 -------------- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/perf/src/main/scala/kafka/perf/PerfConfig.scala b/perf/src/main/scala/kafka/perf/PerfConfig.scala index 5dabeb51edd..a8fc6b9ec81 100644 --- a/perf/src/main/scala/kafka/perf/PerfConfig.scala +++ b/perf/src/main/scala/kafka/perf/PerfConfig.scala @@ -53,7 +53,7 @@ class PerfConfig(args: Array[String]) { .defaultsTo(200) val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") .withRequiredArg - .describedAs("compression codec ") + .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) val helpOpt = parser.accepts("help", "Print usage.") diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index 4822a7ecd60..507743e6d24 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -101,11 +101,6 @@ object ProducerPerformance extends Logging { .describedAs("number of threads") .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed") - .withRequiredArg - .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " + "ID and sent by producer starting from this ID sequentially. Message content will be String type and " + "in the form of 'Message:000...1:xxx...'") @@ -117,15 +112,6 @@ object ProducerPerformance extends Logging { .describedAs("message send time gap") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) - val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(3000) - val produceRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " + - "to complete") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(-1) val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + "set, the csv metrics will be outputed here")