diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 5394ae54d13..b1c1c0a23d7 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -120,7 +120,7 @@ object TopicCommand extends Logging { config.map(_.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value.toInt) } - def hasUnderReplicatedPartitions: Boolean = { + def isUnderReplicated: Boolean = { getReplicationFactor(info, reassignment) - info.isr.size > 0 } @@ -128,7 +128,7 @@ object TopicCommand extends Logging { info.leader != null } - def hasUnderMinIsrPartitions: Boolean = { + def isUnderMinIsr: Boolean = { !hasLeader || minIsrCount.exists(info.isr.size < _) } @@ -161,13 +161,13 @@ object TopicCommand extends Logging { val describePartitions: Boolean = !opts.reportOverriddenConfigs private def shouldPrintUnderReplicatedPartitions(partitionDescription: PartitionDescription): Boolean = { - opts.reportUnderReplicatedPartitions && partitionDescription.hasUnderReplicatedPartitions + opts.reportUnderReplicatedPartitions && partitionDescription.isUnderReplicated } private def shouldPrintUnavailablePartitions(partitionDescription: PartitionDescription): Boolean = { opts.reportUnavailablePartitions && partitionDescription.hasUnavailablePartitions(liveBrokers) } private def shouldPrintUnderMinIsrPartitions(partitionDescription: PartitionDescription): Boolean = { - opts.reportUnderMinIsrPartitions && partitionDescription.hasUnderMinIsrPartitions + opts.reportUnderMinIsrPartitions && partitionDescription.isUnderMinIsr } private def shouldPrintAtMinIsrPartitions(partitionDescription: PartitionDescription): Boolean = { opts.reportAtMinIsrPartitions && partitionDescription.isAtMinIsrPartitions @@ -269,27 +269,26 @@ object TopicCommand extends Logging { }}.toMap.asJava).all().get() } - override def describeTopic(opts: TopicCommandOptions): Unit = { - val topics = getTopics(opts.topic, opts.excludeInternalTopics) - val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values() - val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()) - val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala - val reassignments: util.Map[TopicPartition, PartitionReassignment] = try { - if (opts.topic.isEmpty) { - adminClient.listPartitionReassignments(new ListPartitionReassignmentsOptions).reassignments().get() - } else { - val topicPartitions = topicDescriptions.flatMap(pi => pi.partitions().asScala.map(tpi => new TopicPartition(pi.name(), tpi.partition()))) - adminClient.listPartitionReassignments(topicPartitions.toSet.asJava).reassignments().get() - } + private def listAllReassignments(): Map[TopicPartition, PartitionReassignment] = { + try { + adminClient.listPartitionReassignments(new ListPartitionReassignmentsOptions).reassignments().get().asScala } catch { case e: ExecutionException => e.getCause match { case ex: UnsupportedVersionException => logger.debug("Couldn't query reassignments through the AdminClient API", ex) - Collections.emptyMap() + Map() case t => throw t } } + } + + override def describeTopic(opts: TopicCommandOptions): Unit = { + val topics = getTopics(opts.topic, opts.excludeInternalTopics) + val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values() + val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()) + val reassignments = listAllReassignments() + val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala val describeOptions = new DescribeOptions(opts, liveBrokers.toSet) for (td <- topicDescriptions) { @@ -302,7 +301,7 @@ object TopicCommand extends Logging { if (!opts.reportOverriddenConfigs || hasNonDefault) { val numPartitions = td.partitions().size val firstPartition = td.partitions.iterator.next() - val reassignment = getReassignment(td.name, firstPartition.partition, reassignments) + val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition)) val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false) topicDesc.printDescription() } @@ -310,7 +309,7 @@ object TopicCommand extends Logging { if (describeOptions.describePartitions) { for (partition <- sortedPartitions) { - val reassignment = getReassignment(td.name, partition.partition, reassignments) + val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition)) val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment) describeOptions.maybePrintPartitionDescription(partitionDesc) } @@ -318,11 +317,6 @@ object TopicCommand extends Logging { } } - private def getReassignment(topic: String, - partition: Int, - reassignments: util.Map[TopicPartition, PartitionReassignment]): Option[PartitionReassignment] = - Option(reassignments.get(new TopicPartition(topic, partition))) - override def deleteTopic(opts: TopicCommandOptions): Unit = { val topics = getTopics(opts.topic, opts.excludeInternalTopics) ensureTopicExists(topics, opts.topic) @@ -561,9 +555,18 @@ object TopicCommand extends Logging { } private def getReplicationFactor(tpi: TopicPartitionInfo, reassignment: Option[PartitionReassignment]): Int = { + // It is possible for a reassignment to complete between the time we have fetched its state and the time + // we fetch partition metadata. In ths case, we ignore the reassignment when determining replication factor. + def isReassignmentInProgress(ra: PartitionReassignment): Boolean = { + // Reassignment is still in progress as long as the removing replicas are still present + val allReplicaIds = tpi.replicas.asScala.map(_.id).toSet + val removingReplicaIds = ra.removingReplicas.asScala.map(Int.unbox).toSet + allReplicaIds.exists(removingReplicaIds.contains) + } + reassignment match { - case Some(ra) => ra.replicas.asScala.diff(ra.addingReplicas.asScala).size - case None => tpi.replicas.size + case Some(ra) if isReassignmentInProgress(ra) => ra.replicas.asScala.diff(ra.addingReplicas.asScala).size + case _=> tpi.replicas.size } } diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala index 4c5742a089c..288b72f15e4 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala @@ -691,7 +691,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin val underReplicatedOutput = TestUtils.grabConsoleOutput( topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions")))) - assertTrue("--under-replicated-partitions shouldn't return anything", underReplicatedOutput.isEmpty) + assertEquals("--under-replicated-partitions shouldn't return anything", "", underReplicatedOutput) TestUtils.resetBrokersThrottle(adminClient, brokerIds) TestUtils.waitForAllReassignmentsToComplete(adminClient)