diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 668635daa78..4cb244550ed 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -608,5 +608,8 @@ private[kafka] class Log(val dir: File,
 
   def getLastFlushedTime():Long = {
     return lastflushedTime.get
   }
+
+  override def toString() = "Log(" + this.dir + ")"
+
 }
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 11c7c3b4c4f..4417cffd05d 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -82,7 +82,9 @@ class LogSegment(val messageSet: FileMessageSet,
    * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
    */
   def read(startOffset: Long, maxSize: Int, maxOffset: Option[Long]): MessageSet = {
-    if(maxSize <= 0)
+    if(maxSize < 0)
+      throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
+    if(maxSize == 0)
       return MessageSet.Empty
 
     val startPosition = translateOffset(startOffset)
@@ -99,6 +101,8 @@ class LogSegment(val messageSet: FileMessageSet,
           maxSize
         case Some(offset) => {
           // there is a max offset, translate it to a file position and use that to calculate the max read size
+          if(offset < startOffset)
+            throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
           val mapping = translateOffset(offset)
           val endPosition =
             if(mapping == null)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6a5b4de7c34..c51095f9c0d 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -159,4 +159,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
    * Increasing this value can increase the degree of I/O parallelism in the follower broker.
    */
   val numReplicaFetchers = props.getInt("replica.fetchers", 1)
+  /* the frequency with which the highwater mark is saved out to disk */
+  val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L)
+
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d28dcffdf37..0cdc93974b4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -68,7 +68,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   def startHighWaterMarksCheckPointThread() = {
     if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
-      kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
+      kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.highWaterMarkCheckpointIntervalMs)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 4a70e8173ea..5a222b816a1 100644
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -34,7 +34,7 @@ class KafkaScheduler(val numThreads: Int) extends Logging {
   }
   private val threadNamesAndIds = new HashMap[String, AtomicInteger]()
 
-  def startup = {
+  def startup() = {
     executor = new ScheduledThreadPoolExecutor(numThreads)
     executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
     executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
index 78ec1048555..5be4f4e7c57 100644
--- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
+++ b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
@@ -63,7 +63,7 @@ object TestEndToEndLatency {
       println(i + "\t" + elapsed / 1000.0 / 1000.0)
       totalTime += elapsed
     }
-    println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0)) + "ms"
+    println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0) + "ms")
     producer.close()
     connector.shutdown()
     System.exit(0)
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 002fc6db446..36d52e75f05 100644
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -20,16 +20,22 @@ package kafka
 import java.io._
 import java.nio._
 import java.nio.channels._
+import scala.math._
 import joptsimple._
 
 object TestLinearWriteSpeed {
 
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser
+    val dirOpt = parser.accepts("dir", "The directory to write to.")
+                           .withRequiredArg
+                           .describedAs("path")
+                           .ofType(classOf[java.lang.String])
+                           .defaultsTo(System.getProperty("java.io.tmpdir"))
     val bytesOpt = parser.accepts("bytes", "REQUIRED: The number of bytes to write.")
                            .withRequiredArg
                            .describedAs("num_bytes")
-                           .ofType(classOf[java.lang.Integer])
+                           .ofType(classOf[java.lang.Long])
     val sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
                            .withRequiredArg
                            .describedAs("num_bytes")
@@ -39,7 +45,18 @@ object TestLinearWriteSpeed {
                            .describedAs("num_files")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1)
-
+    val reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
+                           .withRequiredArg
+                           .describedAs("ms")
+                           .ofType(classOf[java.lang.Long])
+                           .defaultsTo(1000)
+    val maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
+                           .withRequiredArg
+                           .describedAs("mb")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(Integer.MAX_VALUE)
+    val mmapOpt = parser.accepts("mmap", "Mmap file.")
+
     val options = parser.parse(args : _*)
 
     for(arg <- List(bytesOpt, sizeOpt, filesOpt)) {
@@ -50,27 +67,84 @@ object TestLinearWriteSpeed {
       }
     }
 
-    val bytesToWrite = options.valueOf(bytesOpt).intValue
+    var bytesToWrite = options.valueOf(bytesOpt).longValue
+    val mmap = options.has(mmapOpt)
     val bufferSize = options.valueOf(sizeOpt).intValue
     val numFiles = options.valueOf(filesOpt).intValue
+    val reportingInterval = options.valueOf(reportingIntervalOpt).longValue
+    val dir = options.valueOf(dirOpt)
+    val maxThroughputBytes = options.valueOf(maxThroughputOpt).intValue * 1024L * 1024L
     val buffer = ByteBuffer.allocate(bufferSize)
     while(buffer.hasRemaining)
       buffer.put(123.asInstanceOf[Byte])
 
-    val channels = new Array[FileChannel](numFiles)
+    val writables = new Array[Writable](numFiles)
     for(i <- 0 until numFiles) {
-      val file = File.createTempFile("kafka-test", ".dat")
+      val file = new File(dir, "kafka-test-" + i + ".dat")
       file.deleteOnExit()
-      channels(i) = new RandomAccessFile(file, "rw").getChannel()
+      val raf = new RandomAccessFile(file, "rw")
+      raf.setLength(bytesToWrite / numFiles)
+      if(mmap)
+        writables(i) = new MmapWritable(raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length()))
+      else
+        writables(i) = new ChannelWritable(raf.getChannel())
     }
+    bytesToWrite = (bytesToWrite / numFiles) * numFiles
 
-    val begin = System.currentTimeMillis
-    for(i <- 0 until bytesToWrite / bufferSize) {
+    println("%10s\t%10s\t%10s".format("mb_sec", "avg_latency", "max_latency"))
+
+    val beginTest = System.nanoTime
+    var maxLatency = 0L
+    var totalLatency = 0L
+    var count = 0L
+    var written = 0L
+    var totalWritten = 0L
+    var lastReport = beginTest
+    while(totalWritten + bufferSize < bytesToWrite) {
       buffer.rewind()
-      channels(i % numFiles).write(buffer)
+      val start = System.nanoTime
+      writables((count % numFiles).toInt.abs).write(buffer)
+      val elapsed = System.nanoTime - start
+      maxLatency = max(elapsed, maxLatency)
+      totalLatency += elapsed
+      written += bufferSize
+      count += 1
+      totalWritten += bufferSize
+      if((start - lastReport)/(1000.0*1000.0) > reportingInterval.doubleValue) {
+        val elapsedSecs = (start - lastReport) / (1000.0*1000.0*1000.0)
+        val mb = written / (1024.0*1024.0)
+        println("%10.3f\t%10.3f\t%10.3f".format(mb / elapsedSecs, totalLatency / count.toDouble / (1000.0*1000.0), maxLatency / (1000.0 * 1000.0)))
+        lastReport = start
+        written = 0
+        maxLatency = 0L
+        totalLatency = 0L
+      } else if(written > maxThroughputBytes) {
+        // if we have written enough, just sit out this reporting interval
+        val lastReportMs = lastReport / (1000*1000)
+        val now = System.nanoTime / (1000*1000)
+        val sleepMs = lastReportMs + reportingInterval - now
+        if(sleepMs > 0)
+          Thread.sleep(sleepMs)
+      }
+    }
+    val elapsedSecs = (System.nanoTime - beginTest) / (1000.0*1000.0*1000.0)
+    println(bytesToWrite / (1024.0 * 1024.0 * elapsedSecs) + " MB per sec")
+  }
+
+  trait Writable {
+    def write(buffer: ByteBuffer)
+  }
+
+  class MmapWritable(val buffer: ByteBuffer) extends Writable {
+    def write(b: ByteBuffer) {
+      buffer.put(b)
+    }
+  }
+
+  class ChannelWritable(val channel: FileChannel) extends Writable {
+    def write(b: ByteBuffer) {
+      channel.write(b)
     }
-    val elapsedSecs = (System.currentTimeMillis - begin) / 1000.0
-    System.out.println(bytesToWrite / (1024 * 1024 * elapsedSecs) + " MB per sec")
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 428fabf0c83..72902ba8b50 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -42,9 +42,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
-  val config = new KafkaConfig(props) {
-    override val flushInterval = 1
-  }
+  val config = new KafkaConfig(props)
   val configs = List(config)
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 58961ad3ace..1b127065e38 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -33,7 +33,7 @@ import kafka.api.{RequestKeys, TopicMetadata, TopicMetadataResponse, TopicMetada
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
-  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  val configs = props.map(p => new KafkaConfig(p))
   var brokers: Seq[Broker] = null
 
   override def setUp() {
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 213926693f1..19f4c3b2421 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -37,7 +37,7 @@ import kafka.utils._
 
 class AsyncProducerTest extends JUnit3Suite {
   val props = createBrokerConfigs(1)
-  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  val configs = props.map(p => new KafkaConfig(p))
 
   override def setUp() {
     super.setUp()
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 75fb9c77b5f..f30b09769a0 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -28,9 +28,7 @@ import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils}
 
 class HighwatermarkPersistenceTest extends JUnit3Suite {
 
-  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
-    override val defaultFlushIntervalMs = 100
-  })
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_))
   val topic = "foo"
   val logManagers = configs.map(config => new LogManager(config, new KafkaScheduler(1), new MockTime))
 
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 2b6cab2a272..ad2158c6a97 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -32,7 +32,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
     override val replicaMaxLagTimeMs = 5000L
     override val replicaMaxLagBytes = 10L
-    override val flushInterval = 10
     override val replicaMinBytes = 20
   })
   val topic = "new-topic"
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 4f76a54c7a6..d0e35909f05 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -28,7 +28,7 @@ import junit.framework.Assert._
 
 class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(2)
