Browse Source

KAFKA-14578: Move ConsumerPerformance to tools (#13215)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Alexandre Dupriez <alexandre.dupriez@gmail.com>
pull/13355/head
Federico Valeri 2 years ago committed by GitHub
parent
commit
07e2f6cd4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      bin/kafka-consumer-perf-test.sh
  2. 2
      bin/windows/kafka-consumer-perf-test.bat
  3. 306
      core/src/main/scala/kafka/tools/ConsumerPerformance.scala
  4. 39
      core/src/main/scala/kafka/tools/PerfConfig.scala
  5. 166
      core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
  6. 2
      tests/kafkatest/benchmarks/core/benchmark_test.py
  7. 2
      tests/kafkatest/services/performance/consumer_performance.py
  8. 407
      tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
  9. 172
      tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java

2
bin/kafka-consumer-perf-test.sh

@ -17,4 +17,4 @@
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M" export KAFKA_HEAP_OPTS="-Xmx512M"
fi fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance "$@" exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ConsumerPerformance "$@"

2
bin/windows/kafka-consumer-perf-test.bat

@ -16,5 +16,5 @@ rem limitations under the License.
SetLocal SetLocal
set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
"%~dp0kafka-run-class.bat" kafka.tools.ConsumerPerformance %* "%~dp0kafka-run-class.bat" org.apache.kafka.tools.ConsumerPerformance %*
EndLocal EndLocal

306
core/src/main/scala/kafka/tools/ConsumerPerformance.scala

@ -1,306 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import java.text.SimpleDateFormat
import java.time.Duration
import java.util
import java.util.concurrent.atomic.AtomicLong
import java.util.{Properties, Random}
import com.typesafe.scalalogging.LazyLogging
import joptsimple.OptionException
import kafka.utils.ToolsUtils
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
import org.apache.kafka.server.util.CommandLineUtils
import scala.jdk.CollectionConverters._
import scala.collection.mutable
/**
 * Command-line performance test for the Java [[KafkaConsumer]].
 *
 * A single consumer subscribes to one topic and polls until either the requested number of
 * messages has been read or no record has been returned within the configured timeout.
 * Throughput statistics are printed to standard output as comma-separated values.
 */
object ConsumerPerformance extends LazyLogging {

  /**
   * Tool entry point: parses the arguments, runs the benchmark and prints the final statistics.
   *
   * @param args command-line arguments as accepted by [[ConsumerPerfConfig]]
   */
  def main(args: Array[String]): Unit = {
    val config = new ConsumerPerfConfig(args)
    logger.info("Starting consumer...")
    val totalMessagesRead = new AtomicLong(0)
    val totalBytesRead = new AtomicLong(0)
    val joinGroupTimeInMs = new AtomicLong(0)

    if (!config.hideHeader)
      printHeader(config.showDetailedStats)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
    val startMs = System.currentTimeMillis
    var endMs = 0L
    var metrics: Option[mutable.Map[MetricName, _ <: Metric]] = None
    try {
      consume(consumer, List(config.topic), config.numMessages, config.recordFetchTimeoutMs, config,
        totalMessagesRead, totalBytesRead, joinGroupTimeInMs, startMs)
      endMs = System.currentTimeMillis
      // Capture the metrics map before the consumer is closed.
      if (config.printMetrics)
        metrics = Some(consumer.metrics.asScala)
    } finally {
      // Close the consumer even when the benchmark fails, so it is never leaked.
      consumer.close()
    }

    val elapsedSecs = (endMs - startMs) / 1000.0
    val fetchTimeInMs = (endMs - startMs) - joinGroupTimeInMs.get
    if (!config.showDetailedStats) {
      val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
      println("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f, %.4f".format(
        config.dateFormat.format(startMs),
        config.dateFormat.format(endMs),
        totalMBRead,
        totalMBRead / elapsedSecs,
        totalMessagesRead.get,
        totalMessagesRead.get / elapsedSecs,
        joinGroupTimeInMs.get,
        fetchTimeInMs,
        totalMBRead / (fetchTimeInMs / 1000.0),
        totalMessagesRead.get / (fetchTimeInMs / 1000.0)
      ))
    }

    metrics.foreach(m => ToolsUtils.printMetrics(m))
  }

  /**
   * Prints the CSV header line matching either the summary or the detailed output format.
   *
   * @param showDetailedStats true to print the header of the per-interval (detailed) rows
   */
  private[tools] def printHeader(showDetailedStats: Boolean): Unit = {
    val newFieldsInHeader = ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec"
    if (!showDetailedStats)
      println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec" + newFieldsInHeader)
    else
      println("time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec" + newFieldsInHeader)
  }

  /**
   * Subscribes to the topics and polls until `count` messages were read or no record was
   * returned for more than `timeout` milliseconds.
   *
   * @param consumer          consumer to poll
   * @param topics            topics to subscribe to
   * @param count             number of messages to read before stopping
   * @param timeout           maximum time in milliseconds allowed between returned records
   * @param config            parsed options (reporting interval, date format, detailed stats flag)
   * @param totalMessagesRead set to the number of messages read when the method returns
   * @param totalBytesRead    set to the number of key and value bytes read when the method returns
   * @param joinTime          accumulates the time in milliseconds between revocation and assignment
   * @param testStartTime     not used by this method; kept for interface compatibility
   */
  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
              topics: List[String],
              count: Long,
              timeout: Long,
              config: ConsumerPerfConfig,
              totalMessagesRead: AtomicLong,
              totalBytesRead: AtomicLong,
              joinTime: AtomicLong,
              testStartTime: Long): Unit = {
    var bytesRead = 0L
    var messagesRead = 0L
    var lastBytesRead = 0L
    var lastMessagesRead = 0L
    var joinStart = System.currentTimeMillis
    var joinTimeMsInSingleRound = 0L

    consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
      def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
        // Read the clock once so the cumulative and per-interval join times always agree.
        val elapsedMs = System.currentTimeMillis - joinStart
        joinTime.addAndGet(elapsedMs)
        joinTimeMsInSingleRound += elapsedMs
      }

      def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
        joinStart = System.currentTimeMillis
      }
    })

    // Now start the benchmark
    var currentTimeMillis = System.currentTimeMillis
    var lastReportTime: Long = currentTimeMillis
    var lastConsumedTime = currentTimeMillis

    while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout) {
      val records = consumer.poll(Duration.ofMillis(100)).asScala
      currentTimeMillis = System.currentTimeMillis
      if (records.nonEmpty)
        lastConsumedTime = currentTimeMillis
      for (record <- records) {
        messagesRead += 1
        if (record.key != null)
          bytesRead += record.key.size
        if (record.value != null)
          bytesRead += record.value.size

        if (currentTimeMillis - lastReportTime >= config.reportingInterval) {
          if (config.showDetailedStats)
            printConsumerProgress(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead,
              lastReportTime, currentTimeMillis, config.dateFormat, joinTimeMsInSingleRound)
          // The interval counters are reset even when detailed stats are not printed.
          joinTimeMsInSingleRound = 0L
          lastReportTime = currentTimeMillis
          lastMessagesRead = messagesRead
          lastBytesRead = bytesRead
        }
      }
    }

    if (messagesRead < count)
      println(s"WARNING: Exiting before consuming the expected number of messages: timeout ($timeout ms) exceeded. " +
        "You can use the --timeout option to increase the timeout.")
    totalMessagesRead.set(messagesRead)
    totalBytesRead.set(bytesRead)
  }

  /**
   * Prints one detailed progress row (basic columns, extended columns, then a newline).
   *
   * @param id                   value printed in the threadId column
   * @param bytesRead            total bytes read so far
   * @param lastBytesRead        total bytes read at the previous report
   * @param messagesRead         total messages read so far
   * @param lastMessagesRead     total messages read at the previous report
   * @param startMs              start of the reporting interval, in epoch milliseconds
   * @param endMs                end of the reporting interval, in epoch milliseconds
   * @param dateFormat           format used for the time column
   * @param periodicJoinTimeInMs rebalance time accumulated during the interval
   */
  def printConsumerProgress(id: Int,
                            bytesRead: Long,
                            lastBytesRead: Long,
                            messagesRead: Long,
                            lastMessagesRead: Long,
                            startMs: Long,
                            endMs: Long,
                            dateFormat: SimpleDateFormat,
                            periodicJoinTimeInMs: Long): Unit = {
    printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat)
    printExtendedProgress(bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, periodicJoinTimeInMs)
    println()
  }

  /** Prints the first six columns of a detailed row: time, id, total MB, MB/s, total messages, messages/s. */
  private def printBasicProgress(id: Int,
                                 bytesRead: Long,
                                 lastBytesRead: Long,
                                 messagesRead: Long,
                                 lastMessagesRead: Long,
                                 startMs: Long,
                                 endMs: Long,
                                 dateFormat: SimpleDateFormat): Unit = {
    val elapsedMs: Double = (endMs - startMs).toDouble
    val totalMbRead = (bytesRead * 1.0) / (1024 * 1024)
    val intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
    val intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs
    val intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0
    print("%s, %d, %.4f, %.4f, %d, %.4f".format(dateFormat.format(endMs), id, totalMbRead,
      intervalMbPerSec, messagesRead, intervalMessagesPerSec))
  }

  /**
   * Prints the last four columns of a detailed row: rebalance time, fetch time and the
   * throughput figures computed over the fetch time only. Rates are 0 when the fetch time
   * is not positive.
   */
  private def printExtendedProgress(bytesRead: Long,
                                    lastBytesRead: Long,
                                    messagesRead: Long,
                                    lastMessagesRead: Long,
                                    startMs: Long,
                                    endMs: Long,
                                    periodicJoinTimeInMs: Long): Unit = {
    val fetchTimeMs = endMs - startMs - periodicJoinTimeInMs
    val intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
    val intervalMessagesRead = messagesRead - lastMessagesRead
    val (intervalMbPerSec, intervalMessagesPerSec) =
      if (fetchTimeMs <= 0)
        (0.0, 0.0)
      else
        (1000.0 * intervalMbRead / fetchTimeMs, 1000.0 * intervalMessagesRead / fetchTimeMs)
    print(", %d, %d, %.4f, %.4f".format(periodicJoinTimeInMs, fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec))
  }

  /**
   * Command-line options of the tool and the consumer properties derived from them.
   *
   * @param args raw command-line arguments
   */
  class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
    val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
      .withRequiredArg
      .describedAs("broker-list")
      .ofType(classOf[String])
    val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to.")
      .requiredUnless("broker-list")
      .withRequiredArg
      .describedAs("server to connect to")
      .ofType(classOf[String])
    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
      .withRequiredArg
      .describedAs("topic")
      .ofType(classOf[String])
    val groupIdOpt = parser.accepts("group", "The group id to consume on.")
      .withRequiredArg
      .describedAs("gid")
      .defaultsTo("perf-consumer-" + new Random().nextInt(100000))
      .ofType(classOf[String])
    val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
      .withRequiredArg
      .describedAs("size")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(1024 * 1024)
    val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
      "offset to consume from, start with the latest message present in the log rather than the earliest message.")
    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
      .withRequiredArg
      .describedAs("size")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(2 * 1024 * 1024)
    val numThreadsOpt = parser.accepts("threads", "DEPRECATED AND IGNORED: Number of processing threads.")
      .withRequiredArg
      .describedAs("count")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(10)
    val numFetchersOpt = parser.accepts("num-fetch-threads", "DEPRECATED AND IGNORED: Number of fetcher threads.")
      .withRequiredArg
      .describedAs("count")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(1)
    val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
      .withRequiredArg
      .describedAs("config file")
      .ofType(classOf[String])
    val printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics.")
    val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
      "interval as configured by reporting-interval")
    val recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.")
      .withOptionalArg()
      .describedAs("milliseconds")
      .ofType(classOf[Long])
      .defaultsTo(10000)

    // Parsing failures print the usage text and exit instead of propagating the exception.
    try
      options = parser.parse(args: _*)
    catch {
      case e: OptionException =>
        CommandLineUtils.printUsageAndExit(parser, e.getMessage)
    }

    if (options.has(numThreadsOpt) || options.has(numFetchersOpt))
      println("WARNING: option [threads] and [num-fetch-threads] have been deprecated and will be ignored by the test")

    CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps in performance test for the full zookeeper consumer")

    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt)

    val printMetrics = options.has(printMetricsOpt)

    // Start from the user-supplied properties file, if any; the settings below override it.
    val props = if (options.has(consumerConfigOpt))
      Utils.loadProps(options.valueOf(consumerConfigOpt))
    else
      new Properties

    import org.apache.kafka.clients.consumer.ConsumerConfig

    // --bootstrap-server takes precedence over the deprecated --broker-list.
    val brokerHostsAndPorts = options.valueOf(if (options.has(bootstrapServerOpt)) bootstrapServerOpt else brokerListOpt)
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt))
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString)
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (options.has(resetBeginningOffsetOpt)) "latest" else "earliest")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
    props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false")
    // A client id from the properties file wins over the default one.
    if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null)
      props.put(ConsumerConfig.CLIENT_ID_CONFIG, "perf-consumer-client")

    // Parsed for command-line compatibility only; the option is documented as ignored.
    val numThreads = options.valueOf(numThreadsOpt).intValue
    val topic = options.valueOf(topicOpt)
    val numMessages = options.valueOf(numMessagesOpt).longValue
    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
    if (reportingInterval <= 0)
      throw new IllegalArgumentException("Reporting interval must be greater than 0.")
    val showDetailedStats = options.has(showDetailedStatsOpt)
    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
    val hideHeader = options.has(hideHeaderOpt)
    val recordFetchTimeoutMs = options.valueOf(recordFetchTimeoutOpt).longValue()
  }
}

