Browse Source

KAFKA-13143; Remove Metadata handling from ControllerApis (#11135)

This PR removes the `METADATA` API from the Kraft controller as the controller does not yet implement the metadata fetch functionality completely.

Without the change (as per the JIRA https://issues.apache.org/jira/browse/KAFKA-13143), the API would return an empty list of topics making the caller incorrectly think that there were no topics in the cluster which could be confusing. After this change the describe and list topic APIs timeout on the controller endpoint when using the `kafka-topics` CLI (which is the same behavior as create_topic).

Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
pull/11147/head
Niket 3 years ago committed by GitHub
parent
commit
257533a21b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      clients/src/main/resources/common/message/MetadataRequest.json
  2. 38
      core/src/main/scala/kafka/server/ControllerApis.scala

2
clients/src/main/resources/common/message/MetadataRequest.json

@ -16,7 +16,7 @@ @@ -16,7 +16,7 @@
{
"apiKey": 3,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker", "broker"],
"name": "MetadataRequest",
"validVersions": "0-11",
"flexibleVersions": "9+",

38
core/src/main/scala/kafka/server/ControllerApis.scala

@ -40,12 +40,10 @@ import org.apache.kafka.common.message.CreateTopicsRequestData @@ -40,12 +40,10 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
import org.apache.kafka.common.message._
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
import org.apache.kafka.common.utils.Time
@ -86,7 +84,6 @@ class ControllerApis(val requestChannel: RequestChannel, @@ -86,7 +84,6 @@ class ControllerApis(val requestChannel: RequestChannel,
request.header.apiKey match {
case ApiKeys.FETCH => handleFetch(request)
case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
case ApiKeys.METADATA => handleMetadataRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopics(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
@ -151,41 +148,6 @@ class ControllerApis(val requestChannel: RequestChannel, @@ -151,41 +148,6 @@ class ControllerApis(val requestChannel: RequestChannel,
handleRaftRequest(request, response => new FetchSnapshotResponse(response.asInstanceOf[FetchSnapshotResponseData]))
}
def handleMetadataRequest(request: RequestChannel.Request): Unit = {
val metadataRequest = request.body[MetadataRequest]
def createResponseCallback(requestThrottleMs: Int): MetadataResponse = {
val metadataResponseData = new MetadataResponseData()
metadataResponseData.setThrottleTimeMs(requestThrottleMs)
controllerNodes.foreach { node =>
metadataResponseData.brokers.add(new MetadataResponseBroker()
.setHost(node.host)
.setNodeId(node.id)
.setPort(node.port)
.setRack(node.rack))
}
metadataResponseData.setClusterId(metaProperties.clusterId)
if (controller.isActive) {
metadataResponseData.setControllerId(config.nodeId)
} else {
metadataResponseData.setControllerId(MetadataResponse.NO_CONTROLLER_ID)
}
val clusterAuthorizedOperations = if (metadataRequest.data.includeClusterAuthorizedOperations) {
if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) {
authHelper.authorizedOperations(request, Resource.CLUSTER)
} else {
0
}
} else {
Int.MinValue
}
// TODO: fill in information about the metadata topic
metadataResponseData.setClusterAuthorizedOperations(clusterAuthorizedOperations)
new MetadataResponse(metadataResponseData, request.header.apiVersion)
}
requestHelper.sendResponseMaybeThrottle(request,
requestThrottleMs => createResponseCallback(requestThrottleMs))
}
def handleDeleteTopics(request: RequestChannel.Request): Unit = {
val deleteTopicsRequest = request.body[DeleteTopicsRequest]
val future = deleteTopics(deleteTopicsRequest.data,

Loading…
Cancel
Save