-  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  val configs = props.map(p => new KafkaConfig(p))
   var brokers: Seq[KafkaServer] = null
   val topic1 = "foo"
   val topic2 = "bar"
diff --git a/examples/src/main/java/kafka/examples/ExampleUtils.java b/examples/src/main/java/kafka/examples/ExampleUtils.java
deleted file mode 100644
index 34fd1c00ea1..00000000000
--- a/examples/src/main/java/kafka/examples/ExampleUtils.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.examples;
-
-
-import java.nio.ByteBuffer;
-import kafka.message.Message;
-
-public class ExampleUtils
-{
-  public static String getMessage(Message message)
-  {
-    ByteBuffer buffer = message.payload();
-    byte [] bytes = new byte[buffer.remaining()];
-    buffer.get(bytes);
-    return new String(bytes);
-  }
-}
diff --git a/perf/src/main/scala/kafka/perf/PerfConfig.scala b/perf/src/main/scala/kafka/perf/PerfConfig.scala
index 952df9786e8..5dabeb51edd 100644
--- a/perf/src/main/scala/kafka/perf/PerfConfig.scala
+++ b/perf/src/main/scala/kafka/perf/PerfConfig.scala
@@ -41,4 +41,20 @@ class PerfConfig(args: Array[String]) {
   val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
     "interval as configured by reporting-interval")
   val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ")