39
core/src/main/scala/kafka/tools/PerfConfig.scala

@ -1,39 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import org.apache.kafka.server.util.CommandDefaultOptions
/**
 * Common command-line options for performance-test configurations: message count,
 * reporting interval, date format and header suppression.
 *
 * @param args raw command-line arguments, forwarded to [[CommandDefaultOptions]]
 */
class PerfConfig(args: Array[String]) extends CommandDefaultOptions(args) {

  /** `--messages`: number of messages to send or consume (no default value). */
  val numMessagesOpt = parser
    .accepts("messages", "REQUIRED: The number of messages to send or consume")
    .withRequiredArg()
    .describedAs("count")
    .ofType(classOf[java.lang.Long])

  /** `--reporting-interval`: progress reporting period in milliseconds, 5000 by default. */
  val reportingIntervalOpt = parser
    .accepts("reporting-interval", "Interval in milliseconds at which to print progress info.")
    .withRequiredArg()
    .describedAs("interval_ms")
    .ofType(classOf[java.lang.Integer])
    .defaultsTo(5000)

  /** `--date-format`: pattern used to format the time field. */
  val dateFormatOpt = parser
    .accepts(
      "date-format",
      "The date format to use for formatting the time field. " +
        "See java.text.SimpleDateFormat for options.")
    .withRequiredArg()
    .describedAs("date format")
    .ofType(classOf[String])
    .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS")

