diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index a319f2f438b..bce004f9198 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -79,7 +79,7 @@ case class TopicMetadataRequest(val versionId: Short, val topicMetadata = topics.map { topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } - val errorResponse = TopicMetadataResponse(topicMetadata, correlationId) + val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) } diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala index f6b7429faea..b233d3593c0 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala @@ -29,34 +29,27 @@ object TopicMetadataResponse { val brokerMap = brokers.map(b => (b.id, b)).toMap val topicCount = buffer.getInt val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap)) - new TopicMetadataResponse(topicsMetadata, correlationId) + new TopicMetadataResponse(brokers, topicsMetadata, correlationId) } } -case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata], +case class TopicMetadataResponse(brokers: Seq[Broker], + topicsMetadata: Seq[TopicMetadata], override val correlationId: Int) extends RequestOrResponse(correlationId = correlationId) { val sizeInBytes: Int = { - val brokers = extractBrokers(topicsMetadata).values 4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum } def writeTo(buffer: ByteBuffer) { buffer.putInt(correlationId) /* brokers */ - val brokers = extractBrokers(topicsMetadata).values buffer.putInt(brokers.size) brokers.foreach(_.writeTo(buffer)) /* topic metadata */ buffer.putInt(topicsMetadata.length) topicsMetadata.foreach(_.writeTo(buffer)) } - - def extractBrokers(topicMetadatas: Seq[TopicMetadata]): Map[Int, Broker] = { - val parts = topicsMetadata.flatMap(_.partitionsMetadata) - val brokers = (parts.flatMap(_.replicas)) ++ (parts.map(_.leader).collect{case Some(l) => l}) - brokers.map(b => (b.id, b)).toMap - } override def describe(details: Boolean):String = { toString } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0b668f230c8..fd5f12ee31e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -591,8 +591,9 @@ class KafkaApis(val requestChannel: RequestChannel, def handleTopicMetadataRequest(request: RequestChannel.Request) { val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet) - trace("Sending topic metadata %s for correlation id %d to client %s".format(topicMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId)) - val response = new TopicMetadataResponse(topicMetadata, metadataRequest.correlationId) + val brokers = metadataCache.getAliveBrokers + trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId)) + val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 3198cdf40e7..7cd40e16182 100644 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -81,6 +81,12 @@ private[server] class MetadataCache { topicResponses } + def getAliveBrokers = { + inLock(partitionMetadataLock.readLock()) { + aliveBrokers.values.toList + } + } + def addOrUpdatePartitionInfo(topic: String, partitionId: Int, stateInfo: PartitionStateInfo) { diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index c040f49d002..af478364680 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -118,7 +118,7 @@ object ReplicaVerificationTool extends Logging { info("Getting topic metatdata...") val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs) - val brokerMap = topicsMetadataResponse.extractBrokers(topicsMetadataResponse.topicsMetadata) + val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter( topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)) true diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index a2117b34c2e..d34ddf5b6b8 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -143,7 +143,7 @@ object SerializationTestUtils { } def createTestTopicMetadataResponse: TopicMetadataResponse = { - new TopicMetadataResponse(Seq(topicmetaData1, topicmetaData2), 1) + new TopicMetadataResponse(brokers, Seq(topicmetaData1, topicmetaData2), 1) } def createTestOffsetCommitRequest: OffsetCommitRequest = {