Browse Source

Merge remote branch 'origin/0.8' into trunk

pull/2/head
Jun Rao 12 years ago
parent
commit
999813821c
  1. 2
      core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
  2. 1
      core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
  3. 2
      perf/src/main/scala/kafka/perf/PerfConfig.scala
  4. 14
      perf/src/main/scala/kafka/perf/ProducerPerformance.scala

2
core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala

@ -33,7 +33,7 @@ trait AsyncProducerConfig { @@ -33,7 +33,7 @@ trait AsyncProducerConfig {
* -ve: enqueue will block indefinitely if the queue is full
* +ve: enqueue will block up to this many milliseconds if the queue is full
*/
val queueEnqueueTimeoutMs = props.getInt("queue.enqueue.timeout.ms", 0)
val queueEnqueueTimeoutMs = props.getInt("queue.enqueue.timeout.ms", -1)
/** the number of messages batched at the producer */
val batchNumMessages = props.getInt("batch.num.messages", 200)

1
core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala

@ -65,6 +65,7 @@ class AsyncProducerTest extends JUnit3Suite { @@ -65,6 +65,7 @@ class AsyncProducerTest extends JUnit3Suite {
props.put("producer.type", "async")
props.put("queue.buffering.max.messages", "10")
props.put("batch.num.messages", "1")
props.put("queue.enqueue.timeout.ms", "0")
val config = new ProducerConfig(props)
val produceData = getProduceData(12)

2
perf/src/main/scala/kafka/perf/PerfConfig.scala

@ -53,7 +53,7 @@ class PerfConfig(args: Array[String]) { @@ -53,7 +53,7 @@ class PerfConfig(args: Array[String]) {
.defaultsTo(200)
val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
.withRequiredArg
.describedAs("compression codec ")
.describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
val helpOpt = parser.accepts("help", "Print usage.")

14
perf/src/main/scala/kafka/perf/ProducerPerformance.scala

@ -101,11 +101,6 @@ object ProducerPerformance extends Logging { @@ -101,11 +101,6 @@ object ProducerPerformance extends Logging {
.describedAs("number of threads")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed")
.withRequiredArg
.describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " +
"ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
"in the form of 'Message:000...1:xxx...'")
@ -117,15 +112,6 @@ object ProducerPerformance extends Logging { @@ -117,15 +112,6 @@ object ProducerPerformance extends Logging {
.describedAs("message send time gap")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
.withRequiredArg()
.ofType(classOf[java.lang.Integer])
.defaultsTo(3000)
val produceRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
"to complete")
.withRequiredArg()
.ofType(classOf[java.lang.Integer])
.defaultsTo(-1)
val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
"set, the csv metrics will be outputed here")

Loading…
Cancel
Save