  /** `--hide-header`: flag option that takes no argument. */
  val hideHeaderOpt = parser
    .accepts("hide-header", "If set, skips printing the header for the stats ")
}

166
core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala

@ -1,166 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import java.io.{ByteArrayOutputStream, PrintWriter}
import java.text.SimpleDateFormat
import kafka.utils.{Exit, TestUtils}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test
/**
 * Unit tests for the output format and the option parsing of [[ConsumerPerformance]].
 */
class ConsumerPerformanceTest {
  private val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")

  /** The detailed header must have as many columns as a detailed progress row. */
  @Test
  def testDetailedHeaderMatchBody(): Unit = {
    val printDetailedRow: () => Unit = () =>
      ConsumerPerformance.printConsumerProgress(1, 1024 * 1024, 0, 1, 0, 0, 1, dateFormat, 1L)
    testHeaderMatchContent(detailed = true, 2, printDetailedRow)
  }

  /** The summary header must have as many columns as a summary row. */
  @Test
  def testNonDetailedHeaderMatchBody(): Unit = {
    val printSummaryRow: () => Unit = () => println(s"${dateFormat.format(System.currentTimeMillis)}, " +
      s"${dateFormat.format(System.currentTimeMillis)}, 1.0, 1.0, 1, 1.0, 1, 1, 1.1, 1.1")
    testHeaderMatchContent(detailed = false, 2, printSummaryRow)
  }

  @Test
  def testConfigBrokerList(): Unit = {
    val parsed = parse(
      "--broker-list", "localhost:9092",
      "--topic", "test",
      "--messages", "10")

    assertLocalTestConfig(parsed)
  }

