From ab8a7ff36326742e1c6d4fc0a0aff818c7e6d313 Mon Sep 17 00:00:00 2001 From: Tirtha Chatterjee Date: Thu, 18 Jul 2019 13:25:25 -0700 Subject: [PATCH] KAFKA-8670; Fix exception for kafka-topics.sh --describe without --topic mentioned (#7094) If there are **no topics** in a cluster, kafka-topics.sh --describe without a --topic option should return empty list, not throw an exception. Reviewers: Jason Gustafson --- .../main/scala/kafka/admin/TopicCommand.scala | 26 ++++++++++--------- .../unit/kafka/admin/TopicCommandTest.scala | 5 ++++ 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 4f529969645..ae3cde87b31 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -208,7 +208,7 @@ object TopicCommand extends Logging { override def alterTopic(opts: TopicCommandOptions): Unit = { val topic = new CommandTopicPartition(opts) val topics = getTopics(opts.topic, opts.excludeInternalTopics) - ensureTopicExists(topics) + ensureTopicExists(topics, opts.topic) val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values() adminClient.createPartitions(topics.map {topicName => if (topic.hasReplicaAssignment) { @@ -267,7 +267,7 @@ object TopicCommand extends Logging { override def deleteTopic(opts: TopicCommandOptions): Unit = { val topics = getTopics(opts.topic, opts.excludeInternalTopics) - ensureTopicExists(topics) + ensureTopicExists(topics, opts.topic) adminClient.deleteTopics(topics.asJavaCollection).all().get() } @@ -317,7 +317,7 @@ object TopicCommand extends Logging { override def alterTopic(opts: TopicCommandOptions): Unit = { val topics = getTopics(opts.topic, opts.excludeInternalTopics) val tp = new CommandTopicPartition(opts) - ensureTopicExists(topics, opts.ifExists) + ensureTopicExists(topics, opts.topic, !opts.ifExists) val adminZkClient = new AdminZkClient(zkClient) topics.foreach { topic => val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic) @@ -354,8 +354,7 @@ object TopicCommand extends Logging { override def describeTopic(opts: TopicCommandOptions): Unit = { val topics = getTopics(opts.topic, opts.excludeInternalTopics) - val topicOptWithExits = opts.topic.isDefined && opts.ifExists - ensureTopicExists(topics, topicOptWithExits) + ensureTopicExists(topics, opts.topic, !opts.ifExists) val liveBrokers = zkClient.getAllBrokersInCluster.map(_.id).toSet val describeOptions = new DescribeOptions(opts, liveBrokers) val adminZkClient = new AdminZkClient(zkClient) @@ -401,7 +400,7 @@ object TopicCommand extends Logging { override def deleteTopic(opts: TopicCommandOptions): Unit = { val topics = getTopics(opts.topic, opts.excludeInternalTopics) - ensureTopicExists(topics, opts.ifExists) + ensureTopicExists(topics, opts.topic, !opts.ifExists) topics.foreach { topic => try { if (Topic.isInternal(topic)) { @@ -433,14 +432,17 @@ object TopicCommand extends Logging { /** * ensures topic existence and throws exception if topic doesn't exist * - * @param opts - * @param topics - * @param topicOptWithExists + * @param foundTopics Topics that were found to match the requested topic name. + * @param requestedTopic Name of the topic that was requested. + * @param requireTopicExists Indicates if the topic needs to exist for the operation to be successful. + * If set to true, the command will throw an exception if the topic with the + * requested name does not exist. */ - private def ensureTopicExists(topics: Seq[String], topicOptWithExists: Boolean = false) = { - if (topics.isEmpty && !topicOptWithExists) { + private def ensureTopicExists(foundTopics: Seq[String], requestedTopic: Option[String], requireTopicExists: Boolean = true) = { + // If no topic name was mentioned, do not need to throw exception. + if (requestedTopic.isDefined && requireTopicExists && foundTopics.isEmpty) { // If given topic doesn't exist then throw exception - throw new IllegalArgumentException(s"Topics in [${topics.mkString(",")}] does not exist") + throw new IllegalArgumentException(s"Topic '${requestedTopic.get}' does not exist as expected") } } diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index 407c2f3bcc0..35502f54d07 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -416,6 +416,11 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT topicService.describeTopic(describeOpts) } + // describe all topics + val describeOptsAllTopics = new TopicCommandOptions(Array()) + // should not throw any error + topicService.describeTopic(describeOptsAllTopics) + // describe topic that does not exist with --if-exists val describeOptsWithExists = new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists")) // should not throw any error