diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
index 973fa08c9af..dd39de57b03 100644
--- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
@@ -33,7 +33,7 @@ trait AsyncProducerConfig {
    * -ve: enqueue will block indefinitely if the queue is full
    * +ve: enqueue will block up to this many milliseconds if the queue is full
    */
-  val queueEnqueueTimeoutMs = props.getInt("queue.enqueue.timeout.ms", 0)
+  val queueEnqueueTimeoutMs = props.getInt("queue.enqueue.timeout.ms", -1)
 
   /** the number of messages batched at the producer */
   val batchNumMessages = props.getInt("batch.num.messages", 200)
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 37a4c170891..93a86e05d23 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -65,6 +65,7 @@ class AsyncProducerTest extends JUnit3Suite {
     props.put("producer.type", "async")
     props.put("queue.buffering.max.messages", "10")
     props.put("batch.num.messages", "1")
+    props.put("queue.enqueue.timeout.ms", "0")
     val config = new ProducerConfig(props)
 
     val produceData = getProduceData(12)
diff --git a/perf/src/main/scala/kafka/perf/PerfConfig.scala b/perf/src/main/scala/kafka/perf/PerfConfig.scala
index 5dabeb51edd..a8fc6b9ec81 100644
--- a/perf/src/main/scala/kafka/perf/PerfConfig.scala
+++ b/perf/src/main/scala/kafka/perf/PerfConfig.scala
@@ -53,7 +53,7 @@ class PerfConfig(args: Array[String]) {
     .defaultsTo(200)
   val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
     .withRequiredArg
-    .describedAs("compression codec ")
+    .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2")
     .ofType(classOf[java.lang.Integer])
     .defaultsTo(0)
   val helpOpt = parser.accepts("help", "Print usage.")
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 4822a7ecd60..507743e6d24 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -101,11 +101,6 @@ object ProducerPerformance extends Logging {
       .describedAs("number of threads")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(1)
-    val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed")
-      .withRequiredArg
-      .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
     val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " +
       "ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
       "in the form of 'Message:000...1:xxx...'")
@@ -117,15 +112,6 @@ object ProducerPerformance extends Logging {
       .describedAs("message send time gap")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(0)
-    val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(3000)
-    val produceRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
-      "to complete")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(-1)
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
       "set, the csv metrics will be outputed here")
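
Note on the queue.enqueue.timeout.ms default change above: below is a minimal sketch of the enqueue semantics described in the AsyncProducerConfig comment, assuming events are handed to a bounded java.util.concurrent.LinkedBlockingQueue. The BoundedEnqueue class and its enqueue method are hypothetical names for illustration, not the actual kafka.producer code.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical illustration of the queue.enqueue.timeout.ms semantics:
//   negative -> block indefinitely until space frees up (the new default, -1)
//   zero     -> fail immediately if the queue is full
//   positive -> block up to that many milliseconds, then fail
class BoundedEnqueue[T](capacity: Int, enqueueTimeoutMs: Long) {
  private val queue = new LinkedBlockingQueue[T](capacity)

  def enqueue(event: T): Boolean =
    if (enqueueTimeoutMs < 0) {
      queue.put(event) // blocks until the queue has room
      true
    } else if (enqueueTimeoutMs == 0) {
      queue.offer(event) // returns false right away when the queue is full
    } else {
      queue.offer(event, enqueueTimeoutMs, TimeUnit.MILLISECONDS)
    }
}
```

This also motivates the test change: with queue.buffering.max.messages at 10, 12 produce events from getProduceData(12), and queue.enqueue.timeout.ms set to "0", the enqueue in AsyncProducerTest is expected to fail fast once the queue fills, rather than block indefinitely under the new -1 default.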