  @Test
  def testConfigBootStrapServer(): Unit = {
    val parsed = parse(
      "--bootstrap-server", "localhost:9092",
      "--topic", "test",
      "--messages", "10",
      "--print-metrics")

    assertLocalTestConfig(parsed)
  }

  /** When both options are given, --bootstrap-server wins over --broker-list. */
  @Test
  def testBrokerListOverride(): Unit = {
    val parsed = parse(
      "--broker-list", "localhost:9094",
      "--bootstrap-server", "localhost:9092",
      "--topic", "test",
      "--messages", "10")

    assertLocalTestConfig(parsed)
  }

  @Test
  def testConfigWithUnrecognizedOption(): Unit = {
    // Turn the exit triggered by the parser into an exception the test can observe.
    Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
    val cliArgs: Array[String] = Array(
      "--broker-list", "localhost:9092",
      "--topic", "test",
      "--messages", "10",
      "--new-consumer"
    )
    try {
      assertThrows(classOf[IllegalArgumentException], () => new ConsumerPerformance.ConsumerPerfConfig(cliArgs))
    } finally {
      Exit.resetExitProcedure()
    }
  }

  /** A client.id set in the properties file is not replaced by the default one. */
  @Test
  def testClientIdOverride(): Unit = {
    val propsFile = TestUtils.tempFile("test_consumer_config",".conf")
    val writer = new PrintWriter(propsFile.getPath)
    try writer.write("client.id=consumer-1")
    finally writer.close()

    val parsed = parse(
      "--broker-list", "localhost:9092",
      "--topic", "test",
      "--messages", "10",
      "--consumer.config", propsFile.getPath)

    assertEquals("consumer-1", parsed.props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG))
  }

  @Test
  def testDefaultClientId(): Unit = {
    val parsed = parse(
      "--broker-list", "localhost:9092",
      "--topic", "test",
      "--messages", "10")

    assertEquals("perf-consumer-client", parsed.props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG))
  }

  /** Builds the tool configuration from the given command-line arguments. */
  private def parse(cliArgs: String*): ConsumerPerformance.ConsumerPerfConfig =
    new ConsumerPerformance.ConsumerPerfConfig(cliArgs.toArray)

  /** Asserts the endpoint, topic and message count shared by the option-parsing tests. */
  private def assertLocalTestConfig(parsed: ConsumerPerformance.ConsumerPerfConfig): Unit = {
    assertEquals("localhost:9092", parsed.brokerHostsAndPorts)
    assertEquals("test", parsed.topic)
    assertEquals(10, parsed.numMessages)
  }

  /**
   * Prints the header followed by the output of `fun` into a buffer, then checks the number
   * of lines and that the first two lines have the same number of ", "-separated columns.
   */
  private def testHeaderMatchContent(detailed: Boolean, expectedOutputLineCount: Int, fun: () => Unit): Unit = {
    val captured = new ByteArrayOutputStream
    try {
      Console.withOut(captured) {
        ConsumerPerformance.printHeader(detailed)
        fun()
      }
      val lines = captured.toString.split("\n")
      assertEquals(expectedOutputLineCount, lines.length)
      val headerColumns = lines(0).split(",\\s")
      val bodyColumns = lines(1).split(",\\s")
      assertEquals(headerColumns.length, bodyColumns.length)
    } finally {
      captured.close()
    }
  }
}

2
tests/kafkatest/benchmarks/core/benchmark_test.py

@ -202,7 +202,7 @@ class Benchmark(Test):
Return aggregate throughput statistics for both producer and consumer. Return aggregate throughput statistics for both producer and consumer.
(Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala) (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.java)
""" """
client_version = KafkaVersion(client_version) client_version = KafkaVersion(client_version)
broker_version = KafkaVersion(broker_version) broker_version = KafkaVersion(broker_version)

2
tests/kafkatest/services/performance/consumer_performance.py

@ -23,7 +23,7 @@ from kafkatest.version import DEV_BRANCH, V_2_0_0, LATEST_0_10_0
class ConsumerPerformanceService(PerformanceService): class ConsumerPerformanceService(PerformanceService):
""" """
See ConsumerPerformance.scala as the source of truth on these settings, but for reference: See ConsumerPerformance tool as the source of truth on these settings, but for reference:
"zookeeper" "The connection string for the zookeeper connection in the form host:port. Multiple URLS can "zookeeper" "The connection string for the zookeeper connection in the form host:port. Multiple URLS can
be given to allow fail-over. This option is only used with the old consumer." be given to allow fail-over. This option is only used with the old consumer."

407
tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java

@ -0,0 +1,407 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import joptsimple.OptionException;
import joptsimple.OptionSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.server.util.ToolsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import static joptsimple.util.RegexMatcher.regex;
public class ConsumerPerformance {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerPerformance.class);
private static final Random RND = new Random();
/**
 * Tool entry point: parses the options, runs the benchmark and prints the final statistics.
 * Any failure is printed to standard error together with its stack trace and the process
 * exits with status 1.
 *
 * @param args command-line arguments
 */
