Browse Source

HOTFIX: Fix breakage in `ConsumerPerformanceTest` (#8113)

Test cases in `ConsumerPerformanceTest` were failing and causing a system exit rather than throwing the expected exception following #8023. We didn't catch this because the tests were marked as skipped and not failed.

Reviewers: Guozhang Wang <guozhang@confluent.io>
pull/8060/head
Jason Gustafson 5 years ago committed by GitHub
parent
commit
9da6823a3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      core/src/main/scala/kafka/tools/ConsumerPerformance.scala
  2. 4
      core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala

14
core/src/main/scala/kafka/tools/ConsumerPerformance.scala

@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.{Properties, Random}
import com.typesafe.scalalogging.LazyLogging
import joptsimple.{OptionException, OptionParser, OptionSet}
import kafka.utils.{CommandLineUtils, ToolsUtils}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
@ -256,7 +255,7 @@ object ConsumerPerformance extends LazyLogging { @@ -256,7 +255,7 @@ object ConsumerPerformance extends LazyLogging {
.ofType(classOf[Long])
.defaultsTo(10000)
options = tryParse(parser, args)
options = parser.parse(args: _*)
CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps in performance test for the full zookeeper consumer")
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt)
@ -272,7 +271,6 @@ object ConsumerPerformance extends LazyLogging { @@ -272,7 +271,6 @@ object ConsumerPerformance extends LazyLogging {
val brokerHostsAndPorts = options.valueOf(if (options.has(bootstrapServerOpt)) bootstrapServerOpt else brokerListOpt)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts)
props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt))
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString)
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString)
@ -291,14 +289,6 @@ object ConsumerPerformance extends LazyLogging { @@ -291,14 +289,6 @@ object ConsumerPerformance extends LazyLogging {
val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
val hideHeader = options.has(hideHeaderOpt)
val recordFetchTimeoutMs = options.valueOf(recordFetchTimeoutOpt).longValue()
def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
try
parser.parse(args: _*)
catch {
case e: OptionException =>
CommandLineUtils.printUsageAndDie(parser, e.getMessage)
}
}
}
}

4
core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala

@ -54,7 +54,7 @@ class ConsumerPerformanceTest { @@ -54,7 +54,7 @@ class ConsumerPerformanceTest {
val config = new ConsumerPerformance.ConsumerPerfConfig(args)
//Then
assertEquals("localhost:9092", config.options.valueOf(config.brokerHostsAndPorts))
assertEquals("localhost:9092", config.brokerHostsAndPorts)
assertEquals("test", config.topic)
assertEquals(10, config.numMessages)
}
@ -72,7 +72,7 @@ class ConsumerPerformanceTest { @@ -72,7 +72,7 @@ class ConsumerPerformanceTest {
val config = new ConsumerPerformance.ConsumerPerfConfig(args)
//Then
assertEquals("localhost:9092", config.options.valueOf(config.brokerHostsAndPorts))
assertEquals("localhost:9092", config.brokerHostsAndPorts)
assertEquals("test", config.topic)
assertEquals(10, config.numMessages)
}

Loading…
Cancel
Save