
KAFKA-545 Add some log performance tests.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1410088 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Edward Jay Kreps 12 years ago
parent commit 60d0587e8f
  1. core/src/main/scala/kafka/log/Log.scala (3)
  2. core/src/main/scala/kafka/log/LogSegment.scala (6)
  3. core/src/main/scala/kafka/server/KafkaConfig.scala (3)
  4. core/src/main/scala/kafka/server/ReplicaManager.scala (2)
  5. core/src/main/scala/kafka/utils/KafkaScheduler.scala (2)
  6. core/src/test/scala/other/kafka/TestEndToEndLatency.scala (2)
  7. core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala (96)
  8. core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (4)
  9. core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (2)
  10. core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (2)
  11. core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (4)
  12. core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (1)
  13. core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (2)
  14. examples/src/main/java/kafka/examples/ExampleUtils.java (32)
  15. perf/src/main/scala/kafka/perf/PerfConfig.scala (16)
  16. perf/src/main/scala/kafka/perf/ProducerPerformance.scala (28)

core/src/main/scala/kafka/log/Log.scala (3)

@@ -608,5 +608,8 @@ private[kafka] class Log(val dir: File,
def getLastFlushedTime():Long = {
return lastflushedTime.get
}
override def toString() = "Log(" + this.dir + ")"
}

core/src/main/scala/kafka/log/LogSegment.scala (6)

@@ -82,7 +82,9 @@ class LogSegment(val messageSet: FileMessageSet,
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
*/
def read(startOffset: Long, maxSize: Int, maxOffset: Option[Long]): MessageSet = {
if(maxSize <= 0)
if(maxSize < 0)
throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
if(maxSize == 0)
return MessageSet.Empty
val startPosition = translateOffset(startOffset)
@@ -99,6 +101,8 @@ class LogSegment(val messageSet: FileMessageSet,
maxSize
case Some(offset) => {
// there is a max offset, translate it to a file position and use that to calculate the max read size
if(offset < startOffset)
throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
val mapping = translateOffset(offset)
val endPosition =
if(mapping == null)

core/src/main/scala/kafka/server/KafkaConfig.scala (3)

@@ -159,4 +159,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
* Increasing this value can increase the degree of I/O parallelism in the follower broker. */
val numReplicaFetchers = props.getInt("replica.fetchers", 1)
/* the frequency with which the highwater mark is saved out to disk */
val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L)
}
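
A minimal sketch of using the new setting (not part of this commit): it overrides the default 5000 ms interval on a test broker config, the way the surrounding tests build their configs. TestUtils.createBrokerConfig, TestUtils.choosePort and the KafkaConfig(Properties) constructor all appear elsewhere in this diff; the 1000 ms value is an illustrative assumption.

    // Hedged sketch, not from the commit: override the new checkpoint interval in a test config.
    // createBrokerConfig/choosePort come from kafka.utils.TestUtils (used in the test diffs below);
    // the 1000 ms value is an assumption chosen for illustration.
    val props = kafka.utils.TestUtils.createBrokerConfig(0, kafka.utils.TestUtils.choosePort)
    props.put("replica.highwatermark.checkpoint.ms", "1000")
    val config = new kafka.server.KafkaConfig(props)
    assert(config.highWaterMarkCheckpointIntervalMs == 1000L)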

core/src/main/scala/kafka/server/ReplicaManager.scala (2)

@@ -68,7 +68,7 @@ class ReplicaManager(val config: KafkaConfig,
def startHighWaterMarksCheckPointThread() = {
if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.highWaterMarkCheckpointIntervalMs)
}
/**

core/src/main/scala/kafka/utils/KafkaScheduler.scala (2)

@@ -34,7 +34,7 @@ class KafkaScheduler(val numThreads: Int) extends Logging {
}
private val threadNamesAndIds = new HashMap[String, AtomicInteger]()
def startup = {
def startup() = {
executor = new ScheduledThreadPoolExecutor(numThreads)
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)

core/src/test/scala/other/kafka/TestEndToEndLatency.scala (2)

@@ -63,7 +63,7 @@ object TestEndToEndLatency {
println(i + "\t" + elapsed / 1000.0 / 1000.0)
totalTime += elapsed
}
println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0)) + "ms"
println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0) + "ms")
producer.close()
connector.shutdown()
System.exit(0)

core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala (96)

@@ -20,16 +20,22 @@ package kafka
import java.io._
import java.nio._
import java.nio.channels._
import scala.math._
import joptsimple._
object TestLinearWriteSpeed {
def main(args: Array[String]): Unit = {
val parser = new OptionParser
val dirOpt = parser.accepts("dir", "The directory to write to.")
.withRequiredArg
.describedAs("path")
.ofType(classOf[java.lang.String])
.defaultsTo(System.getProperty("java.io.tmpdir"))
val bytesOpt = parser.accepts("bytes", "REQUIRED: The number of bytes to write.")
.withRequiredArg
.describedAs("num_bytes")
.ofType(classOf[java.lang.Integer])
.ofType(classOf[java.lang.Long])
val sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
.withRequiredArg
.describedAs("num_bytes")
@@ -39,7 +45,18 @@ object TestLinearWriteSpeed {
.describedAs("num_files")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Long])
.defaultsTo(1000)
val maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
.withRequiredArg
.describedAs("mb")
.ofType(classOf[java.lang.Integer])
.defaultsTo(Integer.MAX_VALUE)
val mmapOpt = parser.accepts("mmap", "Mmap file.")
val options = parser.parse(args : _*)
for(arg <- List(bytesOpt, sizeOpt, filesOpt)) {
@@ -50,27 +67,84 @@ object TestLinearWriteSpeed {
}
}
val bytesToWrite = options.valueOf(bytesOpt).intValue
var bytesToWrite = options.valueOf(bytesOpt).longValue
val mmap = options.has(mmapOpt)
val bufferSize = options.valueOf(sizeOpt).intValue
val numFiles = options.valueOf(filesOpt).intValue
val reportingInterval = options.valueOf(reportingIntervalOpt).longValue
val dir = options.valueOf(dirOpt)
val maxThroughputBytes = options.valueOf(maxThroughputOpt).intValue * 1024L * 1024L
val buffer = ByteBuffer.allocate(bufferSize)
while(buffer.hasRemaining)
buffer.put(123.asInstanceOf[Byte])
val channels = new Array[FileChannel](numFiles)
val writables = new Array[Writable](numFiles)
for(i <- 0 until numFiles) {
val file = File.createTempFile("kafka-test", ".dat")
val file = new File(dir, "kafka-test-" + i + ".dat")
file.deleteOnExit()
channels(i) = new RandomAccessFile(file, "rw").getChannel()
val raf = new RandomAccessFile(file, "rw")
raf.setLength(bytesToWrite / numFiles)
if(mmap)
writables(i) = new MmapWritable(raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length()))
else
writables(i) = new ChannelWritable(raf.getChannel())
}
bytesToWrite = (bytesToWrite / numFiles) * numFiles
val begin = System.currentTimeMillis
for(i <- 0 until bytesToWrite / bufferSize) {
println("%10s\t%10s\t%10s".format("mb_sec", "avg_latency", "max_latency"))
val beginTest = System.nanoTime
var maxLatency = 0L
var totalLatency = 0L
var count = 0L
var written = 0L
var totalWritten = 0L
var lastReport = beginTest
while(totalWritten + bufferSize < bytesToWrite) {
buffer.rewind()
channels(i % numFiles).write(buffer)
val start = System.nanoTime
writables((count % numFiles).toInt.abs).write(buffer)
val ellapsed = System.nanoTime - start
maxLatency = max(ellapsed, maxLatency)
totalLatency += ellapsed
written += bufferSize
count += 1
totalWritten += bufferSize
if((start - lastReport)/(1000.0*1000.0) > reportingInterval.doubleValue) {
val ellapsedSecs = (start - lastReport) / (1000.0*1000.0*1000.0)
val mb = written / (1024.0*1024.0)
println("%10.3f\t%10.3f\t%10.3f".format(mb / ellapsedSecs, totalLatency / count.toDouble / (1000.0*1000.0), maxLatency / (1000.0 * 1000.0)))
lastReport = start
written = 0
maxLatency = 0L
totalLatency = 0L
} else if(written > maxThroughputBytes) {
// if we have written enough, just sit out this reporting interval
val lastReportMs = lastReport / (1000*1000)
val now = System.nanoTime / (1000*1000)
val sleepMs = lastReportMs + reportingInterval - now
if(sleepMs > 0)
Thread.sleep(sleepMs)
}
}
val elapsedSecs = (System.nanoTime - beginTest) / (1000.0*1000.0*1000.0)
println(bytesToWrite / (1024.0 * 1024.0 * elapsedSecs) + " MB per sec")
}
trait Writable {
def write(buffer: ByteBuffer)
}
class MmapWritable(val buffer: ByteBuffer) extends Writable {
def write(b: ByteBuffer) {
buffer.put(b)
}
}
class ChannelWritable(val channel: FileChannel) extends Writable {
def write(b: ByteBuffer) {
channel.write(b)
}
val elapsedSecs = (System.currentTimeMillis - begin) / 1000.0
System.out.println(bytesToWrite / (1024 * 1024 * elapsedSecs) + " MB per sec")
}
}
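
For reference, here is one way the rewritten tool might be driven (a hedged sketch, not from the commit): all option names come from the parser definitions above, while the directory, byte counts and throughput cap are illustrative values.

    // Hypothetical invocation: write ~1 GB in 4 KB writes across 4 pre-allocated mmap'd files,
    // report throughput and latency every 1000 ms, and throttle to at most 100 MB per interval.
    kafka.TestLinearWriteSpeed.main(Array(
      "--dir", "/tmp",
      "--bytes", "1073741824",
      "--size", "4096",
      "--files", "4",
      "--mmap",
      "--reporting-interval", "1000",
      "--max-throughput-mb", "100"
    ))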

core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (4)

@@ -42,9 +42,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val port = TestUtils.choosePort
val props = TestUtils.createBrokerConfig(0, port)
val config = new KafkaConfig(props) {
override val flushInterval = 1
}
val config = new KafkaConfig(props)
val configs = List(config)
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])

core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (2)

@@ -33,7 +33,7 @@ import kafka.api.{RequestKeys, TopicMetadata, TopicMetadataResponse, TopicMetada
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(1)
val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
val configs = props.map(p => new KafkaConfig(p))
var brokers: Seq[Broker] = null
override def setUp() {

core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (2)

@@ -37,7 +37,7 @@ import kafka.utils._
class AsyncProducerTest extends JUnit3Suite {
val props = createBrokerConfigs(1)
val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
val configs = props.map(p => new KafkaConfig(p))
override def setUp() {
super.setUp()

core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (4)

@@ -28,9 +28,7 @@ import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils}
class HighwatermarkPersistenceTest extends JUnit3Suite {
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
override val defaultFlushIntervalMs = 100
})
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_))
val topic = "foo"
val logManagers = configs.map(config => new LogManager(config, new KafkaScheduler(1), new MockTime))

core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (1)

@@ -32,7 +32,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
override val replicaMaxLagTimeMs = 5000L
override val replicaMaxLagBytes = 10L
override val flushInterval = 10
override val replicaMinBytes = 20
})
val topic = "new-topic"

core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (2)

@@ -28,7 +28,7 @@ import junit.framework.Assert._
class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(2)
val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
val configs = props.map(p => new KafkaConfig(p))
var brokers: Seq[KafkaServer] = null
val topic1 = "foo"
val topic2 = "bar"

examples/src/main/java/kafka/examples/ExampleUtils.java (32)

@@ -1,32 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.examples;
import java.nio.ByteBuffer;
import kafka.message.Message;
public class ExampleUtils
{
public static String getMessage(Message message)
{
ByteBuffer buffer = message.payload();
byte [] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return new String(bytes);
}
}

perf/src/main/scala/kafka/perf/PerfConfig.scala (16)

@@ -41,4 +41,20 @@ class PerfConfig(args: Array[String]) {
val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
"interval as configured by reporting-interval")
val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ")
val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
.withRequiredArg
.describedAs("size")
.ofType(classOf[java.lang.Integer])
.defaultsTo(100)
val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.")
.withRequiredArg
.describedAs("size")
.ofType(classOf[java.lang.Integer])
.defaultsTo(200)
val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
.withRequiredArg
.describedAs("compression codec ")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
val helpOpt = parser.accepts("help", "Print usage.")
}

perf/src/main/scala/kafka/perf/ProducerPerformance.scala (28)

@@ -94,18 +94,8 @@ object ProducerPerformance extends Logging {
.withRequiredArg()
.ofType(classOf[java.lang.Integer])
.defaultsTo(-1)
val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
.withRequiredArg
.describedAs("size")
.ofType(classOf[java.lang.Integer])
.defaultsTo(100)
val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.")
val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
.withRequiredArg
.describedAs("batch size")
.ofType(classOf[java.lang.Integer])
.defaultsTo(200)
val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
.withRequiredArg
.describedAs("number of threads")
@@ -127,6 +117,20 @@ object ProducerPerformance extends Logging {
.describedAs("message send time gap")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
.withRequiredArg()
.ofType(classOf[java.lang.Integer])
.defaultsTo(3000)
val produceRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
"to complete")
.withRequiredArg()
.ofType(classOf[java.lang.Integer])
.defaultsTo(-1)
val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
.withRequiredArg
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
"set, the csv metrics will be outputed here")
@@ -154,10 +158,10 @@ object ProducerPerformance extends Logging {
var isSync = options.has(syncOpt)
var batchSize = options.valueOf(batchSizeOpt).intValue
var numThreads = options.valueOf(numThreadsOpt).intValue
val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue)
val seqIdMode = options.has(initialMessageIdOpt)
var initialMessageId: Int = 0
if (seqIdMode)
if(seqIdMode)
initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue()
val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
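
A hedged usage sketch (not from the commit) exercising the relocated and newly added producer-perf options. The flags --message-size, --batch-size, --compression-codec, --request-timeout-ms and --request-num-acks match the parser definitions in this diff; --broker-list, --topics and --messages are assumed to be the tool's existing required options, all values are illustrative, and a running 0.8 broker is assumed.

    // Hypothetical run: 1M 100-byte messages in batches of 200, uncompressed, one ack per request.
    // --broker-list/--topics/--messages are assumptions about the tool's pre-existing options.
    kafka.perf.ProducerPerformance.main(Array(
      "--broker-list", "localhost:9092",
      "--topics", "perf-test",
      "--messages", "1000000",
      "--message-size", "100",
      "--batch-size", "200",
      "--compression-codec", "0",
      "--request-timeout-ms", "3000",
      "--request-num-acks", "1"
    ))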