public static void main(String[] args) {
    try {
        LOG.info("Starting consumer...");
        ConsumerPerfOptions options = new ConsumerPerfOptions(args);
        AtomicLong totalMessagesRead = new AtomicLong(0);
        AtomicLong totalBytesRead = new AtomicLong(0);
        AtomicLong joinTimeMs = new AtomicLong(0);
        AtomicLong joinTimeMsInSingleRound = new AtomicLong(0);

        if (!options.hideHeader())
            printHeader(options.showDetailedStats());

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(options.props());
        long bytesRead = 0L;
        long messagesRead = 0L;
        long lastBytesRead = 0L;
        long lastMessagesRead = 0L;
        long currentTimeMs = System.currentTimeMillis();
        long joinStartMs = currentTimeMs;
        long startMs = currentTimeMs;
        long endMs;
        Map<MetricName, ? extends Metric> metrics = null;
        try {
            consume(consumer, options, totalMessagesRead, totalBytesRead, joinTimeMs,
                bytesRead, messagesRead, lastBytesRead, lastMessagesRead,
                joinStartMs, joinTimeMsInSingleRound);
            endMs = System.currentTimeMillis();
            // Capture the metrics map before the consumer is closed.
            if (options.printMetrics())
                metrics = consumer.metrics();
        } finally {
            // Close the consumer even when the benchmark fails, so it is never leaked.
            consumer.close();
        }

        // print final stats
        double elapsedSec = (endMs - startMs) / 1_000.0;
        long fetchTimeInMs = (endMs - startMs) - joinTimeMs.get();
        if (!options.showDetailedStats()) {
            double totalMbRead = (totalBytesRead.get() * 1.0) / (1024 * 1024);
            System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f, %.4f%n",
                options.dateFormat().format(startMs),
                options.dateFormat().format(endMs),
                totalMbRead,
                totalMbRead / elapsedSec,
                totalMessagesRead.get(),
                totalMessagesRead.get() / elapsedSec,
                joinTimeMs.get(),
                fetchTimeInMs,
                totalMbRead / (fetchTimeInMs / 1000.0),
                totalMessagesRead.get() / (fetchTimeInMs / 1000.0)
            );
        }

        if (metrics != null)
            ToolsUtils.printMetrics(metrics);
    } catch (Throwable e) {
        System.err.println(e.getMessage());
        System.err.println(Utils.stackTrace(e));
        Exit.exit(1);
    }
}
/**
 * Prints the CSV header line for either the summary or the detailed output format.
 *
 * @param showDetailedStats true to print the header of the per-interval (detailed) rows
 */
protected static void printHeader(boolean showDetailedStats) {
    String extraColumns = ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec";
    // Only the first two column names differ between the two formats.
    String headerFormat = showDetailedStats
        ? "time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec%s%n"
        : "start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec%s%n";
    System.out.printf(headerFormat, extraColumns);
}
/**
 * Subscribes to the configured topic(s) and polls until the requested number of messages has
 * been read or no record has been returned for longer than the record fetch timeout.
 *
 * @param consumer                consumer to poll
 * @param options                 parsed tool options
 * @param totalMessagesRead       set to the number of messages read when the method returns
 * @param totalBytesRead          set to the number of key and value bytes read when the method returns
 * @param joinTimeMs              cumulative rebalance time, updated by the rebalance listener
 * @param bytesRead               initial byte counter
 * @param messagesRead            initial message counter
 * @param lastBytesRead           byte counter at the previous report
 * @param lastMessagesRead        message counter at the previous report
 * @param joinStartMs             initial rebalance start timestamp handed to the listener
 * @param joinTimeMsInSingleRound rebalance time of the current reporting interval; shared with
 *                                the rebalance listener and reset after every report
 */
private static void consume(KafkaConsumer<byte[], byte[]> consumer,
                            ConsumerPerfOptions options,
                            AtomicLong totalMessagesRead,
                            AtomicLong totalBytesRead,
                            AtomicLong joinTimeMs,
                            long bytesRead,
                            long messagesRead,
                            long lastBytesRead,
                            long lastMessagesRead,
                            long joinStartMs,
                            AtomicLong joinTimeMsInSingleRound) {
    long numMessages = options.numMessages();
    long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
    long reportingIntervalMs = options.reportingIntervalMs();
    boolean showDetailedStats = options.showDetailedStats();
    SimpleDateFormat dateFormat = options.dateFormat();

    consumer.subscribe(options.topic(),
        new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound));

    // now start the benchmark
    long currentTimeMs = System.currentTimeMillis();
    long lastReportTimeMs = currentTimeMs;
    long lastConsumedTimeMs = currentTimeMs;

    while (messagesRead < numMessages && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        currentTimeMs = System.currentTimeMillis();
        if (!records.isEmpty())
            lastConsumedTimeMs = currentTimeMs;
        for (ConsumerRecord<byte[], byte[]> record : records) {
            messagesRead += 1;
            if (record.key() != null)
                bytesRead += record.key().length;
            if (record.value() != null)
                bytesRead += record.value().length;
            if (currentTimeMs - lastReportTimeMs >= reportingIntervalMs) {
                if (showDetailedStats)
                    printConsumerProgress(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead,
                        lastReportTimeMs, currentTimeMs, dateFormat, joinTimeMsInSingleRound.get());
                // Reset the shared counter in place. Rebinding the parameter to a new AtomicLong
                // would detach it from the rebalance listener, which keeps updating the instance
                // it was constructed with, so later intervals would always report 0.
                joinTimeMsInSingleRound.set(0);
                lastReportTimeMs = currentTimeMs;
                lastMessagesRead = messagesRead;
                lastBytesRead = bytesRead;
            }
        }
    }

    if (messagesRead < numMessages)
        System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. " +
            "You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs);
    totalMessagesRead.set(messagesRead);
    totalBytesRead.set(bytesRead);
}
/**
 * Prints one detailed progress row: the basic columns, the extended columns, then a line break.
 *
 * @param id                      value printed in the threadId column
 * @param bytesRead               total bytes read so far
 * @param lastBytesRead           total bytes read at the previous report
 * @param messagesRead            total messages read so far
 * @param lastMessagesRead        total messages read at the previous report
 * @param startMs                 start of the reporting interval, in epoch milliseconds
 * @param endMs                   end of the reporting interval, in epoch milliseconds
 * @param dateFormat              format used for the time column
 * @param joinTimeMsInSingleRound rebalance time accumulated during the interval
 */
