Browse Source

Producer performance tool should use the new blocking async producer instead of the sleep timeout hack; KAFKA-118; patched by nehanarkhede; reviewed by junrao

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1160947 13f79535-47bb-0310-9956-ffa450edef68
0.7.0
Neha Narkhede 13 years ago
parent
commit
4cdc982e22
  1. 12
      core/src/main/scala/kafka/tools/ProducerPerformance.scala
  2. 2
      system_test/embedded_consumer/bin/run-test.sh
  3. 2
      system_test/producer_perf/bin/run-compression-test.sh
  4. 2
      system_test/producer_perf/bin/run-test.sh

12
core/src/main/scala/kafka/tools/ProducerPerformance.scala

@ -83,11 +83,6 @@ object ProducerPerformance { @@ -83,11 +83,6 @@ object ProducerPerformance {
.defaultsTo(100)
val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
val delayMSBtwBatchOpt = parser.accepts("delay-btw-batch-ms", "Delay in ms between 2 batch sends.")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Long])
.defaultsTo(0)
val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
.withRequiredArg
.describedAs("size")
@ -122,7 +117,6 @@ object ProducerPerformance { @@ -122,7 +117,6 @@ object ProducerPerformance {
val messageSize = options.valueOf(messageSizeOpt).intValue
val isFixSize = !options.has(varyMessageSizeOpt)
val isAsync = options.has(asyncOpt)
val delayedMSBtwSend = options.valueOf(delayMSBtwBatchOpt).longValue
var batchSize = options.valueOf(batchSizeOpt).intValue
val numThreads = options.valueOf(numThreadsOpt).intValue
val topic = options.valueOf(topicOpt)
@ -156,7 +150,7 @@ object ProducerPerformance { @@ -156,7 +150,7 @@ object ProducerPerformance {
props.put("batch.size", config.batchSize.toString)
props.put("reconnect.interval", Integer.MAX_VALUE.toString)
props.put("buffer.size", (64*1024).toString)
props.put("queue.enqueueTimeout.ms", "-1")
logger.info("Producer properties = " + props.toString)
val producerConfig = new ProducerConfig(props)
@ -183,8 +177,6 @@ object ProducerPerformance { @@ -183,8 +177,6 @@ object ProducerPerformance {
bytesSent += config.messageSize
try {
producer.send(new ProducerData[String,String](config.topic, message))
if (config.delayedMSBtwSend > 0 && (nSends + 1) % config.batchSize == 0)
Thread.sleep(config.delayedMSBtwSend)
nSends += 1
}catch {
case e: Exception => e.printStackTrace
@ -253,8 +245,6 @@ object ProducerPerformance { @@ -253,8 +245,6 @@ object ProducerPerformance {
bytesSent += config.batchSize*config.messageSize
try {
producer.send(new ProducerData[String,String](config.topic, messageSet))
if (config.delayedMSBtwSend > 0 && (nSends + 1) % config.batchSize == 0)
Thread.sleep(config.delayedMSBtwSend)
nSends += 1
}catch {
case e: Exception => e.printStackTrace

2
system_test/embedded_consumer/bin/run-test.sh

@ -111,7 +111,7 @@ shutdown_servers() { @@ -111,7 +111,7 @@ shutdown_servers() {
start_producer() {
topic=$1
info "start producing messages for topic $topic ..."
$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async --delay-btw-batch-ms 10 2>&1 > $base_dir/producer_performance.log &
$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
pid_producer=$!
}

2
system_test/producer_perf/bin/run-compression-test.sh

@ -14,7 +14,7 @@ $base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties 2>& @@ -14,7 +14,7 @@ $base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties 2>&
sleep 4
echo "start producing $num_messages messages ..."
$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --delay-btw-batch-ms 10 --compression-codec 1
$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --compression-codec 1
echo "wait for data to be persisted"
cur_offset="-1"

2
system_test/producer_perf/bin/run-test.sh

@ -14,7 +14,7 @@ $base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties 2>& @@ -14,7 +14,7 @@ $base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties 2>&
sleep 4
echo "start producing $num_messages messages ..."
$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --delay-btw-batch-ms 10
$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async
echo "wait for data to be persisted"
cur_offset="-1"

Loading…
Cancel
Save