|
|
@ -24,7 +24,6 @@ import scala.collection.{Seq, Set, mutable} |
|
|
|
import scala.collection.JavaConverters._ |
|
|
|
import scala.collection.JavaConverters._ |
|
|
|
import kafka.cluster.{Broker, EndPoint} |
|
|
|
import kafka.cluster.{Broker, EndPoint} |
|
|
|
import kafka.api._ |
|
|
|
import kafka.api._ |
|
|
|
import kafka.common.TopicAndPartition |
|
|
|
|
|
|
|
import kafka.controller.StateChangeLogger |
|
|
|
import kafka.controller.StateChangeLogger |
|
|
|
import kafka.utils.CoreUtils._ |
|
|
|
import kafka.utils.CoreUtils._ |
|
|
|
import kafka.utils.Logging |
|
|
|
import kafka.utils.Logging |
|
|
@ -71,7 +70,7 @@ class MetadataCache(brokerId: Int) extends Logging { |
|
|
|
errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = { |
|
|
|
errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = { |
|
|
|
cache.get(topic).map { partitions => |
|
|
|
cache.get(topic).map { partitions => |
|
|
|
partitions.map { case (partitionId, partitionState) => |
|
|
|
partitions.map { case (partitionId, partitionState) => |
|
|
|
val topicPartition = TopicAndPartition(topic, partitionId) |
|
|
|
val topicPartition = new TopicPartition(topic, partitionId) |
|
|
|
val leaderBrokerId = partitionState.basePartitionState.leader |
|
|
|
val leaderBrokerId = partitionState.basePartitionState.leader |
|
|
|
val maybeLeader = getAliveEndpoint(leaderBrokerId, listenerName) |
|
|
|
val maybeLeader = getAliveEndpoint(leaderBrokerId, listenerName) |
|
|
|
val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt) |
|
|
|
val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt) |
|
|
|