protected static void printConsumerProgress(int id,
                                            long bytesRead,
                                            long lastBytesRead,
                                            long messagesRead,
                                            long lastMessagesRead,
                                            long startMs,
                                            long endMs,
                                            SimpleDateFormat dateFormat,
                                            long joinTimeMsInSingleRound) {
    // First six columns of the row.
    printBasicProgress(
        id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat);
    // Remaining four columns, computed over the fetch time only.
    printExtendedProgress(
        bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, joinTimeMsInSingleRound);
    // Terminate the row.
    System.out.println();
}
/**
 * Prints the first six columns of a detailed row without a trailing line break:
 * time, id, total MB read, interval MB/s, total messages read, interval messages/s.
 */
private static void printBasicProgress(int id,
                                       long bytesRead,
                                       long lastBytesRead,
                                       long messagesRead,
                                       long lastMessagesRead,
                                       long startMs,
                                       long endMs,
                                       SimpleDateFormat dateFormat) {
    double intervalMs = endMs - startMs;
    long intervalBytes = bytesRead - lastBytesRead;
    long intervalMessages = messagesRead - lastMessagesRead;

    double totalMb = (bytesRead * 1.0) / (1024 * 1024);
    double intervalMb = (intervalBytes * 1.0) / (1024 * 1024);
    double mbPerSec = 1000.0 * intervalMb / intervalMs;
    double messagesPerSec = (intervalMessages / intervalMs) * 1000.0;

    String timestamp = dateFormat.format(endMs);
    System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f",
        timestamp, id, totalMb, mbPerSec, messagesRead, messagesPerSec);
}
/**
 * Prints the last four columns of a detailed row without a trailing line break:
 * rebalance time, fetch time, and the MB/s and messages/s rates computed over the fetch
 * time only. Both rates are reported as 0 when the fetch time is not positive.
 */
private static void printExtendedProgress(long bytesRead,
                                          long lastBytesRead,
                                          long messagesRead,
                                          long lastMessagesRead,
                                          long startMs,
                                          long endMs,
                                          long joinTimeMsInSingleRound) {
    // Time of the interval that was not spent rebalancing.
    long fetchTimeMs = endMs - startMs - joinTimeMsInSingleRound;
    double intervalMb = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024);
    long intervalMessages = messagesRead - lastMessagesRead;

    double mbPerSec = 0.0;
    double messagesPerSec = 0.0;
    if (fetchTimeMs > 0) {
        mbPerSec = 1000.0 * intervalMb / fetchTimeMs;
        messagesPerSec = 1000.0 * intervalMessages / fetchTimeMs;
    }

    System.out.printf(", %d, %d, %.4f, %.4f", joinTimeMsInSingleRound,
        fetchTimeMs, mbPerSec, messagesPerSec);
}
/**
 * Rebalance listener that measures the time elapsed between a partition revocation (or the
 * initial start timestamp) and the following partition assignment, and adds it to both the
 * cumulative and the per-interval counters.
 */
public static class ConsumerPerfRebListener implements ConsumerRebalanceListener {
    private final AtomicLong joinTimeMs;
    private final AtomicLong joinTimeMsInSingleRound;
    private long joinStartMs;

    /**
     * @param joinTimeMs              counter receiving the cumulative rebalance time
     * @param joinStartMs             timestamp used as rebalance start until the first revocation
     * @param joinTimeMsInSingleRound counter receiving the rebalance time of the current interval
     */
    public ConsumerPerfRebListener(AtomicLong joinTimeMs, long joinStartMs, AtomicLong joinTimeMsInSingleRound) {
        this.joinTimeMs = joinTimeMs;
        this.joinTimeMsInSingleRound = joinTimeMsInSingleRound;
        this.joinStartMs = joinStartMs;
    }

    /** Records the current time as the start of a rebalance. */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        this.joinStartMs = System.currentTimeMillis();
    }

    /** Adds the time elapsed since the recorded rebalance start to both counters. */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        long now = System.currentTimeMillis();
        long rebalanceMs = now - this.joinStartMs;
        this.joinTimeMs.addAndGet(rebalanceMs);
        this.joinTimeMsInSingleRound.addAndGet(rebalanceMs);
    }
}
/**
 * Command-line options of the consumer performance tool.
 * The constructor declares every accepted option on the inherited parser and parses the
 * given arguments; the accessor methods expose the parsed values.
 */
protected static class ConsumerPerfOptions extends CommandDefaultOptions {
    // Connection options: --broker-list is the deprecated alternative to --bootstrap-server.
    private final OptionSpec<String> brokerListOpt;
    private final OptionSpec<String> bootstrapServerOpt;
    private final OptionSpec<String> topicOpt;
    private final OptionSpec<String> groupIdOpt;
    private final OptionSpec<Integer> fetchSizeOpt;
    // Backs the --from-latest flag.
    private final OptionSpec<Void> resetBeginningOffsetOpt;
    private final OptionSpec<Integer> socketBufferSizeOpt;
    // --threads and --num-fetch-threads are still accepted but only trigger a warning.
    private final OptionSpec<Integer> numThreadsOpt;
    private final OptionSpec<Integer> numFetchersOpt;
    private final OptionSpec<String> consumerConfigOpt;
    private final OptionSpec<Void> printMetricsOpt;
    private final OptionSpec<Void> showDetailedStatsOpt;
    private final OptionSpec<Long> recordFetchTimeoutOpt;
    private final OptionSpec<Long> numMessagesOpt;
    private final OptionSpec<Long> reportingIntervalOpt;
    private final OptionSpec<String> dateFormatOpt;
    private final OptionSpec<Void> hideHeaderOpt;

