diff --git a/bin/kafka-consumer-perf-test.sh b/bin/kafka-consumer-perf-test.sh index 77cda721d6c..4eebe87a5fb 100755 --- a/bin/kafka-consumer-perf-test.sh +++ b/bin/kafka-consumer-perf-test.sh @@ -17,4 +17,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512M" fi -exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance "$@" +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ConsumerPerformance "$@" diff --git a/bin/windows/kafka-consumer-perf-test.bat b/bin/windows/kafka-consumer-perf-test.bat index 606c784605a..17e17d3b105 100644 --- a/bin/windows/kafka-consumer-perf-test.bat +++ b/bin/windows/kafka-consumer-perf-test.bat @@ -16,5 +16,5 @@ rem limitations under the License. SetLocal set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M -"%~dp0kafka-run-class.bat" kafka.tools.ConsumerPerformance %* +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.ConsumerPerformance %* EndLocal diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala deleted file mode 100644 index 56f49456705..00000000000 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ /dev/null @@ -1,306 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import java.text.SimpleDateFormat -import java.time.Duration -import java.util -import java.util.concurrent.atomic.AtomicLong -import java.util.{Properties, Random} -import com.typesafe.scalalogging.LazyLogging -import joptsimple.OptionException -import kafka.utils.ToolsUtils -import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer} -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.{Metric, MetricName, TopicPartition} -import org.apache.kafka.server.util.CommandLineUtils - -import scala.jdk.CollectionConverters._ -import scala.collection.mutable - -/** - * Performance test for the full zookeeper consumer - */ -object ConsumerPerformance extends LazyLogging { - - def main(args: Array[String]): Unit = { - - val config = new ConsumerPerfConfig(args) - logger.info("Starting consumer...") - val totalMessagesRead = new AtomicLong(0) - val totalBytesRead = new AtomicLong(0) - var metrics: mutable.Map[MetricName, _ <: Metric] = null - val joinGroupTimeInMs = new AtomicLong(0) - - if (!config.hideHeader) - printHeader(config.showDetailedStats) - - var startMs, endMs = 0L - val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props) - startMs = System.currentTimeMillis - consume(consumer, List(config.topic), config.numMessages, config.recordFetchTimeoutMs, config, totalMessagesRead, totalBytesRead, joinGroupTimeInMs, startMs) - endMs = System.currentTimeMillis - - if (config.printMetrics) { - metrics = consumer.metrics.asScala - } - consumer.close() - val elapsedSecs = (endMs - startMs) / 1000.0 - val fetchTimeInMs = (endMs - startMs) - joinGroupTimeInMs.get - if (!config.showDetailedStats) { - val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024) - println("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f, 
%.4f".format( - config.dateFormat.format(startMs), - config.dateFormat.format(endMs), - totalMBRead, - totalMBRead / elapsedSecs, - totalMessagesRead.get, - totalMessagesRead.get / elapsedSecs, - joinGroupTimeInMs.get, - fetchTimeInMs, - totalMBRead / (fetchTimeInMs / 1000.0), - totalMessagesRead.get / (fetchTimeInMs / 1000.0) - )) - } - - if (metrics != null) { - ToolsUtils.printMetrics(metrics) - } - - } - - private[tools] def printHeader(showDetailedStats: Boolean): Unit = { - val newFieldsInHeader = ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec" - if (!showDetailedStats) - println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec" + newFieldsInHeader) - else - println("time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec" + newFieldsInHeader) - } - - def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], - topics: List[String], - count: Long, - timeout: Long, - config: ConsumerPerfConfig, - totalMessagesRead: AtomicLong, - totalBytesRead: AtomicLong, - joinTime: AtomicLong, - testStartTime: Long): Unit = { - var bytesRead = 0L - var messagesRead = 0L - var lastBytesRead = 0L - var lastMessagesRead = 0L - var joinStart = System.currentTimeMillis - var joinTimeMsInSingleRound = 0L - - consumer.subscribe(topics.asJava, new ConsumerRebalanceListener { - def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { - joinTime.addAndGet(System.currentTimeMillis - joinStart) - joinTimeMsInSingleRound += System.currentTimeMillis - joinStart - } - def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { - joinStart = System.currentTimeMillis - }}) - - // Now start the benchmark - var currentTimeMillis = System.currentTimeMillis - var lastReportTime: Long = currentTimeMillis - var lastConsumedTime = currentTimeMillis - - while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout) { - val records = 
consumer.poll(Duration.ofMillis(100)).asScala - currentTimeMillis = System.currentTimeMillis - if (records.nonEmpty) - lastConsumedTime = currentTimeMillis - for (record <- records) { - messagesRead += 1 - if (record.key != null) - bytesRead += record.key.size - if (record.value != null) - bytesRead += record.value.size - - if (currentTimeMillis - lastReportTime >= config.reportingInterval) { - if (config.showDetailedStats) - printConsumerProgress(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, - lastReportTime, currentTimeMillis, config.dateFormat, joinTimeMsInSingleRound) - joinTimeMsInSingleRound = 0L - lastReportTime = currentTimeMillis - lastMessagesRead = messagesRead - lastBytesRead = bytesRead - } - } - } - - if (messagesRead < count) - println(s"WARNING: Exiting before consuming the expected number of messages: timeout ($timeout ms) exceeded. " + - "You can use the --timeout option to increase the timeout.") - totalMessagesRead.set(messagesRead) - totalBytesRead.set(bytesRead) - } - - def printConsumerProgress(id: Int, - bytesRead: Long, - lastBytesRead: Long, - messagesRead: Long, - lastMessagesRead: Long, - startMs: Long, - endMs: Long, - dateFormat: SimpleDateFormat, - periodicJoinTimeInMs: Long): Unit = { - printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat) - printExtendedProgress(bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, periodicJoinTimeInMs) - println() - } - - private def printBasicProgress(id: Int, - bytesRead: Long, - lastBytesRead: Long, - messagesRead: Long, - lastMessagesRead: Long, - startMs: Long, - endMs: Long, - dateFormat: SimpleDateFormat): Unit = { - val elapsedMs: Double = (endMs - startMs).toDouble - val totalMbRead = (bytesRead * 1.0) / (1024 * 1024) - val intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024) - val intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs - val intervalMessagesPerSec = ((messagesRead - 
lastMessagesRead) / elapsedMs) * 1000.0 - print("%s, %d, %.4f, %.4f, %d, %.4f".format(dateFormat.format(endMs), id, totalMbRead, - intervalMbPerSec, messagesRead, intervalMessagesPerSec)) - } - - private def printExtendedProgress(bytesRead: Long, - lastBytesRead: Long, - messagesRead: Long, - lastMessagesRead: Long, - startMs: Long, - endMs: Long, - periodicJoinTimeInMs: Long): Unit = { - val fetchTimeMs = endMs - startMs - periodicJoinTimeInMs - val intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024) - val intervalMessagesRead = messagesRead - lastMessagesRead - val (intervalMbPerSec, intervalMessagesPerSec) = if (fetchTimeMs <= 0) - (0.0, 0.0) - else - (1000.0 * intervalMbRead / fetchTimeMs, 1000.0 * intervalMessagesRead / fetchTimeMs) - print(", %d, %d, %.4f, %.4f".format(periodicJoinTimeInMs, fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec)) - } - - class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { - val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.") - .withRequiredArg - .describedAs("broker-list") - .ofType(classOf[String]) - val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) is specified. 
The server(s) to connect to.") - .requiredUnless("broker-list") - .withRequiredArg - .describedAs("server to connect to") - .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val groupIdOpt = parser.accepts("group", "The group id to consume on.") - .withRequiredArg - .describedAs("gid") - .defaultsTo("perf-consumer-" + new Random().nextInt(100000)) - .ofType(classOf[String]) - val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024 * 1024) - val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + - "offset to consume from, start with the latest message present in the log rather than the earliest message.") - val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(2 * 1024 * 1024) - val numThreadsOpt = parser.accepts("threads", "DEPRECATED AND IGNORED: Number of processing threads.") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(10) - val numFetchersOpt = parser.accepts("num-fetch-threads", "DEPRECATED AND IGNORED: Number of fetcher threads.") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") - .withRequiredArg - .describedAs("config file") - .ofType(classOf[String]) - val printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics.") - val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + - "interval as configured by reporting-interval") - val 
recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.") - .withOptionalArg() - .describedAs("milliseconds") - .ofType(classOf[Long]) - .defaultsTo(10000) - - try - options = parser.parse(args: _*) - catch { - case e: OptionException => - CommandLineUtils.printUsageAndExit(parser, e.getMessage) - } - - if(options.has(numThreadsOpt) || options.has(numFetchersOpt)) - println("WARNING: option [threads] and [num-fetch-threads] have been deprecated and will be ignored by the test") - - CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps in performance test for the full zookeeper consumer") - - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt) - - val printMetrics = options.has(printMetricsOpt) - - val props = if (options.has(consumerConfigOpt)) - Utils.loadProps(options.valueOf(consumerConfigOpt)) - else - new Properties - - import org.apache.kafka.clients.consumer.ConsumerConfig - - val brokerHostsAndPorts = options.valueOf(if (options.has(bootstrapServerOpt)) bootstrapServerOpt else brokerListOpt) - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts) - props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)) - props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString) - props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString) - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (options.has(resetBeginningOffsetOpt)) "latest" else "earliest") - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer]) - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer]) - props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false") - if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null) - props.put(ConsumerConfig.CLIENT_ID_CONFIG, "perf-consumer-client") - - val numThreads = 
options.valueOf(numThreadsOpt).intValue - val topic = options.valueOf(topicOpt) - val numMessages = options.valueOf(numMessagesOpt).longValue - val reportingInterval = options.valueOf(reportingIntervalOpt).intValue - if (reportingInterval <= 0) - throw new IllegalArgumentException("Reporting interval must be greater than 0.") - val showDetailedStats = options.has(showDetailedStatsOpt) - val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) - val hideHeader = options.has(hideHeaderOpt) - val recordFetchTimeoutMs = options.valueOf(recordFetchTimeoutOpt).longValue() - } - -} diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala deleted file mode 100644 index 6857e401e80..00000000000 --- a/core/src/main/scala/kafka/tools/PerfConfig.scala +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package kafka.tools - -import org.apache.kafka.server.util.CommandDefaultOptions - -class PerfConfig(args: Array[String]) extends CommandDefaultOptions(args) { - val numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to send or consume") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Long]) - val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in milliseconds at which to print progress info.") - .withRequiredArg - .describedAs("interval_ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(5000) - val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " + - "See java.text.SimpleDateFormat for options.") - .withRequiredArg - .describedAs("date format") - .ofType(classOf[String]) - .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS") - val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ") -} diff --git a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala deleted file mode 100644 index d872a40bc1d..00000000000 --- a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import java.io.{ByteArrayOutputStream, PrintWriter} -import java.text.SimpleDateFormat -import kafka.utils.{Exit, TestUtils} -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} -import org.junit.jupiter.api.Test - -class ConsumerPerformanceTest { - private val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS") - - @Test - def testDetailedHeaderMatchBody(): Unit = { - testHeaderMatchContent(detailed = true, 2, - () => ConsumerPerformance.printConsumerProgress(1, 1024 * 1024, 0, 1, 0, 0, 1, dateFormat, 1L)) - } - - @Test - def testNonDetailedHeaderMatchBody(): Unit = { - testHeaderMatchContent(detailed = false, 2, () => println(s"${dateFormat.format(System.currentTimeMillis)}, " + - s"${dateFormat.format(System.currentTimeMillis)}, 1.0, 1.0, 1, 1.0, 1, 1, 1.1, 1.1")) - } - - @Test - def testConfigBrokerList(): Unit = { - //Given - val args: Array[String] = Array( - "--broker-list", "localhost:9092", - "--topic", "test", - "--messages", "10" - ) - - //When - val config = new ConsumerPerformance.ConsumerPerfConfig(args) - - //Then - assertEquals("localhost:9092", config.brokerHostsAndPorts) - assertEquals("test", config.topic) - assertEquals(10, config.numMessages) - } - - @Test - def testConfigBootStrapServer(): Unit = { - //Given - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--messages", "10", - "--print-metrics" - ) - - //When - val config = new ConsumerPerformance.ConsumerPerfConfig(args) - - //Then - assertEquals("localhost:9092", config.brokerHostsAndPorts) - assertEquals("test", config.topic) - assertEquals(10, config.numMessages) - } - - @Test - def testBrokerListOverride(): Unit = { - //Given - val args: Array[String] = Array( - "--broker-list", "localhost:9094", - "--bootstrap-server", 
"localhost:9092", - "--topic", "test", - "--messages", "10" - ) - - //When - val config = new ConsumerPerformance.ConsumerPerfConfig(args) - - //Then - assertEquals("localhost:9092", config.brokerHostsAndPorts) - assertEquals("test", config.topic) - assertEquals(10, config.numMessages) - } - - @Test - def testConfigWithUnrecognizedOption(): Unit = { - Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) - //Given - val args: Array[String] = Array( - "--broker-list", "localhost:9092", - "--topic", "test", - "--messages", "10", - "--new-consumer" - ) - try assertThrows(classOf[IllegalArgumentException], () => new ConsumerPerformance.ConsumerPerfConfig(args)) - finally Exit.resetExitProcedure() - } - - @Test - def testClientIdOverride(): Unit = { - val consumerConfigFile = TestUtils.tempFile("test_consumer_config",".conf") - new PrintWriter(consumerConfigFile.getPath) { write("client.id=consumer-1"); close() } - - //Given - val args: Array[String] = Array( - "--broker-list", "localhost:9092", - "--topic", "test", - "--messages", "10", - "--consumer.config", consumerConfigFile.getPath - ) - - //When - val config = new ConsumerPerformance.ConsumerPerfConfig(args) - - //Then - assertEquals("consumer-1", config.props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)) - } - - @Test - def testDefaultClientId(): Unit = { - //Given - val args: Array[String] = Array( - "--broker-list", "localhost:9092", - "--topic", "test", - "--messages", "10" - ) - - //When - val config = new ConsumerPerformance.ConsumerPerfConfig(args) - - //Then - assertEquals("perf-consumer-client", config.props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)) - } - - private def testHeaderMatchContent(detailed: Boolean, expectedOutputLineCount: Int, fun: () => Unit): Unit = { - val outContent = new ByteArrayOutputStream - try { - Console.withOut(outContent) { - ConsumerPerformance.printHeader(detailed) - fun() - - val contents = outContent.toString.split("\n") - 
assertEquals(expectedOutputLineCount, contents.length) - val header = contents(0) - val body = contents(1) - - assertEquals(header.split(",\\s").length, body.split(",\\s").length) - } - } finally { - outContent.close() - } - } -} diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py index 959f23a366c..497569650cc 100644 --- a/tests/kafkatest/benchmarks/core/benchmark_test.py +++ b/tests/kafkatest/benchmarks/core/benchmark_test.py @@ -202,7 +202,7 @@ class Benchmark(Test): Return aggregate throughput statistics for both producer and consumer. - (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala) + (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.java) """ client_version = KafkaVersion(client_version) broker_version = KafkaVersion(broker_version) diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py index 6df8dfb6be8..ed7cc99f86a 100644 --- a/tests/kafkatest/services/performance/consumer_performance.py +++ b/tests/kafkatest/services/performance/consumer_performance.py @@ -23,7 +23,7 @@ from kafkatest.version import DEV_BRANCH, V_2_0_0, LATEST_0_10_0 class ConsumerPerformanceService(PerformanceService): """ - See ConsumerPerformance.scala as the source of truth on these settings, but for reference: + See ConsumerPerformance tool as the source of truth on these settings, but for reference: "zookeeper" "The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over. This option is only used with the old consumer." 
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java new file mode 100644 index 00000000000..56c51f34a84 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import joptsimple.OptionException; +import joptsimple.OptionSpec; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.ToolsUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import static joptsimple.util.RegexMatcher.regex; + +public class ConsumerPerformance { + private static final Logger LOG = LoggerFactory.getLogger(ConsumerPerformance.class); + private static final Random RND = new Random(); + + public static void main(String[] args) { + try { + LOG.info("Starting consumer..."); + ConsumerPerfOptions options = new ConsumerPerfOptions(args); + AtomicLong totalMessagesRead = new AtomicLong(0); + AtomicLong totalBytesRead = new AtomicLong(0); + AtomicLong joinTimeMs = new AtomicLong(0); + AtomicLong joinTimeMsInSingleRound = new AtomicLong(0); + + if (!options.hideHeader()) + printHeader(options.showDetailedStats()); + + KafkaConsumer consumer = new KafkaConsumer<>(options.props()); + long bytesRead = 0L; + long messagesRead = 
0L; + long lastBytesRead = 0L; + long lastMessagesRead = 0L; + long currentTimeMs = System.currentTimeMillis(); + long joinStartMs = currentTimeMs; + long startMs = currentTimeMs; + consume(consumer, options, totalMessagesRead, totalBytesRead, joinTimeMs, + bytesRead, messagesRead, lastBytesRead, lastMessagesRead, + joinStartMs, joinTimeMsInSingleRound); + long endMs = System.currentTimeMillis(); + + Map metrics = null; + if (options.printMetrics()) + metrics = consumer.metrics(); + consumer.close(); + + // print final stats + double elapsedSec = (endMs - startMs) / 1_000.0; + long fetchTimeInMs = (endMs - startMs) - joinTimeMs.get(); + if (!options.showDetailedStats()) { + double totalMbRead = (totalBytesRead.get() * 1.0) / (1024 * 1024); + System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f, %.4f%n", + options.dateFormat().format(startMs), + options.dateFormat().format(endMs), + totalMbRead, + totalMbRead / elapsedSec, + totalMessagesRead.get(), + totalMessagesRead.get() / elapsedSec, + joinTimeMs.get(), + fetchTimeInMs, + totalMbRead / (fetchTimeInMs / 1000.0), + totalMessagesRead.get() / (fetchTimeInMs / 1000.0) + ); + } + + if (metrics != null) + ToolsUtils.printMetrics(metrics); + } catch (Throwable e) { + System.err.println(e.getMessage()); + System.err.println(Utils.stackTrace(e)); + Exit.exit(1); + } + } + + protected static void printHeader(boolean showDetailedStats) { + String newFieldsInHeader = ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec"; + if (!showDetailedStats) + System.out.printf("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec%s%n", newFieldsInHeader); + else + System.out.printf("time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec%s%n", newFieldsInHeader); + } + + private static void consume(KafkaConsumer consumer, + ConsumerPerfOptions options, + AtomicLong totalMessagesRead, + AtomicLong totalBytesRead, + AtomicLong joinTimeMs, + long bytesRead, + long 
messagesRead, + long lastBytesRead, + long lastMessagesRead, + long joinStartMs, + AtomicLong joinTimeMsInSingleRound) { + long numMessages = options.numMessages(); + long recordFetchTimeoutMs = options.recordFetchTimeoutMs(); + long reportingIntervalMs = options.reportingIntervalMs(); + boolean showDetailedStats = options.showDetailedStats(); + SimpleDateFormat dateFormat = options.dateFormat(); + consumer.subscribe(options.topic(), + new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound)); + + // now start the benchmark + long currentTimeMs = System.currentTimeMillis(); + long lastReportTimeMs = currentTimeMs; + long lastConsumedTimeMs = currentTimeMs; + + while (messagesRead < numMessages && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + currentTimeMs = System.currentTimeMillis(); + if (!records.isEmpty()) + lastConsumedTimeMs = currentTimeMs; + for (ConsumerRecord record : records) { + messagesRead += 1; + if (record.key() != null) + bytesRead += record.key().length; + if (record.value() != null) + bytesRead += record.value().length; + if (currentTimeMs - lastReportTimeMs >= reportingIntervalMs) { + if (showDetailedStats) + printConsumerProgress(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, + lastReportTimeMs, currentTimeMs, dateFormat, joinTimeMsInSingleRound.get()); + joinTimeMsInSingleRound = new AtomicLong(0); + lastReportTimeMs = currentTimeMs; + lastMessagesRead = messagesRead; + lastBytesRead = bytesRead; + } + } + } + + if (messagesRead < numMessages) + System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. 
" + + "You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs); + totalMessagesRead.set(messagesRead); + totalBytesRead.set(bytesRead); + } + + protected static void printConsumerProgress(int id, + long bytesRead, + long lastBytesRead, + long messagesRead, + long lastMessagesRead, + long startMs, + long endMs, + SimpleDateFormat dateFormat, + long joinTimeMsInSingleRound) { + printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat); + printExtendedProgress(bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, joinTimeMsInSingleRound); + System.out.println(); + } + + private static void printBasicProgress(int id, + long bytesRead, + long lastBytesRead, + long messagesRead, + long lastMessagesRead, + long startMs, + long endMs, + SimpleDateFormat dateFormat) { + double elapsedMs = endMs - startMs; + double totalMbRead = (bytesRead * 1.0) / (1024 * 1024); + double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024); + double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs; + double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0; + System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f", dateFormat.format(endMs), id, + totalMbRead, intervalMbPerSec, messagesRead, intervalMessagesPerSec); + } + + private static void printExtendedProgress(long bytesRead, + long lastBytesRead, + long messagesRead, + long lastMessagesRead, + long startMs, + long endMs, + long joinTimeMsInSingleRound) { + long fetchTimeMs = endMs - startMs - joinTimeMsInSingleRound; + double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024); + long intervalMessagesRead = messagesRead - lastMessagesRead; + double intervalMbPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalMbRead / fetchTimeMs; + double intervalMessagesPerSec = (fetchTimeMs <= 0) ? 
0.0 : 1000.0 * intervalMessagesRead / fetchTimeMs;
+        System.out.printf(", %d, %d, %.4f, %.4f", joinTimeMsInSingleRound,
+            fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec);
+    }
+
+    /**
+     * Rebalance listener that measures the time between a partition revocation
+     * (or the start time handed to the constructor) and the next partition
+     * assignment. Every measured interval is added to both counters supplied
+     * by the caller.
+     */
+    public static class ConsumerPerfRebListener implements ConsumerRebalanceListener {
+        private final AtomicLong joinTimeMs;
+        private final AtomicLong joinTimeMsInSingleRound;
+        // Timestamp (ms) from which the next assignment is measured; reset on every revocation.
+        private long joinStartMs;
+
+        /**
+         * @param joinTimeMs              counter that accumulates every measured interval
+         * @param joinStartMs             initial start timestamp in milliseconds, used until the first revocation
+         * @param joinTimeMsInSingleRound second counter that receives the same intervals
+         */
+        public ConsumerPerfRebListener(AtomicLong joinTimeMs, long joinStartMs, AtomicLong joinTimeMsInSingleRound) {
+            this.joinTimeMs = joinTimeMs;
+            this.joinStartMs = joinStartMs;
+            this.joinTimeMsInSingleRound = joinTimeMsInSingleRound;
+        }
+
+        // TopicPartition is spelled out in full so the signature does not rely on an import.
+        @Override
+        public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions) {
+            joinStartMs = System.currentTimeMillis();
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions) {
+            long elapsedMs = System.currentTimeMillis() - joinStartMs;
+            joinTimeMs.addAndGet(elapsedMs);
+            joinTimeMsInSingleRound.addAndGet(elapsedMs);
+        }
+    }
+
+    /**
+     * Command-line options of the consumer performance tool.
+     * The constructor declares and parses the options; the accessors expose the
+     * parsed values, and {@link #props()} turns them into consumer properties.
+     */
+    protected static class ConsumerPerfOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> groupIdOpt;
+        private final OptionSpec<Integer> fetchSizeOpt;
+        private final OptionSpec<Void> resetBeginningOffsetOpt;
+        private final OptionSpec<Integer> socketBufferSizeOpt;
+        private final OptionSpec<Integer> numThreadsOpt;
+        private final OptionSpec<Integer> numFetchersOpt;
+        private final OptionSpec<String> consumerConfigOpt;
+        private final OptionSpec<Void> printMetricsOpt;
+        private final OptionSpec<Void> showDetailedStatsOpt;
+        private final OptionSpec<Long> recordFetchTimeoutOpt;
+        private final OptionSpec<Long> numMessagesOpt;
+        private final OptionSpec<Long> reportingIntervalOpt;
+        private final OptionSpec<String> dateFormatOpt;
+        private final OptionSpec<Void> hideHeaderOpt;
+
+        /**
+         * Declares all options and parses {@code args}.
+         * A parse failure is reported through {@code CommandLineUtils.printUsageAndExit}.
+         * After a successful parse a deprecation warning is printed when
+         * {@code --threads} or {@code --num-fetch-threads} is present, help/version
+         * handling runs, and {@code --topic} and {@code --messages} are checked as
+         * required arguments.
+         *
+         * @param args raw command-line arguments
+         */
+        public ConsumerPerfOptions(String[] args) {
+            super(args);
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                .withRequiredArg()
+                .describedAs("broker-list")
+                .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to.")
+                .requiredUnless("broker-list")
+                .withRequiredArg()
+                .describedAs("server to connect to")
+                .ofType(String.class);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
+                .withRequiredArg()
+                .describedAs("topic")
+                .ofType(String.class);
+            groupIdOpt = parser.accepts("group", "The group id to consume on.")
+                .withRequiredArg()
+                .describedAs("gid")
+                .defaultsTo("perf-consumer-" + RND.nextInt(100_000))
+                .ofType(String.class);
+            fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
+                .withRequiredArg()
+                .describedAs("size")
+                .ofType(Integer.class)
+                .defaultsTo(1024 * 1024);
+            resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
+                "offset to consume from, start with the latest message present in the log rather than the earliest message.");
+            socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
+                .withRequiredArg()
+                .describedAs("size")
+                .ofType(Integer.class)
+                .defaultsTo(2 * 1024 * 1024);
+            numThreadsOpt = parser.accepts("threads", "DEPRECATED AND IGNORED: Number of processing threads.")
+                .withRequiredArg()
+                .describedAs("count")
+                .ofType(Integer.class)
+                .defaultsTo(10);
+            numFetchersOpt = parser.accepts("num-fetch-threads", "DEPRECATED AND IGNORED: Number of fetcher threads.")
+                .withRequiredArg()
+                .describedAs("count")
+                .ofType(Integer.class)
+                .defaultsTo(1);
+            consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+            printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics.");
+            showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
+                "interval as configured by reporting-interval");
+            recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.")
+                .withOptionalArg()
+                .describedAs("milliseconds")
+                .ofType(Long.class)
+                .defaultsTo(10_000L);
+            numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to send or consume")
+                .withRequiredArg()
+                .describedAs("count")
+                .ofType(Long.class);
+            // NOTE(review): ofType(Long.class) is applied after the regex converter; confirm
+            // which of the two converters joptsimple ends up using for this option.
+            reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in milliseconds at which to print progress info.")
+                .withRequiredArg()
+                .withValuesConvertedBy(regex("^\\d+$"))
+                .describedAs("interval_ms")
+                .ofType(Long.class)
+                .defaultsTo(5_000L);
+            dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
+                "See java.text.SimpleDateFormat for options.")
+                .withRequiredArg()
+                .describedAs("date format")
+                .ofType(String.class)
+                .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS");
+            hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats");
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+                // Reached only if the call above returns (presumably when the exit
+                // procedure has been replaced, e.g. in tests); nothing was parsed.
+                return;
+            }
+            if (options != null) {
+                if (options.has(numThreadsOpt) || options.has(numFetchersOpt))
+                    System.out.println("WARNING: option [threads] and [num-fetch-threads] have been deprecated and will be ignored by the test");
+                CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance.");
+                CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt);
+            }
+        }
+
+        /** @return {@code true} if {@code --print-metrics} was given */
+        public boolean printMetrics() {
+            return options.has(printMetricsOpt);
+        }
+
+        /**
+         * @return the value of {@code --bootstrap-server} when that option is present,
+         *         otherwise the value of the deprecated {@code --broker-list}
+         */
+        public String brokerHostsAndPorts() {
+            return options.valueOf(options.has(bootstrapServerOpt) ? bootstrapServerOpt : brokerListOpt);
+        }
+
+        /**
+         * Builds the consumer properties.
+         * Starts from the file named by {@code --consumer.config} (or an empty set),
+         * then overwrites the connection, group, buffer, fetch-size, offset-reset,
+         * deserializer and CRC-check entries from the command line. The client id is
+         * only set when the loaded properties do not already define one.
+         *
+         * @return the properties to construct the consumer with
+         * @throws IOException declared for loading the properties file
+         */
+        public Properties props() throws IOException {
+            Properties props = (options.has(consumerConfigOpt))
+                ? Utils.loadProps(options.valueOf(consumerConfigOpt))
+                : new Properties();
+            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts());
+            props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt));
+            props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString());
+            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString());
+            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                options.has(resetBeginningOffsetOpt) ? "latest" : "earliest");
+            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+            props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false");
+            if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null)
+                props.put(ConsumerConfig.CLIENT_ID_CONFIG, "perf-consumer-client");
+            return props;
+        }
+
+        /** @return a single-element set holding the value of {@code --topic} */
+        public Set<String> topic() {
+            return Collections.singleton(options.valueOf(topicOpt));
+        }
+
+        /** @return the value of {@code --messages} */
+        public long numMessages() {
+            return options.valueOf(numMessagesOpt);
+        }
+
+        /**
+         * @return the value of {@code --reporting-interval} in milliseconds
+         * @throws IllegalArgumentException if the value is not greater than 0
+         */
+        public long reportingIntervalMs() {
+            long value = options.valueOf(reportingIntervalOpt);
+            if (value <= 0)
+                throw new IllegalArgumentException("Reporting interval must be greater than 0.");
+            return value;
+        }
+
+        /** @return {@code true} if {@code --show-detailed-stats} was given */
+        public boolean showDetailedStats() {
+            return options.has(showDetailedStatsOpt);
+        }
+
+        /** @return a new formatter built from the {@code --date-format} pattern on every call */
+        public SimpleDateFormat dateFormat() {
+            return new SimpleDateFormat(options.valueOf(dateFormatOpt));
+        }
+
+        /** @return {@code true} if {@code --hide-header} was given */
+        public boolean hideHeader() {
+            return options.has(hideHeaderOpt);
+        }
+
+        /** @return the value of {@code --timeout} in milliseconds */
+        public long recordFetchTimeoutMs() {
+            return options.valueOf(recordFetchTimeoutOpt);
+        }
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
new file mode 100644
index 00000000000..65a533063aa
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.SimpleDateFormat;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link ConsumerPerformance}: command-line option handling in
+ * {@link ConsumerPerformance.ConsumerPerfOptions} and agreement between the
+ * number of columns in the printed header and in a printed progress line.
+ */
+public class ConsumerPerformanceTest {
+    private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
+    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
+
+    @TempDir
+    static Path tempDir;
+
+    @BeforeEach
+    public void beforeEach() {
+        // Install the mock exit procedure; presumably keeps exit calls from ending the test JVM.
+        Exit.setExitProcedure(exitProcedure);
+    }
+
+    @AfterEach
+    public void afterEach() {
+        Exit.resetExitProcedure();
+    }
+
+    @Test
+    public void testDetailedHeaderMatchBody() {
+        testHeaderMatchContent(true, 2,
+            () -> ConsumerPerformance.printConsumerProgress(1, 1024 * 1024, 0, 1, 0, 0, 1, dateFormat, 1L));
+    }
+
+    @Test
+    public void testNonDetailedHeaderMatchBody() {
+        testHeaderMatchContent(false, 2,
+            () -> ConsumerPerformance.printConsumerProgress(1, 1024 * 1024, 0, 1, 0, 0, 1, dateFormat, 1L));
+    }
+
+    @Test
+    public void testConfigBrokerList() {
+        String[] args = new String[]{
+            "--broker-list", "localhost:9092",
+            "--topic", "test",
+            "--messages", "10"
+        };
+
+        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
+
+        assertEquals("localhost:9092", config.brokerHostsAndPorts());
+        assertTrue(config.topic().contains("test"));
+        assertEquals(10, config.numMessages());
+    }
+
+    @Test
+    public void testConfigBootStrapServer() {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--messages", "10",
+            "--print-metrics"
+        };
+
+        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
+
+        assertEquals("localhost:9092", config.brokerHostsAndPorts());
+        assertTrue(config.topic().contains("test"));
+        assertEquals(10, config.numMessages());
+        assertTrue(config.printMetrics());
+    }
+
+    @Test
+    public void testBrokerListOverride() {
+        String[] args = new String[]{
+            "--broker-list", "localhost:9094",
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--messages", "10"
+        };
+
+        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
+
+        // When both are given, the --bootstrap-server value must win over --broker-list.
+        assertEquals("localhost:9092", config.brokerHostsAndPorts());
+        assertTrue(config.topic().contains("test"));
+        assertEquals(10, config.numMessages());
+    }
+
+    @Test
+    public void testConfigWithUnrecognizedOption() {
+        String[] args = new String[]{
+            "--broker-list", "localhost:9092",
+            "--topic", "test",
+            "--messages", "10",
+            "--new-consumer"
+        };
+
+        String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args));
+
+        assertTrue(err.contains("new-consumer is not a recognized option"));
+    }
+
+    @Test
+    public void testClientIdOverride() throws IOException {
+        File tempFile = Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile();
+        try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
+            output.println("client.id=consumer-1");
+            output.flush();
+        }
+
+        String[] args = new String[]{
+            "--broker-list", "localhost:9092",
+            "--topic", "test",
+            "--messages", "10",
+            "--consumer.config", tempFile.getAbsolutePath()
+        };
+
+        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
+
+        // A client.id from the config file must survive; the default applies only when it is absent.
+        assertEquals("consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+    }
+
+    @Test
+    public void testDefaultClientId() throws IOException {
+        String[] args = new String[]{
+            "--broker-list", "localhost:9092",
+            "--topic", "test",
+            "--messages", "10"
+        };
+
+        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
+
+        assertEquals("perf-consumer-client", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+    }
+
+    /**
+     * Captures standard out while printing the header and running {@code runnable},
+     * then checks the number of output lines and that the first two lines split
+     * into the same number of fields on a comma followed by a whitespace character.
+     */
+    private void testHeaderMatchContent(boolean detailed, int expectedOutputLineCount, Runnable runnable) {
+        String out = ToolsTestUtils.captureStandardOut(() -> {
+            ConsumerPerformance.printHeader(detailed);
+            runnable.run();
+        });
+
+        String[] contents = out.split("\n");
+        assertEquals(expectedOutputLineCount, contents.length);
+        String header = contents[0];
+        String body = contents[1];
+
+        assertEquals(header.split(",\\s").length, body.split(",\\s").length);
+    }
+}