From 1d496a26c998cefc77223a7575980208620dfe2d Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 5 Dec 2019 16:57:23 -0800 Subject: [PATCH] KAFKA-9179; Fix flaky test due to race condition when fetching reassignment state (#7786) This patch fixes a race condition on reassignment completion. The previous code fetched metadata first and then fetched the reassignment state. It is possible in between those times for the reassignment to complete, which leads to spurious URPs being reported. The fix here is to change the order of these checks and to explicitly check for reassignment completion. Note this patch fixes the flaky test `TopicCommandWithAdminClientTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`. Reviewers: Guozhang Wang --- .../main/scala/kafka/admin/TopicCommand.scala | 55 ++++++++++--------- .../TopicCommandWithAdminClientTest.scala | 2 +- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 5394ae54d13..b1c1c0a23d7 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -120,7 +120,7 @@ object TopicCommand extends Logging { config.map(_.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value.toInt) } - def hasUnderReplicatedPartitions: Boolean = { + def isUnderReplicated: Boolean = { getReplicationFactor(info, reassignment) - info.isr.size > 0 } @@ -128,7 +128,7 @@ object TopicCommand extends Logging { info.leader != null } - def hasUnderMinIsrPartitions: Boolean = { + def isUnderMinIsr: Boolean = { !hasLeader || minIsrCount.exists(info.isr.size < _) } @@ -161,13 +161,13 @@ object TopicCommand extends Logging { val describePartitions: Boolean = !opts.reportOverriddenConfigs private def shouldPrintUnderReplicatedPartitions(partitionDescription: PartitionDescription): Boolean = { - opts.reportUnderReplicatedPartitions && 
partitionDescription.hasUnderReplicatedPartitions + opts.reportUnderReplicatedPartitions && partitionDescription.isUnderReplicated } private def shouldPrintUnavailablePartitions(partitionDescription: PartitionDescription): Boolean = { opts.reportUnavailablePartitions && partitionDescription.hasUnavailablePartitions(liveBrokers) } private def shouldPrintUnderMinIsrPartitions(partitionDescription: PartitionDescription): Boolean = { - opts.reportUnderMinIsrPartitions && partitionDescription.hasUnderMinIsrPartitions + opts.reportUnderMinIsrPartitions && partitionDescription.isUnderMinIsr } private def shouldPrintAtMinIsrPartitions(partitionDescription: PartitionDescription): Boolean = { opts.reportAtMinIsrPartitions && partitionDescription.isAtMinIsrPartitions @@ -269,27 +269,26 @@ object TopicCommand extends Logging { }}.toMap.asJava).all().get() } - override def describeTopic(opts: TopicCommandOptions): Unit = { - val topics = getTopics(opts.topic, opts.excludeInternalTopics) - val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values() - val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()) - val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala - val reassignments: util.Map[TopicPartition, PartitionReassignment] = try { - if (opts.topic.isEmpty) { - adminClient.listPartitionReassignments(new ListPartitionReassignmentsOptions).reassignments().get() - } else { - val topicPartitions = topicDescriptions.flatMap(pi => pi.partitions().asScala.map(tpi => new TopicPartition(pi.name(), tpi.partition()))) - adminClient.listPartitionReassignments(topicPartitions.toSet.asJava).reassignments().get() - } + private def listAllReassignments(): Map[TopicPartition, PartitionReassignment] = { + try { + adminClient.listPartitionReassignments(new ListPartitionReassignmentsOptions).reassignments().get().asScala } catch { case e: ExecutionException => 
e.getCause match { case ex: UnsupportedVersionException => logger.debug("Couldn't query reassignments through the AdminClient API", ex) - Collections.emptyMap() + Map() case t => throw t } } + } + + override def describeTopic(opts: TopicCommandOptions): Unit = { + val topics = getTopics(opts.topic, opts.excludeInternalTopics) + val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values() + val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()) + val reassignments = listAllReassignments() + val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala val describeOptions = new DescribeOptions(opts, liveBrokers.toSet) for (td <- topicDescriptions) { @@ -302,7 +301,7 @@ object TopicCommand extends Logging { if (!opts.reportOverriddenConfigs || hasNonDefault) { val numPartitions = td.partitions().size val firstPartition = td.partitions.iterator.next() - val reassignment = getReassignment(td.name, firstPartition.partition, reassignments) + val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition)) val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false) topicDesc.printDescription() } @@ -310,7 +309,7 @@ object TopicCommand extends Logging { if (describeOptions.describePartitions) { for (partition <- sortedPartitions) { - val reassignment = getReassignment(td.name, partition.partition, reassignments) + val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition)) val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment) describeOptions.maybePrintPartitionDescription(partitionDesc) } @@ -318,11 +317,6 @@ object TopicCommand extends Logging { } } - private def getReassignment(topic: String, - partition: Int, - reassignments: 
util.Map[TopicPartition, PartitionReassignment]): Option[PartitionReassignment] = - Option(reassignments.get(new TopicPartition(topic, partition))) - override def deleteTopic(opts: TopicCommandOptions): Unit = { val topics = getTopics(opts.topic, opts.excludeInternalTopics) ensureTopicExists(topics, opts.topic) @@ -561,9 +555,18 @@ object TopicCommand extends Logging { } private def getReplicationFactor(tpi: TopicPartitionInfo, reassignment: Option[PartitionReassignment]): Int = { + // It is possible for a reassignment to complete between the time we have fetched its state and the time + // we fetch partition metadata. In this case, we ignore the reassignment when determining replication factor. + def isReassignmentInProgress(ra: PartitionReassignment): Boolean = { + // Reassignment is still in progress as long as the removing replicas are still present + val allReplicaIds = tpi.replicas.asScala.map(_.id).toSet + val removingReplicaIds = ra.removingReplicas.asScala.map(Int.unbox).toSet + allReplicaIds.exists(removingReplicaIds.contains) + } + reassignment match { - case Some(ra) => ra.replicas.asScala.diff(ra.addingReplicas.asScala).size - case None => tpi.replicas.size + case Some(ra) if isReassignmentInProgress(ra) => ra.replicas.asScala.diff(ra.addingReplicas.asScala).size + case _=> tpi.replicas.size } } diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala index 4c5742a089c..288b72f15e4 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala @@ -691,7 +691,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin val underReplicatedOutput = TestUtils.grabConsoleOutput( topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions")))) - assertTrue("--under-replicated-partitions shouldn't 
return anything", underReplicatedOutput.isEmpty) + assertEquals("--under-replicated-partitions shouldn't return anything", "", underReplicatedOutput) TestUtils.resetBrokersThrottle(adminClient, brokerIds) TestUtils.waitForAllReassignmentsToComplete(adminClient)