    /**
     * Declares all options and parses {@code args}.
     * When parsing fails, the usage text is printed together with the parser's error message
     * and no further validation is performed. Otherwise a deprecation warning is printed if
     * needed, help/version requests are handled, and the presence of --topic and --messages
     * is checked.
     *
     * @param args raw command-line arguments
     */
    public ConsumerPerfOptions(String[] args) {
        super(args);

        brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
            .withRequiredArg()
            .describedAs("broker-list")
            .ofType(String.class);
        bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to.")
            .requiredUnless("broker-list")
            .withRequiredArg()
            .describedAs("server to connect to")
            .ofType(String.class);
        topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
            .withRequiredArg()
            .describedAs("topic")
            .ofType(String.class);
        // The default group id gets a random suffix below 100000.
        groupIdOpt = parser.accepts("group", "The group id to consume on.")
            .withRequiredArg()
            .describedAs("gid")
            .defaultsTo("perf-consumer-" + RND.nextInt(100_000))
            .ofType(String.class);
        fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
            .withRequiredArg()
            .describedAs("size")
            .ofType(Integer.class)
            .defaultsTo(1024 * 1024);
        resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
            "offset to consume from, start with the latest message present in the log rather than the earliest message.");
        socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
            .withRequiredArg()
            .describedAs("size")
            .ofType(Integer.class)
            .defaultsTo(2 * 1024 * 1024);
        numThreadsOpt = parser.accepts("threads", "DEPRECATED AND IGNORED: Number of processing threads.")
            .withRequiredArg()
            .describedAs("count")
            .ofType(Integer.class)
            .defaultsTo(10);
        numFetchersOpt = parser.accepts("num-fetch-threads", "DEPRECATED AND IGNORED: Number of fetcher threads.")
            .withRequiredArg()
            .describedAs("count")
            .ofType(Integer.class)
            .defaultsTo(1);
        consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
            .withRequiredArg()
            .describedAs("config file")
            .ofType(String.class);
        printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics.");
        showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
            "interval as configured by reporting-interval");
        recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.")
            .withOptionalArg()
            .describedAs("milliseconds")
            .ofType(Long.class)
            .defaultsTo(10_000L);
        numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to send or consume")
            .withRequiredArg()
            .describedAs("count")
            .ofType(Long.class);
        reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in milliseconds at which to print progress info.")
            .withRequiredArg()
            .withValuesConvertedBy(regex("^\\d+$"))
            .describedAs("interval_ms")
            .ofType(Long.class)
            .defaultsTo(5_000L);
        dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
            "See java.text.SimpleDateFormat for options.")
            .withRequiredArg()
            .describedAs("date format")
            .ofType(String.class)
            .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS");
        hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats");

        try {
            options = parser.parse(args);
        } catch (OptionException parseFailure) {
            CommandLineUtils.printUsageAndExit(parser, parseFailure.getMessage());
            // Stop here in case the exit call above returns to the caller.
            return;
        }

        // Nothing to validate without a parse result.
        if (options == null) {
            return;
        }

        boolean usesIgnoredOption = options.has(numThreadsOpt) || options.has(numFetchersOpt);
        if (usesIgnoredOption) {
            System.out.println("WARNING: option [threads] and [num-fetch-threads] have been deprecated and will be ignored by the test");
        }
        CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance.");
        CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt);
    }

    /** @return true when --print-metrics was given */
    public boolean printMetrics() {
        return options.has(printMetricsOpt);
    }

    /**
     * @return the value of --bootstrap-server when that option is present,
     *         otherwise the value of the deprecated --broker-list
     */
    public String brokerHostsAndPorts() {
        if (options.has(bootstrapServerOpt)) {
            return options.valueOf(bootstrapServerOpt);
        }
        return options.valueOf(brokerListOpt);
    }

    /**
     * Builds the consumer configuration: starts from the --consumer.config file when given
     * (an empty set of properties otherwise) and then sets the entries derived from the
     * command line. A client id is only filled in when the loaded properties have none.
     *
     * @return the consumer properties
     * @throws IOException declared for the loading of the properties file
     */
    public Properties props() throws IOException {
        Properties consumerProps;
        if (options.has(consumerConfigOpt)) {
            consumerProps = Utils.loadProps(options.valueOf(consumerConfigOpt));
        } else {
            consumerProps = new Properties();
        }

        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt));

        Integer receiveBufferBytes = options.valueOf(socketBufferSizeOpt);
        consumerProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes.toString());

        Integer maxPartitionFetchBytes = options.valueOf(fetchSizeOpt);
        consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes.toString());

        // --from-latest switches the offset reset policy away from "earliest".
        String offsetReset;
        if (options.has(resetBeginningOffsetOpt)) {
            offsetReset = "latest";
        } else {
            offsetReset = "earliest";
        }
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);

        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerProps.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false");

        // Keep a client id coming from the config file; otherwise use the tool's default.
        boolean clientIdMissing = consumerProps.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null;
        if (clientIdMissing) {
            consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "perf-consumer-client");
        }
        return consumerProps;
    }

    /** @return a single-element set holding the value of --topic */
    public Set<String> topic() {
        String topicName = options.valueOf(topicOpt);
        return Collections.singleton(topicName);
    }

    /** @return the value of --messages */
    public long numMessages() {
        return options.valueOf(numMessagesOpt);
    }

    /**
     * @return the value of --reporting-interval in milliseconds
     * @throws IllegalArgumentException if the value is not greater than 0
     */
    public long reportingIntervalMs() {
        long intervalMs = options.valueOf(reportingIntervalOpt);
        if (intervalMs > 0) {
            return intervalMs;
        }
        throw new IllegalArgumentException("Reporting interval must be greater than 0.");
    }

    /** @return true when --show-detailed-stats was given */
    public boolean showDetailedStats() {
        return options.has(showDetailedStatsOpt);
    }

    /** @return a new formatter built from the --date-format pattern */
    public SimpleDateFormat dateFormat() {
        String pattern = options.valueOf(dateFormatOpt);
        return new SimpleDateFormat(pattern);
    }

    /** @return true when --hide-header was given */
    public boolean hideHeader() {
        return options.has(hideHeaderOpt);
    }

    /** @return the value of --timeout in milliseconds */
    public long recordFetchTimeoutMs() {
        return options.valueOf(recordFetchTimeoutOpt);
    }
}
}

