
KAFKA-14446: API forwarding support from zkBrokers to the Controller (#12961)

This PR enables brokers that are upgrading from ZK mode to KRaft mode to forward certain metadata
change requests to the controller instead of applying them directly through ZK. To facilitate this,
we now support EnvelopeRequest on zkBrokers (instead of only on KRaft nodes).
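
As a rough sketch (condensed from the kafka.Kafka change further down, not new code beyond that diff), forwarding on a zkBroker is only switched on when the migration flag is set and the statically configured IBP supports it:

    // In ZK mode the inter-broker protocol version is static, so a one-time check
    // at startup is sufficient (see the Kafka.scala hunk below).
    private def enableApiForwarding(config: KafkaConfig): Boolean =
      config.migrationEnabled && config.interBrokerProtocolVersion.isApiForwardingEnabled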

In BrokerToControllerChannelManager, we can now reinitialize our NetworkClient. This is needed to
handle the case when we transition from forwarding requests to a ZK-based broker over the
inter-broker listener, to forwarding requests to a quorum node over the controller listener.
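
A simplified sketch of that reset (paraphrasing the maybeResetNetworkClient logic added in BrokerToControllerChannelManager.scala below; address/metadata bookkeeping is omitted):

    // When the controller flips between ZK and KRaft mode, drop the client that was
    // built for the old listener/security protocol and build a fresh one.
    private def maybeResetNetworkClient(info: ControllerInformation, initialize: Boolean = false): Unit = {
      if (initialize || isZkController != info.isZkController) {
        if (networkClient != null) {
          networkClient.initiateClose()
          networkClient.close()
        }
        isZkController = info.isZkController
        networkClient = networkClientFactory(info) // rebuilt for the new listener
      }
    }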

In MetadataCache.scala, distinguish between KRaft and ZK controller nodes with a new type,
CachedControllerId.
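
The new type is a small sealed hierarchy, shown verbatim in the MetadataCache.scala hunk below:

    sealed trait CachedControllerId { val id: Int }
    case class ZkCachedControllerId(id: Int) extends CachedControllerId
    case class KRaftCachedControllerId(id: Int) extends CachedControllerId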

In LeaderAndIsrRequest, StopReplicaRequest, and UpdateMetadataRequest, switch from sending both a
ZK and a KRaft controller ID to sending a single controller ID plus a boolean to express whether it
is KRaft. The previous scheme was ambiguous as to whether the system was in KRaft or ZK mode when
both IDs were -1 (although this case is unlikely to come up in practice). The new scheme avoids
this ambiguity and is simpler to understand.
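
To illustrate, here is a sketch of how a receiving broker interprets the new fields (based on the ZkMetadataCache change below; the helper name is hypothetical): a negative ID always means "no controller", and otherwise the boolean selects the cached controller type.

    // Hypothetical helper; mirrors the match in the ZkMetadataCache update path below.
    def toCachedControllerId(controllerId: Int, isKRaftController: Boolean): Option[CachedControllerId] =
      if (controllerId < 0) None
      else if (isKRaftController) Some(KRaftCachedControllerId(controllerId))
      else Some(ZkCachedControllerId(controllerId))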

Reviewers: dengziming <dengziming1993@gmail.com>, David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Akhilesh C committed 2 years ago (via GitHub), commit 8b045dcbf6
38 changed files (number of changed lines in parentheses):
  1. clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java (10)
  2. clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java (18)
  3. clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java (18)
  4. clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java (26)
  5. clients/src/main/resources/common/message/EnvelopeRequest.json (2)
  6. clients/src/main/resources/common/message/LeaderAndIsrRequest.json (4)
  7. clients/src/main/resources/common/message/StopReplicaRequest.json (4)
  8. clients/src/main/resources/common/message/UpdateMetadataRequest.json (4)
  9. clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java (1)
  10. core/src/main/scala/kafka/Kafka.scala (8)
  11. core/src/main/scala/kafka/common/InterBrokerSendThread.scala (2)
  12. core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala (121)
  13. core/src/main/scala/kafka/server/KafkaApis.scala (18)
  14. core/src/main/scala/kafka/server/KafkaServer.scala (22)
  15. core/src/main/scala/kafka/server/MetadataCache.scala (18)
  16. core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala (11)
  17. core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala (58)
  18. core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala (4)
  19. core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala (2)
  20. core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala (4)
  21. core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala (52)
  22. core/src/test/scala/kafka/utils/TestInfoUtils.scala (8)
  23. core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala (3)
  24. core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala (6)
  25. core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala (9)
  26. core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala (15)
  27. core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala (10)
  28. core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala (5)
  29. core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala (24)
  30. core/src/test/scala/unit/kafka/server/KafkaApisTest.scala (2)
  31. core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala (2)
  32. core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala (8)
  33. core/src/test/scala/unit/kafka/utils/TestUtils.scala (7)
  34. jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java (3)
  35. jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java (3)
  36. jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java (4)
  37. jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java (6)
  38. server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java (4)

10
clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java

@@ -27,14 +27,20 @@ public abstract class AbstractControlRequest extends AbstractRequest {
protected final int controllerId;
protected final int controllerEpoch;
protected final long brokerEpoch;
protected final boolean kraftController;
protected Builder(ApiKeys api, short version, int controllerId, int controllerEpoch, long brokerEpoch) {
this(api, version, controllerId, controllerEpoch, brokerEpoch, false);
}
protected Builder(ApiKeys api, short version, int controllerId, int controllerEpoch,
long brokerEpoch, boolean kraftController) {
super(api, version);
this.controllerId = controllerId;
this.controllerEpoch = controllerEpoch;
this.brokerEpoch = brokerEpoch;
this.kraftController = kraftController;
}
}
protected AbstractControlRequest(ApiKeys api, short version) {
@@ -43,6 +49,8 @@ public abstract class AbstractControlRequest extends AbstractRequest {
public abstract int controllerId();
public abstract boolean isKRaftController();
public abstract int controllerEpoch();
public abstract long brokerEpoch();

18
clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java

@@ -51,7 +51,14 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
Collection<Node> liveLeaders) {
super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch);
this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, topicIds,
liveLeaders, false);
}
public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
Collection<Node> liveLeaders, boolean kraftController) {
super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch, kraftController);
this.partitionStates = partitionStates;
this.topicIds = topicIds;
this.liveLeaders = liveLeaders;
@@ -71,6 +78,10 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
.setBrokerEpoch(brokerEpoch)
.setLiveLeaders(leaders);
if (version >= 7) {
data.setIsKRaftController(kraftController);
}
if (version >= 2) {
Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates, topicIds);
data.setTopicStates(new ArrayList<>(topicStatesMap.values()));
@@ -168,6 +179,11 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
return data.controllerId();
}
@Override
public boolean isKRaftController() {
return data.isKRaftController();
}
@Override
public int controllerEpoch() {
return data.controllerEpoch();

18
clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java

@@ -45,7 +45,14 @@ public class StopReplicaRequest extends AbstractControlRequest {
public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
boolean deletePartitions, List<StopReplicaTopicState> topicStates) {
super(ApiKeys.STOP_REPLICA, version, controllerId, controllerEpoch, brokerEpoch);
this(version, controllerId, controllerEpoch, brokerEpoch, deletePartitions,
topicStates, false);
}
public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
boolean deletePartitions, List<StopReplicaTopicState> topicStates,
boolean kraftController) {
super(ApiKeys.STOP_REPLICA, version, controllerId, controllerEpoch, brokerEpoch, kraftController);
this.deletePartitions = deletePartitions;
this.topicStates = topicStates;
}
@@ -56,6 +63,10 @@ public class StopReplicaRequest extends AbstractControlRequest {
.setControllerEpoch(controllerEpoch)
.setBrokerEpoch(brokerEpoch);
if (version >= 4) {
data.setIsKRaftController(kraftController);
}
if (version >= 3) {
data.setTopicStates(topicStates);
} else if (version >= 1) {
@@ -196,6 +207,11 @@ public class StopReplicaRequest extends AbstractControlRequest {
return data.controllerId();
}
@Override
public boolean isKRaftController() {
return data.isKRaftController();
}
@Override
public int controllerEpoch() {
return data.controllerEpoch();

26
clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java

@@ -51,7 +51,14 @@ public class UpdateMetadataRequest extends AbstractControlRequest {
public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers,
Map<String, Uuid> topicIds) {
super(ApiKeys.UPDATE_METADATA, version, controllerId, controllerEpoch, brokerEpoch);
this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates,
liveBrokers, topicIds, false);
}
public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers,
Map<String, Uuid> topicIds, boolean kraftController) {
super(ApiKeys.UPDATE_METADATA, version, controllerId, controllerEpoch, brokerEpoch, kraftController);
this.partitionStates = partitionStates;
this.liveBrokers = liveBrokers;
this.topicIds = topicIds;
@@ -81,10 +88,14 @@ public class UpdateMetadataRequest extends AbstractControlRequest {
}
UpdateMetadataRequestData data = new UpdateMetadataRequestData()
.setControllerId(controllerId)
.setControllerEpoch(controllerEpoch)
.setBrokerEpoch(brokerEpoch)
.setLiveBrokers(liveBrokers);
.setControllerId(controllerId)
.setControllerEpoch(controllerEpoch)
.setBrokerEpoch(brokerEpoch)
.setLiveBrokers(liveBrokers);
if (version >= 8) {
data.setIsKRaftController(kraftController);
}
if (version >= 5) {
Map<String, UpdateMetadataTopicState> topicStatesMap = groupByTopic(topicIds, partitionStates);
@@ -180,6 +191,11 @@ public class UpdateMetadataRequest extends AbstractControlRequest {
return data.controllerId();
}
@Override
public boolean isKRaftController() {
return data.isKRaftController();
}
@Override
public int controllerEpoch() {
return data.controllerEpoch();

2
clients/src/main/resources/common/message/EnvelopeRequest.json

@@ -16,7 +16,7 @@
{
"apiKey": 58,
"type": "request",
"listeners": ["controller"],
"listeners": ["controller", "zkBroker"],
"name": "EnvelopeRequest",
// Request struct for forwarding.
"validVersions": "0",

4
clients/src/main/resources/common/message/LeaderAndIsrRequest.json

@@ -36,8 +36,8 @@
"fields": [
{ "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The current controller ID." },
{ "name": "KRaftControllerId", "type": "int32", "versions": "7+", "entityType": "brokerId", "default": "-1",
"about": "The KRaft controller id, used during migration. See KIP-866" },
{ "name": "isKRaftController", "type": "bool", "versions": "7+", "default": "false",
"about": "If KRaft controller id is used during migration. See KIP-866" },
{ "name": "ControllerEpoch", "type": "int32", "versions": "0+",
"about": "The current controller epoch." },
{ "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable": true, "default": "-1",

4
clients/src/main/resources/common/message/StopReplicaRequest.json

@@ -31,8 +31,8 @@
"fields": [
{ "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The controller id." },
{ "name": "KRaftControllerId", "type": "int32", "versions": "4+", "entityType": "brokerId", "default": "-1",
"about": "The KRaft controller id, used during migration. See KIP-866" },
{ "name": "isKRaftController", "type": "bool", "versions": "4+", "default": "false",
"about": "If KRaft controller id is used during migration. See KIP-866" },
{ "name": "ControllerEpoch", "type": "int32", "versions": "0+",
"about": "The controller epoch." },
{ "name": "BrokerEpoch", "type": "int64", "versions": "1+", "default": "-1", "ignorable": true,

4
clients/src/main/resources/common/message/UpdateMetadataRequest.json

@@ -36,8 +36,8 @@
"fields": [
{ "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The controller id." },
{ "name": "KRaftControllerId", "type": "int32", "versions": "8+", "entityType": "brokerId",
"about": "The KRaft controller id, used during migration." },
{ "name": "isKRaftController", "type": "bool", "versions": "8+", "default": "false",
"about": "If KRaft controller id is used during migration. See KIP-866" },
{ "name": "ControllerEpoch", "type": "int32", "versions": "0+",
"about": "The controller epoch." },
{ "name": "BrokerEpoch", "type": "int64", "versions": "5+", "ignorable": true, "default": "-1",

1
clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java

@@ -188,7 +188,6 @@ public class ApiVersionsResponseTest {
// Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them
HashSet<ApiKeys> exposedApis = apiKeysInResponse(response);
assertFalse(exposedApis.contains(ApiKeys.ENVELOPE));
assertFalse(exposedApis.contains(ApiKeys.VOTE));
assertFalse(exposedApis.contains(ApiKeys.BEGIN_QUORUM_EPOCH));
assertFalse(exposedApis.contains(ApiKeys.END_QUORUM_EPOCH));

8
core/src/main/scala/kafka/Kafka.scala

@@ -63,6 +63,12 @@ object Kafka extends Logging {
props
}
// For Zk mode, the API forwarding is currently enabled only under migration flag. We can
// directly do a static IBP check to see API forwarding is enabled here because IBP check is
// static in Zk mode.
private def enableApiForwarding(config: KafkaConfig) =
config.migrationEnabled && config.interBrokerProtocolVersion.isApiForwardingEnabled
private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, false)
if (config.requiresZookeeper) {
@@ -70,7 +76,7 @@ object Kafka extends Logging {
config,
Time.SYSTEM,
threadNamePrefix = None,
enableForwarding = false
enableForwarding = enableApiForwarding(config)
)
} else {
new KafkaRaftServer(

2
core/src/main/scala/kafka/common/InterBrokerSendThread.scala

@@ -33,7 +33,7 @@ import scala.jdk.CollectionConverters._
*/
abstract class InterBrokerSendThread(
name: String,
networkClient: KafkaClient,
var networkClient: KafkaClient,
requestTimeoutMs: Int,
time: Time,
isInterruptible: Boolean = true

121
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala

@@ -21,6 +21,7 @@ import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicReference
import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
import kafka.raft.RaftManager
import kafka.server.metadata.ZkMetadataCache
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.common.{Node, Reconfigurable}
@@ -37,42 +38,55 @@ import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
trait ControllerNodeProvider {
def get(): Option[Node]
def listenerName: ListenerName
def securityProtocol: SecurityProtocol
def saslMechanism: String
}
case class ControllerInformation(node: Option[Node],
listenerName: ListenerName,
securityProtocol: SecurityProtocol,
saslMechanism: String,
isZkController: Boolean)
object MetadataCacheControllerNodeProvider {
def apply(
config: KafkaConfig,
metadataCache: kafka.server.MetadataCache
): MetadataCacheControllerNodeProvider = {
val listenerName = config.controlPlaneListenerName
.getOrElse(config.interBrokerListenerName)
val securityProtocol = config.controlPlaneSecurityProtocol
.getOrElse(config.interBrokerSecurityProtocol)
new MetadataCacheControllerNodeProvider(
metadataCache,
listenerName,
securityProtocol,
config.saslMechanismInterBrokerProtocol
)
}
trait ControllerNodeProvider {
def getControllerInfo(): ControllerInformation
}
class MetadataCacheControllerNodeProvider(
val metadataCache: kafka.server.MetadataCache,
val listenerName: ListenerName,
val securityProtocol: SecurityProtocol,
val saslMechanism: String
val metadataCache: ZkMetadataCache,
val config: KafkaConfig
) extends ControllerNodeProvider {
override def get(): Option[Node] = {
metadataCache.getControllerId
.flatMap(metadataCache.getAliveBrokerNode(_, listenerName))
private val zkControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
private val zkControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
private val zkControllerSaslMechanism = config.saslMechanismInterBrokerProtocol
private val kraftControllerListenerName = if (config.controllerListenerNames.nonEmpty)
new ListenerName(config.controllerListenerNames.head) else null
private val kraftControllerSecurityProtocol = Option(kraftControllerListenerName)
.map( listener => config.effectiveListenerSecurityProtocolMap.getOrElse(
listener, SecurityProtocol.forName(kraftControllerListenerName.value())))
.orNull
private val kraftControllerSaslMechanism = config.saslMechanismControllerProtocol
private val emptyZkControllerInfo = ControllerInformation(
None,
zkControllerListenerName,
zkControllerSecurityProtocol,
zkControllerSaslMechanism,
isZkController = true)
override def getControllerInfo(): ControllerInformation = {
metadataCache.getControllerId.map {
case ZkCachedControllerId(id) => ControllerInformation(
metadataCache.getAliveBrokerNode(id, zkControllerListenerName),
zkControllerListenerName,
zkControllerSecurityProtocol,
zkControllerSaslMechanism,
isZkController = true)
case KRaftCachedControllerId(id) => ControllerInformation(
metadataCache.getAliveBrokerNode(id, kraftControllerListenerName),
kraftControllerListenerName,
kraftControllerSecurityProtocol,
kraftControllerSaslMechanism,
isZkController = false)
}.getOrElse(emptyZkControllerInfo)
}
}
@@ -108,9 +122,9 @@ class RaftControllerNodeProvider(
) extends ControllerNodeProvider with Logging {
val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap
override def get(): Option[Node] = {
raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode)
}
override def getControllerInfo(): ControllerInformation =
ControllerInformation(raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode),
listenerName, securityProtocol, saslMechanism, isZkController = false)
}
object BrokerToControllerChannelManager {
@@ -176,13 +190,13 @@ class BrokerToControllerChannelManagerImpl(
}
private[server] def newRequestThread = {
val networkClient = {
def networkClient(controllerInfo: ControllerInformation) = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
controllerNodeProvider.securityProtocol,
controllerInfo.securityProtocol,
JaasContext.Type.SERVER,
config,
controllerNodeProvider.listenerName,
controllerNodeProvider.saslMechanism,
controllerInfo.listenerName,
controllerInfo.saslMechanism,
time,
config.saslInterBrokerHandshakeRequestEnable,
logContext
@@ -276,17 +290,38 @@ case class BrokerToControllerQueueItem(
)
class BrokerToControllerRequestThread(
networkClient: KafkaClient,
networkClientFactory: ControllerInformation => KafkaClient,
metadataUpdater: ManualMetadataUpdater,
controllerNodeProvider: ControllerNodeProvider,
config: KafkaConfig,
time: Time,
threadName: String,
retryTimeoutMs: Long
) extends InterBrokerSendThread(threadName, networkClient, Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, time, isInterruptible = false) {
) extends InterBrokerSendThread(threadName, null, Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, time, isInterruptible = false) {
var isZkController = false
private def maybeResetNetworkClient(controllerInformation: ControllerInformation,
initialize: Boolean = false): Unit = {
if (initialize || isZkController != controllerInformation.isZkController) {
if (!initialize) {
debug("Controller changed to " + (if (isZkController) "zk" else "kraft") + " mode. " +
"Resetting network client")
}
// Close existing network client.
if (networkClient != null) {
networkClient.initiateClose()
networkClient.close()
}
isZkController = controllerInformation.isZkController
updateControllerAddress(controllerInformation.node.orNull)
controllerInformation.node.foreach(n => metadataUpdater.setNodes(Seq(n).asJava))
networkClient = networkClientFactory(controllerInformation)
}
}
private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
private val activeController = new AtomicReference[Node](null)
maybeResetNetworkClient(controllerNodeProvider.getControllerInfo(), initialize = true)
// Used for testing
@volatile
@@ -364,11 +399,13 @@ class BrokerToControllerRequestThread(
}
override def doWork(): Unit = {
val controllerInformation = controllerNodeProvider.getControllerInfo()
maybeResetNetworkClient(controllerInformation)
if (activeControllerAddress().isDefined) {
super.pollOnce(Long.MaxValue)
} else {
debug("Controller isn't known, checking with controller provider")
controllerNodeProvider.get() match {
debug("Controller isn't cached, looking for local metadata changes")
controllerInformation.node match {
case Some(controllerNode) =>
info(s"Recorded new controller, from now on will use node $controllerNode")
updateControllerAddress(controllerNode)

18
core/src/main/scala/kafka/server/KafkaApis.scala

@@ -79,7 +79,6 @@ import java.util
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Collections, Optional}
import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, immutable, mutable}
@@ -1317,6 +1316,12 @@ class KafkaApis(val requestChannel: RequestChannel,
trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
brokers.mkString(","), request.header.correlationId, request.header.clientId))
val controllerId = {
metadataCache.getControllerId.flatMap {
case ZkCachedControllerId(id) => Some(id)
case KRaftCachedControllerId(_) => metadataCache.getRandomAliveBrokerId
}
}
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
MetadataResponse.prepareResponse(
@@ -1324,7 +1329,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestThrottleMs,
brokers.toList.asJava,
clusterId,
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
completeTopicMetadata.asJava,
clusterAuthorizedOperations
))
@@ -3329,13 +3334,18 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val brokers = metadataCache.getAliveBrokerNodes(request.context.listenerName)
val controllerId = metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID)
val controllerId = {
metadataCache.getControllerId.flatMap {
case ZkCachedControllerId(id) => Some(id)
case KRaftCachedControllerId(_) => metadataCache.getRandomAliveBrokerId
}
}
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
val data = new DescribeClusterResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setClusterId(clusterId)
.setControllerId(controllerId)
.setControllerId(controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID))
.setClusterAuthorizedOperations(clusterAuthorizedOperations)

22
core/src/main/scala/kafka/server/KafkaServer.scala

@@ -147,6 +147,7 @@ class KafkaServer(
var kafkaScheduler: KafkaScheduler = _
var kraftControllerNodes: Seq[Node] = Seq.empty
@volatile var metadataCache: ZkMetadataCache = _
var quotaManagers: QuotaFactory.QuotaManagers = _
@@ -272,8 +273,16 @@ class KafkaServer(
_brokerState = BrokerState.RECOVERY
logManager.startup(zkClient.getAllTopicsInCluster())
metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion, brokerFeatures)
val controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)
if (config.migrationEnabled) {
kraftControllerNodes = RaftConfig.voterConnectionsToNodes(
RaftConfig.parseVoterConnections(config.quorumVoters)).asScala
}
metadataCache = MetadataCache.zkMetadataCache(
config.brokerId,
config.interBrokerProtocolVersion,
brokerFeatures,
kraftControllerNodes)
val controllerNodeProvider = new MetadataCacheControllerNodeProvider(metadataCache, config)
/* initialize feature change listener */
_featureChangeListener = new FinalizedFeatureChangeListener(metadataCache, _zkClient)
@@ -614,7 +623,14 @@ class KafkaServer(
private def controlledShutdown(): Unit = {
val socketTimeoutMs = config.controllerSocketTimeoutMs
// TODO (KAFKA-14447): Handle controlled shutdown for zkBroker when we have KRaft controller.
def doControlledShutdown(retries: Int): Boolean = {
if (config.requiresZookeeper &&
metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])) {
info("ZkBroker currently has a KRaft controller. Controlled shutdown will be handled " +
"through broker life cycle manager")
return false
}
val metadataUpdater = new ManualMetadataUpdater()
val networkClient = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
@@ -668,7 +684,7 @@ class KafkaServer(
// 1. Find the controller and establish a connection to it.
// If the controller id or the broker registration are missing, we sleep and retry (if there are remaining retries)
metadataCache.getControllerId match {
metadataCache.getControllerId.filter(_.isInstanceOf[ZkCachedControllerId]).map(_.id) match {
case Some(controllerId) =>
metadataCache.getAliveBrokerNode(controllerId, config.interBrokerListenerName) match {
case Some(broker) =>

18
core/src/main/scala/kafka/server/MetadataCache.scala

@@ -32,6 +32,13 @@ case class FinalizedFeaturesAndEpoch(features: Map[String, Short], epoch: Long)
}
}
sealed trait CachedControllerId {
val id: Int
}
case class ZkCachedControllerId(id: Int) extends CachedControllerId
case class KRaftCachedControllerId(id: Int) extends CachedControllerId
trait MetadataCache {
/**
@@ -92,7 +99,7 @@ trait MetadataCache {
def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node]
def getControllerId: Option[Int]
def getControllerId: Option[CachedControllerId]
def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster
@@ -103,13 +110,18 @@ trait MetadataCache {
def metadataVersion(): MetadataVersion
def features(): FinalizedFeaturesAndEpoch
def getRandomAliveBrokerId: Option[Int]
}
object MetadataCache {
def zkMetadataCache(brokerId: Int,
metadataVersion: MetadataVersion,
brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty()): ZkMetadataCache = {
new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures)
brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty(),
kraftControllerNodes: collection.Seq[Node] = null)
: ZkMetadataCache = {
new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures,
Option(kraftControllerNodes).getOrElse(collection.Seq.empty[Node]))
}
def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = {

11
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala

@@ -18,7 +18,7 @@
package kafka.server.metadata
import kafka.controller.StateChangeLogger
import kafka.server.{FinalizedFeaturesAndEpoch, MetadataCache}
import kafka.server.{CachedControllerId, FinalizedFeaturesAndEpoch, KRaftCachedControllerId, MetadataCache}
import kafka.utils.Logging
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
@@ -287,14 +287,19 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
result.toMap
}
override def getControllerId: Option[Int] = getRandomAliveBroker(_currentImage)
/**
* Choose a random broker node to report as the controller. We do this because we want
* the client to send requests destined for the controller to a random broker.
* Clients do not have direct access to the controller in the KRaft world, as explained
* in KIP-590.
*/
override def getControllerId: Option[CachedControllerId] =
getRandomAliveBroker(_currentImage).map(KRaftCachedControllerId)
override def getRandomAliveBrokerId: Option[Int] = {
getRandomAliveBroker(_currentImage)
}
private def getRandomAliveBroker(image: MetadataImage): Option[Int] = {
val aliveBrokers = getAliveBrokers(image).toList
if (aliveBrokers.isEmpty) {

58
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala

@@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
import kafka.cluster.{Broker, EndPoint}
import kafka.api._
import kafka.controller.StateChangeLogger
import kafka.server.{BrokerFeatures, FinalizedFeaturesAndEpoch, MetadataCache}
import kafka.server.{BrokerFeatures, CachedControllerId, FinalizedFeaturesAndEpoch, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId}
import kafka.utils.CoreUtils._
import kafka.utils.Logging
import kafka.utils.Implicits._
@@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.{ApiVersionsResponse, MetadataResponse,
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.common.MetadataVersion
import java.util.concurrent.TimeUnit
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
import scala.concurrent.TimeoutException
import scala.math.max
@@ -60,7 +60,11 @@ trait ZkFinalizedFeatureCache {
* A cache for the state (e.g., current leader) of each partition. This cache is updated through
* UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
*/
class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFeatures: BrokerFeatures)
class ZkMetadataCache(
brokerId: Int,
metadataVersion: MetadataVersion,
brokerFeatures: BrokerFeatures,
kraftControllerNodes: Seq[Node] = Seq.empty)
extends MetadataCache with ZkFinalizedFeatureCache with Logging {
private val partitionMetadataLock = new ReentrantReadWriteLock()
@@ -68,8 +72,12 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea
//replace the value with a completely new one. this means reads (which are not under any lock) need to grab
//the value of this var (into a val) ONCE and retain that read copy for the duration of their operation.
//multiple reads of this value risk getting different snapshots.
@volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(partitionStates = mutable.AnyRefMap.empty,
topicIds = Map.empty, controllerId = None, aliveBrokers = mutable.LongMap.empty, aliveNodes = mutable.LongMap.empty)
@volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(
partitionStates = mutable.AnyRefMap.empty,
topicIds = Map.empty,
controllerId = None,
aliveBrokers = mutable.LongMap.empty,
aliveNodes = mutable.LongMap.empty)
this.logIdent = s"[MetadataCache brokerId=$brokerId] "
private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
@@ -79,6 +87,8 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea
private val featureLock = new ReentrantLock()
private val featureCond = featureLock.newCondition()
private val kraftControllerNodeMap = kraftControllerNodes.map(node => node.id() -> node).toMap
// This method is the main hotspot when it comes to the performance of metadata requests,
// we should be careful about adding additional logic here. Relatedly, `brokers` is
// `List[Integer]` instead of `List[Int]` to avoid a collection copy.
@@ -248,7 +258,12 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea
}
override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = {
metadataSnapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))
val snapshot = metadataSnapshot
brokerId match {
case id if snapshot.controllerId.filter(_.isInstanceOf[KRaftCachedControllerId]).exists(_.id == id) =>
kraftControllerNodeMap.get(id)
case _ => snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))
}
}
override def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node] = {
@@ -315,7 +330,14 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea
}.getOrElse(Map.empty[Int, Node])
}
def getControllerId: Option[Int] = metadataSnapshot.controllerId
def getControllerId: Option[CachedControllerId] = {
metadataSnapshot.controllerId
}
def getRandomAliveBrokerId: Option[Int] = {
val aliveBrokers = metadataSnapshot.aliveBrokers.values.toList
Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id)
}
def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {
val snapshot = metadataSnapshot
@@ -329,6 +351,13 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea
nodes.getOrElse(id.toLong, new Node(id, "", -1))
}
def controllerId(snapshot: MetadataSnapshot): Option[Node] = {
snapshot.controllerId.flatMap {
case ZkCachedControllerId(id) => getAliveBrokerNode(id, listenerName)
case KRaftCachedControllerId(_) => getRandomAliveBrokerId.flatMap(getAliveBrokerNode(_, listenerName))
}
}
val partitions = getAllPartitions(snapshot)
.filter { case (_, state) => state.leader != LeaderAndIsr.LeaderDuringDelete }
.map { case (tp, state) =>
@@ -342,7 +371,7 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea
new Cluster(clusterId, nodes.values.toBuffer.asJava,
partitions.toBuffer.asJava,
unauthorizedTopics, internalTopics,
snapshot.controllerId.map(id => node(id)).orNull)
controllerId(snapshot).orNull)
}
// This method returns the deleted TopicPartitions received from UpdateMetadataRequest
@@ -351,9 +380,13 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea
val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)
val controllerIdOpt = updateMetadataRequest.controllerId match {
val controllerIdOpt: Option[CachedControllerId] = updateMetadataRequest.controllerId match {
case id if id < 0 => None
case id => Some(id)
case id =>
if (updateMetadataRequest.isKRaftController)
Some(KRaftCachedControllerId(id))
else
Some(ZkCachedControllerId(id))
}
updateMetadataRequest.liveBrokers.forEach { broker =>
@@ -386,7 +419,8 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea
val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
if (!updateMetadataRequest.partitionStates.iterator.hasNext) {
metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, topicIds.toMap, controllerIdOpt, aliveBrokers, aliveNodes)
metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, topicIds.toMap,
controllerIdOpt, aliveBrokers, aliveNodes)
} else {
//since kafka may do partial metadata updates, we start by copying the previous state
val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size)
@@ -446,7 +480,7 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea
case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
topicIds: Map[String, Uuid],
controllerId: Option[Int],
controllerId: Option[CachedControllerId],
aliveBrokers: mutable.LongMap[Broker],
aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) {
val topicNames: Map[Uuid, String] = topicIds.map { case (topicName, topicId) => (topicId, topicName) }

4
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala

@@ -60,10 +60,14 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
}
override def generateConfigs: Seq[KafkaConfig] = {
val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
configureListeners(cfgs)
modifyConfigs(cfgs)
if (isZkMigrationTest()) {
cfgs.foreach(_.setProperty(KafkaConfig.MigrationEnabledProp, "true"))
}
insertControllerListenersIfNeeded(cfgs)
cfgs.map(KafkaConfig.fromProps)
}

2
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

@@ -215,7 +215,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val brokerIds = brokers.map(_.config.brokerId).toSet
assertTrue(brokerIds.contains(controller.id))
} else {
assertEquals(brokers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.
assertEquals(brokers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.map(_.id).
getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id)
}

4
core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala

@@ -160,6 +160,10 @@ abstract class QuorumTestHarness extends Logging {
TestInfoUtils.isKRaft(testInfo)
}
def isZkMigrationTest(): Boolean = {
TestInfoUtils.isZkMigrationTest(testInfo)
}
def checkIsZKTest(): Unit = {
if (isKRaftTest()) {
throw new RuntimeException("This function can't be accessed when running the test " +

52
core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala

@@ -25,9 +25,10 @@ import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, Metadata, MockClient, NodeApiVersions}
import org.apache.kafka.common.Node
import org.apache.kafka.common.message.{EnvelopeResponseData, MetadataRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, EnvelopeResponse, MetadataRequest, RequestTestUtils}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.MockTime
import org.junit.jupiter.api.Assertions._
@@ -37,6 +38,14 @@ import org.mockito.Mockito._
class BrokerToControllerRequestThreadTest {
private def controllerInfo(node: Option[Node]): ControllerInformation = {
ControllerInformation(node, new ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true)
}
private def emptyControllerInfo: ControllerInformation = {
controllerInfo(None)
}
@Test
def testRetryTimeoutWhileControllerNotAvailable(): Unit = {
val time = new MockTime()
@@ -45,10 +54,10 @@ class BrokerToControllerRequestThreadTest {
val mockClient = new MockClient(time, metadata)
val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
when(controllerNodeProvider.get()).thenReturn(None)
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
val retryTimeoutMs = 30000
val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
config, time, "", retryTimeoutMs)
testRequestThread.started = true
@@ -82,10 +91,10 @@ class BrokerToControllerRequestThreadTest {
val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
val activeController = new Node(controllerId, "host", 1234)
when(controllerNodeProvider.get()).thenReturn(Some(activeController))
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
val expectedResponse = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
mockClient.prepareResponse(expectedResponse)
@@ -124,10 +133,11 @@ class BrokerToControllerRequestThreadTest {
val oldController = new Node(oldControllerId, "host1", 1234)
val newController = new Node(newControllerId, "host2", 1234)
when(controllerNodeProvider.get()).thenReturn(Some(oldController), Some(newController))
when(controllerNodeProvider.getControllerInfo()).thenReturn(
emptyControllerInfo, controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(),
val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
@@ -171,13 +181,14 @@ class BrokerToControllerRequestThreadTest {
val oldController = new Node(oldControllerId, "host1", port)
val newController = new Node(newControllerId, "host2", port)
when(controllerNodeProvider.get()).thenReturn(Some(oldController), Some(newController))
when(controllerNodeProvider.getControllerInfo()).thenReturn(
emptyControllerInfo, controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
Collections.singletonMap("a", Errors.NOT_CONTROLLER),
Collections.singletonMap("a", 2))
val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
@@ -231,7 +242,11 @@ class BrokerToControllerRequestThreadTest {
val oldController = new Node(oldControllerId, "host1", port)
val newController = new Node(newControllerId, "host2", port)
when(controllerNodeProvider.get()).thenReturn(Some(oldController), Some(newController))
when(controllerNodeProvider.getControllerInfo()).thenReturn(
emptyControllerInfo, // call to create network client.
controllerInfo(Some(oldController)),
controllerInfo(Some(newController))
)
// create an envelopeResponse with NOT_CONTROLLER error
val envelopeResponseWithNotControllerError = new EnvelopeResponse(
@@ -240,7 +255,7 @@ class BrokerToControllerRequestThreadTest {
// response for retry request after receiving NOT_CONTROLLER error
val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
@@ -296,13 +311,13 @@ class BrokerToControllerRequestThreadTest {
val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
val controller = new Node(controllerId, "host1", 1234)
when(controllerNodeProvider.get()).thenReturn(Some(controller))
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo, controllerInfo(Some(controller)))
val retryTimeoutMs = 30000
val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
Collections.singletonMap("a", Errors.NOT_CONTROLLER),
Collections.singletonMap("a", 2))
val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
config, time, "", retryTimeoutMs)
testRequestThread.started = true
@@ -344,7 +359,7 @@ class BrokerToControllerRequestThreadTest {
val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
val activeController = new Node(controllerId, "host", 1234)
when(controllerNodeProvider.get()).thenReturn(Some(activeController))
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
val callbackResponse = new AtomicReference[ClientResponse]()
val completionHandler = new ControllerRequestCompletionHandler {
@@ -360,7 +375,7 @@ class BrokerToControllerRequestThreadTest {
mockClient.prepareUnsupportedVersionResponse(request => request.apiKey == ApiKeys.METADATA)
val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
@@ -381,7 +396,7 @@ class BrokerToControllerRequestThreadTest {
val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
val activeController = new Node(controllerId, "host", 1234)
when(controllerNodeProvider.get()).thenReturn(Some(activeController))
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
val callbackResponse = new AtomicReference[ClientResponse]()
val completionHandler = new ControllerRequestCompletionHandler {
@@ -397,7 +412,7 @@ class BrokerToControllerRequestThreadTest {
mockClient.createPendingAuthenticationError(activeController, 50)
val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
config, time, "", retryTimeoutMs = Long.MaxValue)
testRequestThread.started = true
@@ -416,8 +431,9 @@ class BrokerToControllerRequestThreadTest {
val mockClient = new MockClient(time, metadata)
val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
config, time, "", retryTimeoutMs = Long.MaxValue)
val completionHandler = new TestControllerRequestCompletionHandler(None)

8
core/src/test/scala/kafka/utils/TestInfoUtils.scala

@@ -43,5 +43,13 @@ object TestInfoUtils {
false
}
}
def isZkMigrationTest(testInfo: TestInfo): Boolean = {
if (!isKRaft(testInfo)) {
false
} else {
testInfo.getDisplayName().contains("quorum=zkMigration")
}
}
final val TestWithParameterizedQuorumName = "{displayName}.quorum={0}"
}

3
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala

@@ -357,7 +357,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
config,
time = brokerTime(config.brokerId),
threadNamePrefix = None,
startup = false
startup = false,
enableZkApiForwarding = isZkMigrationTest()
)
}
}

6
core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala

@@ -124,11 +124,11 @@ class ApiVersionManagerTest {
features = brokerFeatures,
metadataCache = metadataCache
)
assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE))
assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE))
assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0)
assertNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id))
assertNotNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id))
}
}

9
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala

@@ -57,13 +57,14 @@ class BrokerLifecycleManagerTest {
class SimpleControllerNodeProvider extends ControllerNodeProvider {
val node = new AtomicReference[Node](null)
override def get(): Option[Node] = Option(node.get())
def listenerName: ListenerName = new ListenerName("PLAINTEXT")
override def listenerName: ListenerName = new ListenerName("PLAINTEXT")
def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT;
override def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT;
def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM
override def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM
override def getControllerInfo(): ControllerInformation = ControllerInformation(Option(node.get()),
listenerName, securityProtocol, saslMechanism, isZkController = false)
}
class BrokerLifecycleManagerTestContext(properties: Properties) {

15
core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala

@@ -17,7 +17,7 @@
package unit.kafka.server
import kafka.server.{BrokerToControllerChannelManager, ControllerNodeProvider, ControllerRequestCompletionHandler}
import kafka.server.{BrokerToControllerChannelManager, ControllerInformation, ControllerNodeProvider, ControllerRequestCompletionHandler}
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
@@ -48,17 +48,22 @@ class BrokerRegistrationRequestTest {
def brokerToControllerChannelManager(clusterInstance: ClusterInstance): BrokerToControllerChannelManager = {
BrokerToControllerChannelManager(
new ControllerNodeProvider() {
override def get(): Option[Node] = Some(new Node(
def node: Option[Node] = Some(new Node(
clusterInstance.anyControllerSocketServer().config.nodeId,
"127.0.0.1",
clusterInstance.anyControllerSocketServer().boundPort(clusterInstance.controllerListenerName().get()),
))
override def listenerName: ListenerName = clusterInstance.controllerListenerName().get()
def listenerName: ListenerName = clusterInstance.controllerListenerName().get()
override def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
val securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
override def saslMechanism: String = ""
val saslMechanism: String = ""
def isZkController: Boolean = !clusterInstance.isKRaftTest
override def getControllerInfo(): ControllerInformation =
ControllerInformation(node, listenerName, securityProtocol, saslMechanism, isZkController)
},
Time.SYSTEM,
new Metrics(),

10
core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala

@@ -22,15 +22,16 @@ import org.apache.kafka.common.Uuid
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.CreateTopicsRequest
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testValidCreateTopicsRequests(quorum: String): Unit = {
@@ -148,13 +149,14 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk", "zkMigration"))
def testNotController(quorum: String): Unit = {
// Note: we don't run this test when in KRaft mode, because KRaft doesn't have this
// behavior of returning NOT_CONTROLLER. Instead, the request is forwarded.
val req = topicsReq(Seq(topicReq("topic1")))
val response = sendCreateTopicRequest(req, notControllerSocketServer)
assertEquals(1, response.errorCounts().get(Errors.NOT_CONTROLLER))
val error = if (isZkMigrationTest()) Errors.NONE else Errors.NOT_CONTROLLER
assertEquals(1, response.errorCounts().get(error))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)

5
core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala

@@ -221,7 +221,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
* Instead, the request is forwarded.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk", "zkMigration"))
def testNotController(quorum: String): Unit = {
val request = new DeleteTopicsRequest.Builder(
new DeleteTopicsRequestData()
@@ -229,8 +229,9 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
.setTimeoutMs(1000)).build()
val response = sendDeleteTopicsRequest(request, notControllerSocketServer)
val expectedError = if (isZkMigrationTest()) Errors.NONE else Errors.NOT_CONTROLLER
val error = response.data.responses.find("not-controller").errorCode()
assertEquals(Errors.NOT_CONTROLLER.code, error, "Expected controller error when routed incorrectly")
assertEquals(expectedError.code(), error)
}
private def validateTopicIsDeleted(topic: String): Unit = {

24
core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala

@@ -60,6 +60,14 @@ class ForwardingManagerTest {
NodeApiVersions.create(List(envelopeApiVersion).asJava)
}
private def controllerInfo = {
ControllerInformation(Some(new Node(0, "host", 1234)), new ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true)
}
private def emptyControllerInfo = {
ControllerInformation(None, new ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true)
}
@Test
def testResponseCorrelationIdMismatch(): Unit = {
val requestCorrelationId = 27
@@ -71,7 +79,7 @@ class ForwardingManagerTest {
val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody, requestHeader.apiVersion,
requestCorrelationId + 1)
Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.NONE));
@@ -95,7 +103,7 @@ class ForwardingManagerTest {
val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody,
requestHeader.apiVersion, requestCorrelationId)
Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.UNSUPPORTED_VERSION));
@@ -112,7 +120,7 @@ class ForwardingManagerTest {
val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
Mockito.when(controllerNodeProvider.get()).thenReturn(None)
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
val response = new AtomicReference[AbstractResponse]()
forwardingManager.forwardRequest(request, res => res.foreach(response.set))
@@ -136,7 +144,7 @@ class ForwardingManagerTest {
val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234)))
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
val response = new AtomicReference[AbstractResponse]()
forwardingManager.forwardRequest(request, res => res.foreach(response.set))
@@ -162,8 +170,7 @@ class ForwardingManagerTest {
val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
val controllerNode = new Node(0, "host", 1234)
Mockito.when(controllerNodeProvider.get()).thenReturn(Some(controllerNode))
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
client.prepareUnsupportedVersionResponse(req => req.apiKey == requestHeader.apiKey)
@@ -183,10 +190,9 @@ class ForwardingManagerTest {
val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
val controllerNode = new Node(0, "host", 1234)
Mockito.when(controllerNodeProvider.get()).thenReturn(Some(controllerNode))
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
client.createPendingAuthenticationError(controllerNode, 50)
client.createPendingAuthenticationError(controllerInfo.node.get, 50)
val response = new AtomicReference[AbstractResponse]()
forwardingManager.forwardRequest(request, res => res.foreach(response.set))

2
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

@@ -3946,7 +3946,7 @@ class KafkaApisTest {
val capturedResponse = verifyNoThrottling(request)
val describeClusterResponse = capturedResponse.getValue.asInstanceOf[DescribeClusterResponse]
assertEquals(metadataCache.getControllerId.get, describeClusterResponse.data.controllerId)
assertEquals(metadataCache.getControllerId.get.id, describeClusterResponse.data.controllerId)
assertEquals(clusterId, describeClusterResponse.data.clusterId)
assertEquals(8096, describeClusterResponse.data.clusterAuthorizedOperations)
assertEquals(metadataCache.getAliveBrokerNodes(plaintextListener).toSet,

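The assertion change above reflects that MetadataCache.getControllerId now yields a wrapper exposing an id field instead of a bare Int. A hypothetical test helper, not part of this PR and assuming the return type remains an Option, could unwrap it like this:

// Hypothetical convenience for tests: unwrap the cached controller id, falling back
// to a sentinel when the cache knows of no controller.
def controllerIdOrElse(cache: kafka.server.MetadataCache, sentinel: Int = -1): Int =
  cache.getControllerId.map(_.id).getOrElse(sentinel)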
2
core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala

@@ -73,7 +73,7 @@ class MockBrokerToControllerChannelManager(
queueItem.callback.onTimeout()
unsentIterator.remove()
} else {
controllerNodeProvider.get() match {
controllerNodeProvider.getControllerInfo().node match {
case Some(controller) if client.ready(controller, time.milliseconds()) =>
val clientRequest = client.newClientRequest(
controller.idString,

8
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

@@ -159,7 +159,8 @@ class RequestQuotaTest extends BaseRequestTest {
@Test
def testExemptRequestTime(): Unit = {
for (apiKey <- RequestQuotaTest.ClusterActions -- RequestQuotaTest.ClusterActionsWithThrottle) {
val actions = RequestQuotaTest.ClusterActions -- RequestQuotaTest.ClusterActionsWithThrottle -- RequestQuotaTest.Envelope
for (apiKey <- actions) {
submitTest(apiKey, () => checkExemptRequestMetric(apiKey))
}
@@ -170,7 +171,7 @@ class RequestQuotaTest extends BaseRequestTest {
def testUnauthorizedThrottle(): Unit = {
RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
for (apiKey <- ApiKeys.zkBrokerApis.asScala) {
for (apiKey <- ApiKeys.zkBrokerApis.asScala.toSet -- RequestQuotaTest.Envelope) {
submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey))
}
@@ -765,7 +766,8 @@ object RequestQuotaTest {
val ClusterActions = ApiKeys.zkBrokerApis.asScala.filter(_.clusterAction).toSet
val ClusterActionsWithThrottle = Set(ApiKeys.ALLOCATE_PRODUCER_IDS, ApiKeys.UPDATE_FEATURES)
val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE)
val ClientActions = ApiKeys.zkBrokerApis.asScala.toSet -- ClusterActions -- SaslActions
val Envelope = Set(ApiKeys.ENVELOPE)
val ClientActions = ApiKeys.zkBrokerApis.asScala.toSet -- ClusterActions -- SaslActions -- Envelope
val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Unauthorized")
// Principal used for all client connections. This is modified by tests which

7
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@@ -179,7 +179,12 @@ object TestUtils extends Logging {
}
def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String], startup: Boolean): KafkaServer = {
val server = new KafkaServer(config, time, threadNamePrefix, enableForwarding = false)
createServer(config, time, threadNamePrefix, startup, enableZkApiForwarding = false)
}
def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String],
startup: Boolean, enableZkApiForwarding: Boolean) = {
val server = new KafkaServer(config, time, threadNamePrefix, enableForwarding = enableZkApiForwarding)
if (startup) server.startup()
server
}

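Hypothetical usage of the new overload shown above, with parameter names taken from the diff; config is assumed to be a KafkaConfig prepared elsewhere in the test:

import org.apache.kafka.common.utils.Time

// Start a ZK-mode broker with envelope forwarding to the controller enabled.
val server = TestUtils.createServer(
  config,
  Time.SYSTEM,
  threadNamePrefix = None,
  startup = true,
  enableZkApiForwarding = true)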
3
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java

@@ -218,7 +218,8 @@ public class ReplicaFetcherThreadBenchmark {
0, 0, 0, updatePartitionState, Collections.emptyList(), topicIds).build();
// TODO: fix to support raft
ZkMetadataCache metadataCache = new ZkMetadataCache(0, config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty());
ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(0,
config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), null);
metadataCache.updateMetadata(0, updateMetadataRequest);
replicaManager = new ReplicaManagerBuilder().

3
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java

@@ -110,7 +110,8 @@ public class MetadataRequestBenchmark {
private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
private Metrics metrics = new Metrics();
private int brokerId = 1;
private ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latest(), BrokerFeatures.createEmpty());
private ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId,
MetadataVersion.latest(), BrokerFeatures.createEmpty(), null);
private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);

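For comparison, the same factory call written from Scala; only the call shape is taken from the hunks above, and the trailing null is kept as-is because the meaning of the new fourth argument is defined in MetadataCache.scala rather than in this diff:

import kafka.server.{BrokerFeatures, MetadataCache}
import org.apache.kafka.server.common.MetadataVersion

// Benchmarks and tests now go through the factory instead of constructing ZkMetadataCache directly.
val cache = MetadataCache.zkMetadataCache(
  1, MetadataVersion.latest(), BrokerFeatures.createEmpty(), null)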
4
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java

@@ -112,7 +112,9 @@ public class CheckpointBench {
scheduler.startup();
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
final MetadataCache metadataCache =
MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(), this.brokerProperties.interBrokerProtocolVersion(), BrokerFeatures.createEmpty());
MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
this.brokerProperties.interBrokerProtocolVersion(),
BrokerFeatures.createEmpty(), null);
this.quotaManagers =
QuotaFactory.instantiate(this.brokerProperties,
this.metrics,

6
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java

@@ -26,6 +26,7 @@ import kafka.server.BrokerFeatures;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.builders.LogManagerBuilder;
@@ -33,7 +34,6 @@ import kafka.server.builders.ReplicaManagerBuilder;
import kafka.server.checkpoints.OffsetCheckpoints;
import kafka.server.metadata.ConfigRepository;
import kafka.server.metadata.MockConfigRepository;
import kafka.server.metadata.ZkMetadataCache;
import kafka.utils.KafkaScheduler;
import kafka.utils.Scheduler;
import kafka.utils.TestUtils;
@@ -160,7 +160,9 @@ public class PartitionCreationBench {
setLogManager(logManager).
setQuotaManagers(quotaManagers).
setBrokerTopicStats(brokerTopicStats).
setMetadataCache(new ZkMetadataCache(this.brokerProperties.brokerId(), this.brokerProperties.interBrokerProtocolVersion(), BrokerFeatures.createEmpty())).
setMetadataCache(MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
this.brokerProperties.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(),
null)).
setLogDirFailureChannel(failureChannel).
setAlterPartitionManager(alterPartitionManager).
build();

4
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java

@@ -243,6 +243,10 @@ public enum MetadataVersion {
return this.isAtLeast(IBP_3_3_IV1);
}
public boolean isApiForwardingEnabled() {
return this.isAtLeast(IBP_3_4_IV0);
}
public boolean isKRaftSupported() {
return this.featureLevel > 0;
}

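A short sketch of querying the new predicate; only isApiForwardingEnabled() and latest() come from this change set, and the surrounding wiring is illustrative:

import org.apache.kafka.server.common.MetadataVersion

// Forwarding from ZK brokers is gated on the IBP having reached IBP_3_4_IV0,
// which is exactly what the new predicate encodes.
val ibp = MetadataVersion.latest()
println(s"api forwarding enabled at $ibp: ${ibp.isApiForwardingEnabled()}")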