Browse Source

KAFKA-8670; Fix exception for kafka-topics.sh --describe without --topic mentioned (#7094)

If there are **no topics** in a cluster, kafka-topics.sh --describe without a --topic option should return empty list, not throw an exception.

Reviewers: Jason Gustafson <jason@confluent.io>
pull/6069/head
Tirtha Chatterjee 5 years ago committed by Jason Gustafson
parent
commit
ab8a7ff363
  1. 26
      core/src/main/scala/kafka/admin/TopicCommand.scala
  2. 5
      core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala

26
core/src/main/scala/kafka/admin/TopicCommand.scala

@ -208,7 +208,7 @@ object TopicCommand extends Logging { @@ -208,7 +208,7 @@ object TopicCommand extends Logging {
override def alterTopic(opts: TopicCommandOptions): Unit = {
val topic = new CommandTopicPartition(opts)
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
ensureTopicExists(topics)
ensureTopicExists(topics, opts.topic)
val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values()
adminClient.createPartitions(topics.map {topicName =>
if (topic.hasReplicaAssignment) {
@ -267,7 +267,7 @@ object TopicCommand extends Logging { @@ -267,7 +267,7 @@ object TopicCommand extends Logging {
override def deleteTopic(opts: TopicCommandOptions): Unit = {
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
ensureTopicExists(topics)
ensureTopicExists(topics, opts.topic)
adminClient.deleteTopics(topics.asJavaCollection).all().get()
}
@ -317,7 +317,7 @@ object TopicCommand extends Logging { @@ -317,7 +317,7 @@ object TopicCommand extends Logging {
override def alterTopic(opts: TopicCommandOptions): Unit = {
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
val tp = new CommandTopicPartition(opts)
ensureTopicExists(topics, opts.ifExists)
ensureTopicExists(topics, opts.topic, !opts.ifExists)
val adminZkClient = new AdminZkClient(zkClient)
topics.foreach { topic =>
val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
@ -354,8 +354,7 @@ object TopicCommand extends Logging { @@ -354,8 +354,7 @@ object TopicCommand extends Logging {
override def describeTopic(opts: TopicCommandOptions): Unit = {
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
val topicOptWithExits = opts.topic.isDefined && opts.ifExists
ensureTopicExists(topics, topicOptWithExits)
ensureTopicExists(topics, opts.topic, !opts.ifExists)
val liveBrokers = zkClient.getAllBrokersInCluster.map(_.id).toSet
val describeOptions = new DescribeOptions(opts, liveBrokers)
val adminZkClient = new AdminZkClient(zkClient)
@ -401,7 +400,7 @@ object TopicCommand extends Logging { @@ -401,7 +400,7 @@ object TopicCommand extends Logging {
override def deleteTopic(opts: TopicCommandOptions): Unit = {
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
ensureTopicExists(topics, opts.ifExists)
ensureTopicExists(topics, opts.topic, !opts.ifExists)
topics.foreach { topic =>
try {
if (Topic.isInternal(topic)) {
@ -433,14 +432,17 @@ object TopicCommand extends Logging { @@ -433,14 +432,17 @@ object TopicCommand extends Logging {
/**
* ensures topic existence and throws exception if topic doesn't exist
*
* @param opts
* @param topics
* @param topicOptWithExists
* @param foundTopics Topics that were found to match the requested topic name.
* @param requestedTopic Name of the topic that was requested.
* @param requireTopicExists Indicates if the topic needs to exist for the operation to be successful.
* If set to true, the command will throw an exception if the topic with the
* requested name does not exist.
*/
private def ensureTopicExists(topics: Seq[String], topicOptWithExists: Boolean = false) = {
if (topics.isEmpty && !topicOptWithExists) {
private def ensureTopicExists(foundTopics: Seq[String], requestedTopic: Option[String], requireTopicExists: Boolean = true) = {
// If no topic name was mentioned, do not need to throw exception.
if (requestedTopic.isDefined && requireTopicExists && foundTopics.isEmpty) {
// If given topic doesn't exist then throw exception
throw new IllegalArgumentException(s"Topics in [${topics.mkString(",")}] does not exist")
throw new IllegalArgumentException(s"Topic '${requestedTopic.get}' does not exist as expected")
}
}

5
core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala

@ -416,6 +416,11 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT @@ -416,6 +416,11 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
topicService.describeTopic(describeOpts)
}
// describe all topics
val describeOptsAllTopics = new TopicCommandOptions(Array())
// should not throw any error
topicService.describeTopic(describeOptsAllTopics)
// describe topic that does not exist with --if-exists
val describeOptsWithExists = new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists"))
// should not throw any error

Loading…
Cancel
Save