+  val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
+    .withRequiredArg
+    .describedAs("size")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(100)
+  val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.")
+    .withRequiredArg
+    .describedAs("size")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(200)
+  val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
+    .withRequiredArg
+    .describedAs("compression codec ")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(0)
+  val helpOpt = parser.accepts("help", "Print usage.")
 }
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 2a6037d5044..a2702894555 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -94,18 +94,8 @@ object ProducerPerformance extends Logging {
       .withRequiredArg()
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(-1)
-    val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(100)
     val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
     val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.")
-    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
-      .withRequiredArg
-      .describedAs("batch size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(200)
     val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
       .withRequiredArg
      .describedAs("number of threads")
@@ -127,6 +117,20 @@ object ProducerPerformance extends Logging {
       .describedAs("message send time gap")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(0)
+    val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(3000)
+    val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
+      "to complete")
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(-1)
+    val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
will be enabled") val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + "set, the csv metrics will be outputed here") @@ -154,10 +158,10 @@ object ProducerPerformance extends Logging { var isSync = options.has(syncOpt) var batchSize = options.valueOf(batchSizeOpt).intValue var numThreads = options.valueOf(numThreadsOpt).intValue - val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue) + val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue) val seqIdMode = options.has(initialMessageIdOpt) var initialMessageId: Int = 0 - if (seqIdMode) + if(seqIdMode) initialMessageId = options.valueOf(initialMessageIdOpt).intValue() val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue() val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()