diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index f2e197ae122..a25e546b713 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -120,7 +120,10 @@ object TopicCommand extends Logging {
     opts.reportUnavailablePartitions && hasUnavailablePartitions(partitionDescription)
   }
   private def hasUnderMinIsrPartitions(partitionDescription: PartitionDescription) = {
-    partitionDescription.isr.size < partitionDescription.minIsrCount
+    if (partitionDescription.leader.isDefined)
+      partitionDescription.isr.size < partitionDescription.minIsrCount
+    else
+      partitionDescription.minIsrCount > 0
   }
   private def hasAtMinIsrPartitions(partitionDescription: PartitionDescription) = {
     partitionDescription.isr.size == partitionDescription.minIsrCount
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 8b8e159be98..7d10bb00425 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -82,6 +82,8 @@ class MetadataCache(brokerId: Int) extends Logging {
         val replicaInfo = getEndpoints(snapshot, replicas, listenerName, errorUnavailableEndpoints)
         val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala, listenerName, errorUnavailableEndpoints)
 
+        val isr = partitionState.basePartitionState.isr.asScala
+        val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints)
         maybeLeader match {
           case None =>
             val error = if (!snapshot.aliveBrokers.contains(brokerId)) { // we are already holding the read lock
@@ -92,13 +94,10 @@
               if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
             }
             new MetadataResponse.PartitionMetadata(error, partitionId.toInt, Node.noNode(),
-              Optional.empty(), replicaInfo.asJava, java.util.Collections.emptyList(),
+              Optional.empty(), replicaInfo.asJava, isrInfo.asJava,
               offlineReplicaInfo.asJava)
 
           case Some(leader) =>
-            val isr = partitionState.basePartitionState.isr.asScala
-            val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints)
-
             if (replicaInfo.size < replicas.size) {
               debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
                 s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index 8813b598117..a04ef0dac7f 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -602,7 +602,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
         topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--unavailable-partitions"))))
       val rows = output.split("\n")
       assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
-      assertTrue(rows(0).endsWith("Leader: none\tReplicas: 0\tIsr: "))
+      assertTrue(rows(0).contains("Leader: none\tReplicas: 0\tIsr:"))
     } finally {
       restartDeadBrokers()
     }
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 5ddabc0b6f7..9f73fbea988 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -182,7 +182,7 @@ class MetadataCacheTest {
    val partitionMetadata = partitionMetadatas.get(0)
    assertEquals(0, partitionMetadata.partition)
    assertEquals(expectedError, partitionMetadata.error)
-   assertTrue(partitionMetadata.isr.isEmpty)
+   assertFalse(partitionMetadata.isr.isEmpty)
    assertEquals(1, partitionMetadata.replicas.size)
    assertEquals(0, partitionMetadata.replicas.get(0).id)
  }
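
For review context, a minimal sketch of the new hasUnderMinIsrPartitions semantics. The Partition case class below is a hypothetical, simplified stand-in for PartitionDescription, defined only for illustration; it is not the class in the patch. The point: since MetadataCache now returns the ISR even when the leader is unavailable, the plain size check alone would no longer flag a leaderless partition, so the leader check keeps it reported whenever a minimum is configured.

// Sketch only; Partition is a hypothetical stand-in for PartitionDescription.
object UnderMinIsrSketch extends App {
  case class Partition(leader: Option[Int], isr: Seq[Int], minIsrCount: Int)

  def hasUnderMinIsr(p: Partition): Boolean =
    if (p.leader.isDefined) p.isr.size < p.minIsrCount
    else p.minIsrCount > 0 // leaderless: flagged whenever min.insync.replicas > 0

  // Leaderless partition with a populated ISR is still reported as under min ISR.
  assert(hasUnderMinIsr(Partition(leader = None, isr = Seq(1, 2), minIsrCount = 2)))
  // Healthy partition meeting the minimum is not reported.
  assert(!hasUnderMinIsr(Partition(leader = Some(1), isr = Seq(1, 2), minIsrCount = 2)))
}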