From c6286b2b3e0d573773844f9b0a2931e9c2058f58 Mon Sep 17 00:00:00 2001 From: huxi Date: Tue, 30 Jul 2019 23:44:11 +0800 Subject: [PATCH] KAFKA-8442; Include ISR in Metadata response even if there is no leader (#6836) Currently the Metadata response returns an empty ISR if there is no active leader. The behavior is inconsistent since other fields such as the replica list and offline replicas are included. This patch changes the behavior to return the current known ISR. This fixes a problem with the topic describe command which fails to report ISR when a leader is offline. Reviewers: Jason Gustafson --- core/src/main/scala/kafka/admin/TopicCommand.scala | 5 ++++- core/src/main/scala/kafka/server/MetadataCache.scala | 7 +++---- .../unit/kafka/admin/TopicCommandWithAdminClientTest.scala | 2 +- .../test/scala/unit/kafka/server/MetadataCacheTest.scala | 2 +- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index f2e197ae122..a25e546b713 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -120,7 +120,10 @@ object TopicCommand extends Logging { opts.reportUnavailablePartitions && hasUnavailablePartitions(partitionDescription) } private def hasUnderMinIsrPartitions(partitionDescription: PartitionDescription) = { - partitionDescription.isr.size < partitionDescription.minIsrCount + if (partitionDescription.leader.isDefined) + partitionDescription.isr.size < partitionDescription.minIsrCount + else + partitionDescription.minIsrCount > 0 } private def hasAtMinIsrPartitions(partitionDescription: PartitionDescription) = { partitionDescription.isr.size == partitionDescription.minIsrCount diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 8b8e159be98..7d10bb00425 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -82,6 +82,8 @@ class MetadataCache(brokerId: Int) extends Logging { val replicaInfo = getEndpoints(snapshot, replicas, listenerName, errorUnavailableEndpoints) val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala, listenerName, errorUnavailableEndpoints) + val isr = partitionState.basePartitionState.isr.asScala + val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints) maybeLeader match { case None => val error = if (!snapshot.aliveBrokers.contains(brokerId)) { // we are already holding the read lock @@ -92,13 +94,10 @@ class MetadataCache(brokerId: Int) extends Logging { if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE } new MetadataResponse.PartitionMetadata(error, partitionId.toInt, Node.noNode(), - Optional.empty(), replicaInfo.asJava, java.util.Collections.emptyList(), + Optional.empty(), replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava) case Some(leader) => - val isr = partitionState.basePartitionState.isr.asScala - val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints) - if (replicaInfo.size < replicas.size) { debug(s"Error while fetching metadata for $topicPartition: replica information not available for " + s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}") diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala index 8813b598117..a04ef0dac7f 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala @@ -602,7 +602,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--unavailable-partitions")))) val rows = output.split("\n") assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName")) - assertTrue(rows(0).endsWith("Leader: none\tReplicas: 0\tIsr: ")) + assertTrue(rows(0).contains("Leader: none\tReplicas: 0\tIsr:")) } finally { restartDeadBrokers() } diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 5ddabc0b6f7..9f73fbea988 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -182,7 +182,7 @@ class MetadataCacheTest { val partitionMetadata = partitionMetadatas.get(0) assertEquals(0, partitionMetadata.partition) assertEquals(expectedError, partitionMetadata.error) - assertTrue(partitionMetadata.isr.isEmpty) + assertFalse(partitionMetadata.isr.isEmpty) assertEquals(1, partitionMetadata.replicas.size) assertEquals(0, partitionMetadata.replicas.get(0).id) }