Browse Source

KAFKA-8442; Include ISR in Metadata response even if there is no leader (#6836)

Currently the Metadata response returns an empty ISR if there is no active leader. The behavior is inconsistent since other fields such as the replica list and offline replicas are included. This patch changes the behavior to return the current known ISR. This fixes a problem with the topic describe command which fails to report ISR when a leader is offline.

Reviewers: Jason Gustafson <jason@confluent.io>
pull/7138/head
huxi 5 years ago committed by Jason Gustafson
parent
commit
c6286b2b3e
  1. 5
      core/src/main/scala/kafka/admin/TopicCommand.scala
  2. 7
      core/src/main/scala/kafka/server/MetadataCache.scala
  3. 2
      core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
  4. 2
      core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala

5
core/src/main/scala/kafka/admin/TopicCommand.scala

@ -120,7 +120,10 @@ object TopicCommand extends Logging { @@ -120,7 +120,10 @@ object TopicCommand extends Logging {
opts.reportUnavailablePartitions && hasUnavailablePartitions(partitionDescription)
}
private def hasUnderMinIsrPartitions(partitionDescription: PartitionDescription) = {
partitionDescription.isr.size < partitionDescription.minIsrCount
if (partitionDescription.leader.isDefined)
partitionDescription.isr.size < partitionDescription.minIsrCount
else
partitionDescription.minIsrCount > 0
}
private def hasAtMinIsrPartitions(partitionDescription: PartitionDescription) = {
partitionDescription.isr.size == partitionDescription.minIsrCount

7
core/src/main/scala/kafka/server/MetadataCache.scala

@ -82,6 +82,8 @@ class MetadataCache(brokerId: Int) extends Logging { @@ -82,6 +82,8 @@ class MetadataCache(brokerId: Int) extends Logging {
val replicaInfo = getEndpoints(snapshot, replicas, listenerName, errorUnavailableEndpoints)
val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala, listenerName, errorUnavailableEndpoints)
val isr = partitionState.basePartitionState.isr.asScala
val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints)
maybeLeader match {
case None =>
val error = if (!snapshot.aliveBrokers.contains(brokerId)) { // we are already holding the read lock
@ -92,13 +94,10 @@ class MetadataCache(brokerId: Int) extends Logging { @@ -92,13 +94,10 @@ class MetadataCache(brokerId: Int) extends Logging {
if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
}
new MetadataResponse.PartitionMetadata(error, partitionId.toInt, Node.noNode(),
Optional.empty(), replicaInfo.asJava, java.util.Collections.emptyList(),
Optional.empty(), replicaInfo.asJava, isrInfo.asJava,
offlineReplicaInfo.asJava)
case Some(leader) =>
val isr = partitionState.basePartitionState.isr.asScala
val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints)
if (replicaInfo.size < replicas.size) {
debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")

2
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala

@ -602,7 +602,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin @@ -602,7 +602,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--unavailable-partitions"))))
val rows = output.split("\n")
assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
assertTrue(rows(0).endsWith("Leader: none\tReplicas: 0\tIsr: "))
assertTrue(rows(0).contains("Leader: none\tReplicas: 0\tIsr:"))
} finally {
restartDeadBrokers()
}

2
core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala

@ -182,7 +182,7 @@ class MetadataCacheTest { @@ -182,7 +182,7 @@ class MetadataCacheTest {
val partitionMetadata = partitionMetadatas.get(0)
assertEquals(0, partitionMetadata.partition)
assertEquals(expectedError, partitionMetadata.error)
assertTrue(partitionMetadata.isr.isEmpty)
assertFalse(partitionMetadata.isr.isEmpty)
assertEquals(1, partitionMetadata.replicas.size)
assertEquals(0, partitionMetadata.replicas.get(0).id)
}

Loading…
Cancel
Save