diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index 04e8886df38..8d1911d7e08 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong import java.util.{Properties, Random} import com.typesafe.scalalogging.LazyLogging -import joptsimple.{OptionException, OptionParser, OptionSet} import kafka.utils.{CommandLineUtils, ToolsUtils} import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer} import org.apache.kafka.common.serialization.ByteArrayDeserializer @@ -256,7 +255,7 @@ object ConsumerPerformance extends LazyLogging { .ofType(classOf[Long]) .defaultsTo(10000) - options = tryParse(parser, args) + options = parser.parse(args: _*) CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps in performance test for the full zookeeper consumer") CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt) @@ -272,7 +271,6 @@ object ConsumerPerformance extends LazyLogging { val brokerHostsAndPorts = options.valueOf(if (options.has(bootstrapServerOpt)) bootstrapServerOpt else brokerListOpt) props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts) - props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)) props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString) props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString) @@ -291,14 +289,6 @@ object ConsumerPerformance extends LazyLogging { val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) val hideHeader = options.has(hideHeaderOpt) val recordFetchTimeoutMs = options.valueOf(recordFetchTimeoutOpt).longValue() - - def tryParse(parser: OptionParser, args: Array[String]): OptionSet = { - try - parser.parse(args: _*) - catch { - case e: OptionException => - CommandLineUtils.printUsageAndDie(parser, e.getMessage) - } - } } + } diff --git a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala index 2ae4fa7d5b4..f8d0e1545b0 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala @@ -54,7 +54,7 @@ class ConsumerPerformanceTest { val config = new ConsumerPerformance.ConsumerPerfConfig(args) //Then - assertEquals("localhost:9092", config.options.valueOf(config.brokerHostsAndPorts)) + assertEquals("localhost:9092", config.brokerHostsAndPorts) assertEquals("test", config.topic) assertEquals(10, config.numMessages) } @@ -72,7 +72,7 @@ class ConsumerPerformanceTest { val config = new ConsumerPerformance.ConsumerPerfConfig(args) //Then - assertEquals("localhost:9092", config.options.valueOf(config.brokerHostsAndPorts)) + assertEquals("localhost:9092", config.brokerHostsAndPorts) assertEquals("test", config.topic) assertEquals(10, config.numMessages) }