172
tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java

@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Unit tests for the output format of {@link ConsumerPerformance} and for the
 * command-line parsing done by {@code ConsumerPerformance.ConsumerPerfOptions}.
 */
public class ConsumerPerformanceTest {
    // Installed as the exit procedure around every test (see beforeEach/afterEach).
    private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");

    @TempDir
    static Path tempDir;

    @BeforeEach
    public void beforeEach() {
        Exit.setExitProcedure(exitProcedure);
    }

    @AfterEach
    public void afterEach() {
        Exit.resetExitProcedure();
    }

    /** The detailed header and one progress line must have the same number of columns. */
    @Test
    public void testDetailedHeaderMatchBody() {
        testHeaderMatchContent(true, 2,
            () -> ConsumerPerformance.printConsumerProgress(1, 1024 * 1024, 0, 1, 0, 0, 1, dateFormat, 1L));
    }

    /** The non-detailed header and one progress line must have the same number of columns. */
    @Test
    public void testNonDetailedHeaderMatchBody() {
        testHeaderMatchContent(false, 2,
            () -> ConsumerPerformance.printConsumerProgress(1, 1024 * 1024, 0, 1, 0, 0, 1, dateFormat, 1L));
    }

    /** The deprecated --broker-list alone is accepted as the server list. */
    @Test
    public void testConfigBrokerList() {
        String[] args = new String[]{
            "--broker-list", "localhost:9092",
            "--topic", "test",
            "--messages", "10"
        };

        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);

        assertEquals("localhost:9092", config.brokerHostsAndPorts());
        assertTrue(config.topic().contains("test"));
        assertEquals(10, config.numMessages());
    }

    /** --bootstrap-server is used as the server list and the --print-metrics flag is picked up. */
    @Test
    public void testConfigBootStrapServer() {
        String[] args = new String[]{
            "--bootstrap-server", "localhost:9092",
            "--topic", "test",
            "--messages", "10",
            "--print-metrics"
        };

        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);

        assertEquals("localhost:9092", config.brokerHostsAndPorts());
        assertTrue(config.topic().contains("test"));
        assertEquals(10, config.numMessages());
        // The flag is passed above, so verify that it is actually reflected by the parsed options.
        assertTrue(config.printMetrics());
    }

    /** When both options are given, --bootstrap-server wins over --broker-list. */
    @Test
    public void testBrokerListOverride() {
        String[] args = new String[]{
            "--broker-list", "localhost:9094",
            "--bootstrap-server", "localhost:9092",
            "--topic", "test",
            "--messages", "10"
        };

        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);

        assertEquals("localhost:9092", config.brokerHostsAndPorts());
        assertTrue(config.topic().contains("test"));
        assertEquals(10, config.numMessages());
    }

    /** An unknown option is reported on standard error. */
    @Test
    public void testConfigWithUnrecognizedOption() {
        String[] args = new String[]{
            "--broker-list", "localhost:9092",
            "--topic", "test",
            "--messages", "10",
            "--new-consumer"
        };

        String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args));

        assertTrue(err.contains("new-consumer is not a recognized option"));
    }

    /** A client.id set in the --consumer.config file is kept in the resulting properties. */
    @Test
    public void testClientIdOverride() throws IOException {
        File tempFile = Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile();
        try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
            output.println("client.id=consumer-1");
            output.flush();
        }

        String[] args = new String[]{
            "--broker-list", "localhost:9092",
            "--topic", "test",
            "--messages", "10",
            "--consumer.config", tempFile.getAbsolutePath()
        };

        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);

        assertEquals("consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
    }

    /** Without a config file the default client id is used. */
    @Test
    public void testDefaultClientId() throws IOException {
        String[] args = new String[]{
            "--broker-list", "localhost:9092",
            "--topic", "test",
            "--messages", "10"
        };

        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);

        assertEquals("perf-consumer-client", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
    }

    /**
     * Captures standard output while the header and the given body are printed, then checks
     * the number of output lines and that header and body have the same number of
     * comma-separated fields.
     *
     * @param detailed                passed to {@code ConsumerPerformance.printHeader}
     * @param expectedOutputLineCount expected number of captured lines
     * @param runnable                prints the body line that follows the header
     */
    private void testHeaderMatchContent(boolean detailed, int expectedOutputLineCount, Runnable runnable) {
        String out = ToolsTestUtils.captureStandardOut(() -> {
            ConsumerPerformance.printHeader(detailed);
            runnable.run();
        });

        String[] contents = out.split("\n");
        assertEquals(expectedOutputLineCount, contents.length);
        String header = contents[0];
        String body = contents[1];

        assertEquals(header.split(",\\s").length, body.split(",\\s").length);
    }
}
Loading…
Cancel
Save