KAFKA-1535 Have the metadata response contain all alive brokers rather than just the ones needed for the given topics.

Branch: pull/28/head
Author: Jay Kreps, 10 years ago
Commit: 4ebcdfd51f
Changed files:
  1. core/src/main/scala/kafka/api/TopicMetadataRequest.scala (2 changed lines)
  2. core/src/main/scala/kafka/api/TopicMetadataResponse.scala (13 changed lines)
  3. core/src/main/scala/kafka/server/KafkaApis.scala (5 changed lines)
  4. core/src/main/scala/kafka/server/MetadataCache.scala (6 changed lines)
  5. core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala (2 changed lines)
  6. core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (2 changed lines)
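
At a glance, the change promotes the broker list to a first-class field of the response instead of deriving it from the per-topic metadata. A summary of the constructor change, condensed from the TopicMetadataResponse hunk below:

  // Before: brokers had to be reverse-engineered from replicas and leaders
  case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata],
                                   override val correlationId: Int)

  // After: every alive broker travels explicitly at the head of the response
  case class TopicMetadataResponse(brokers: Seq[Broker],
                                   topicsMetadata: Seq[TopicMetadata],
                                   override val correlationId: Int)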

core/src/main/scala/kafka/api/TopicMetadataRequest.scala (2 changed lines)

@@ -79,7 +79,7 @@ case class TopicMetadataRequest(val versionId: Short,
     val topicMetadata = topics.map {
       topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
     }
-    val errorResponse = TopicMetadataResponse(topicMetadata, correlationId)
+    val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
   }
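
This hunk is the request's error handler: no broker information is available on this path, so the new leading parameter is just an empty Seq. A hypothetical client-side view of such a response (readFrom is shown in the response hunk below; the errorCode field name is assumed from TopicMetadata's constructor above):

  // Sketch: decoding the error response produced above, given a ByteBuffer
  // positioned at a serialized response. With no brokers supplied, the broker
  // array is empty and each requested topic carries only its mapped error code.
  val errorResponse = TopicMetadataResponse.readFrom(buffer)
  assert(errorResponse.brokers.isEmpty)
  errorResponse.topicsMetadata.foreach(tm => println(tm.topic + ": error=" + tm.errorCode))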

core/src/main/scala/kafka/api/TopicMetadataResponse.scala (13 changed lines)

@@ -29,34 +29,27 @@ object TopicMetadataResponse {
     val brokerMap = brokers.map(b => (b.id, b)).toMap
     val topicCount = buffer.getInt
     val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap))
-    new TopicMetadataResponse(topicsMetadata, correlationId)
+    new TopicMetadataResponse(brokers, topicsMetadata, correlationId)
   }
 }

-case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata],
+case class TopicMetadataResponse(brokers: Seq[Broker],
+                                 topicsMetadata: Seq[TopicMetadata],
                                  override val correlationId: Int)
     extends RequestOrResponse(correlationId = correlationId) {
   val sizeInBytes: Int = {
-    val brokers = extractBrokers(topicsMetadata).values
     4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum
   }

   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)
     /* brokers */
-    val brokers = extractBrokers(topicsMetadata).values
     buffer.putInt(brokers.size)
     brokers.foreach(_.writeTo(buffer))
     /* topic metadata */
     buffer.putInt(topicsMetadata.length)
     topicsMetadata.foreach(_.writeTo(buffer))
   }

-  def extractBrokers(topicMetadatas: Seq[TopicMetadata]): Map[Int, Broker] = {
-    val parts = topicsMetadata.flatMap(_.partitionsMetadata)
-    val brokers = (parts.flatMap(_.replicas)) ++ (parts.map(_.leader).collect{case Some(l) => l})
-    brokers.map(b => (b.id, b)).toMap
-  }
-
   override def describe(details: Boolean):String = { toString }
 }
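
Taken together, sizeInBytes, writeTo, and readFrom now define this wire layout: correlation id, then the broker array, then the topic-metadata array. A sketch of a conforming reader (the hunk above shows only the tail of readFrom; Broker.readFrom and TopicMetadata.readFrom are the existing protocol helpers):

  import java.nio.ByteBuffer

  // Layout written by writeTo, matching the 4 + 4 + ... + 4 + ... terms in sizeInBytes:
  //   int32 correlationId
  //   int32 brokerCount, followed by brokerCount serialized Broker entries
  //   int32 topicCount,  followed by topicCount serialized TopicMetadata entries
  def readTopicMetadataResponse(buffer: ByteBuffer): TopicMetadataResponse = {
    val correlationId = buffer.getInt
    val brokers = (0 until buffer.getInt).map(_ => Broker.readFrom(buffer))
    // TopicMetadata still embeds Broker references, so entries are resolved by id
    val brokerMap = brokers.map(b => (b.id, b)).toMap
    val topicsMetadata = (0 until buffer.getInt).map(_ => TopicMetadata.readFrom(buffer, brokerMap))
    new TopicMetadataResponse(brokers, topicsMetadata, correlationId)
  }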

core/src/main/scala/kafka/server/KafkaApis.scala (5 changed lines)

@@ -591,8 +591,9 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
     val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
-    trace("Sending topic metadata %s for correlation id %d to client %s".format(topicMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
-    val response = new TopicMetadataResponse(topicMetadata, metadataRequest.correlationId)
+    val brokers = metadataCache.getAliveBrokers
+    trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
+    val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }

core/src/main/scala/kafka/server/MetadataCache.scala (6 changed lines)

@@ -81,6 +81,12 @@ private[server] class MetadataCache {
     topicResponses
   }

+  def getAliveBrokers = {
+    inLock(partitionMetadataLock.readLock()) {
+      aliveBrokers.values.toList
+    }
+  }
+
   def addOrUpdatePartitionInfo(topic: String,
                                partitionId: Int,
                                stateInfo: PartitionStateInfo) {
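
getAliveBrokers snapshots the broker list under the cache's read lock, so a metadata request never observes a half-applied controller update. A minimal standalone sketch of the pattern (Broker is stubbed here; in the real cache, inLock comes from kafka.utils and aliveBrokers is maintained by the controller's update-metadata path):

  import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
  import scala.collection.mutable

  case class Broker(id: Int, host: String, port: Int) // stand-in for kafka.cluster.Broker

  class MetadataCacheSketch {
    private val partitionMetadataLock = new ReentrantReadWriteLock()
    private val aliveBrokers = mutable.Map[Int, Broker]()

    // Same contract as the kafka.utils helper: acquire, run the body, always release.
    private def inLock[T](lock: Lock)(body: => T): T = {
      lock.lock()
      try body finally lock.unlock()
    }

    // Readers take the shared lock and copy the values out; the returned List
    // is an immutable snapshot that is safe to serialize after the lock is released.
    def getAliveBrokers: List[Broker] =
      inLock(partitionMetadataLock.readLock()) { aliveBrokers.values.toList }

    // Writers (the controller's update path in the real cache) take the exclusive lock.
    def updateAliveBrokers(brokers: Seq[Broker]): Unit =
      inLock(partitionMetadataLock.writeLock()) {
        aliveBrokers.clear()
        brokers.foreach(b => aliveBrokers(b.id) = b)
      }
  }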

core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala (2 changed lines)

@@ -118,7 +118,7 @@ object ReplicaVerificationTool extends Logging {
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
     val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
-    val brokerMap = topicsMetadataResponse.extractBrokers(topicsMetadataResponse.topicsMetadata)
+    val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap
     val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter(
       topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false))
         true
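
This call site shows the payoff of the commit: the tool asks for metadata on an empty topic set, and because brokers are now carried explicitly it still receives the full alive-broker list rather than only the brokers that happened to lead or replicate some requested topic. Illustrated with the names from the hunk above:

  // Before this commit, extractBrokers could only surface brokers referenced by
  // partitions of the requested topics, so an empty topic set yielded no brokers.
  // Now the broker list is independent of which topics were asked about.
  val response = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
  val allAliveBrokers = response.brokers                    // complete list of alive brokers
  val brokerMap = allAliveBrokers.map(b => (b.id, b)).toMap // the same map the tool builds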

core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (2 changed lines)

@@ -143,7 +143,7 @@ object SerializationTestUtils {
   }

   def createTestTopicMetadataResponse: TopicMetadataResponse = {
-    new TopicMetadataResponse(Seq(topicmetaData1, topicmetaData2), 1)
+    new TopicMetadataResponse(brokers, Seq(topicmetaData1, topicmetaData2), 1)
   }

   def createTestOffsetCommitRequest: OffsetCommitRequest = {
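
The brokers value referenced here must now be a Seq[Broker] fixture in scope in SerializationTestUtils; a hypothetical definition (ids, hosts, and ports invented for illustration):

  // Hypothetical fixture; the actual values in the repository may differ.
  private val brokers = Seq(new Broker(0, "localhost", 9092),
                            new Broker(1, "localhost", 9093),
                            new Broker(2, "localhost", 9094))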
