@ -27,7 +27,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
@@ -27,7 +27,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common. { Metric , MetricName , TopicPartition }
import kafka.utils. { CommandLineUtils , ToolsUtils }
import java.util. { Collections , Properties , Random }
import java.util. { Properties , Random }
import java.text.SimpleDateFormat
import java.time.Duration
@ -54,7 +54,6 @@ object ConsumerPerformance extends LazyLogging {
@@ -54,7 +54,6 @@ object ConsumerPerformance extends LazyLogging {
var startMs , endMs = 0L
val consumer = new KafkaConsumer [ Array [ Byte ] , Array [ Byte ] ] ( config . props )
consumer . subscribe ( Collections . singletonList ( config . topic ) )
startMs = System . currentTimeMillis
consume ( consumer , List ( config . topic ) , config . numMessages , config . recordFetchTimeoutMs , config , totalMessagesRead , totalBytesRead , joinGroupTimeInMs , startMs )
endMs = System . currentTimeMillis
@ -121,10 +120,9 @@ object ConsumerPerformance extends LazyLogging {
@@ -121,10 +120,9 @@ object ConsumerPerformance extends LazyLogging {
} } )
// Now start the benchmark
val startMs = System . currentTimeMillis
var lastReportTime : Long = startMs
var lastConsumedTime = System . currentTimeMillis
var currentTimeMillis = lastConsumedTime
var currentTimeMillis = System . currentTimeMillis
var lastReportTime : Long = currentTimeMillis
var lastConsumedTime = currentTimeMillis
while ( messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout ) {
val records = consumer . poll ( Duration . ofMillis ( 100 ) ) . asScala