From 7b5ffa0a070065e5e8320f481bbd8a3a26378f91 Mon Sep 17 00:00:00 2001 From: "Zhanxiang (Patrick) Huang" Date: Tue, 6 Nov 2018 15:28:53 -0800 Subject: [PATCH] KAFKA-7537: Avoid sending full UpdateMetadataRequest to existing brokers in the cluster on broker changes to reduce controller memory footprint (#5869) This PR avoids sending out full UpdateMetadataReuqest in the following scenarios: 1. On broker startup, send out full UpdateMetadataRequest to newly added brokers and only send out UpdateMetadataReuqest with empty partition states to existing brokers. 2. On broker failure, if it doesn't require leader election, only include the states of partitions that are hosted by the dead broker(s) in the UpdateMetadataReuqest instead of including all partition states. This PR also introduces a minor optimization in the MetadataCache update to avoid copying the previous partition states upon receiving UpdateMetadataRequest with no partition states. Reviewers: Jun Rao --- .../controller/ControllerChannelManager.scala | 7 +-- .../kafka/controller/KafkaController.scala | 21 ++++---- .../scala/kafka/server/MetadataCache.scala | 44 ++++++++------- .../ControllerIntegrationTest.scala | 53 +++++++++++++++++++ 4 files changed, 90 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 85da8b8c0b2..a11f5535bda 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -383,13 +383,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge } } - val givenPartitions = if (partitions.isEmpty) - controllerContext.partitionLeadershipInfo.keySet - else - partitions - updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0) - givenPartitions.foreach(partition => updateMetadataRequestPartitionInfo(partition, + partitions.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = controller.topicDeletionManager.topicsToBeDeleted.contains(partition.topic))) } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 740ab7ff78c..a52f3f02363 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -254,7 +254,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and // partitionStateMachine.startup(). info("Sending update metadata request") - sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) + sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty) replicaStateMachine.startup() partitionStateMachine.startup() @@ -357,11 +357,14 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti info(s"New broker startup callback for ${newBrokers.mkString(",")}") newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove) val newBrokersSet = newBrokers.toSet - // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new - // broker via this update. + val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds -- newBrokers + // Send update metadata request to all the existing brokers in the cluster so that they know about the new brokers + // via this update. No need to include any partition states in the request since there are no partition state changes. + sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty) + // Send update metadata request to all the new brokers in the cluster with a full set of partition states for initialization. // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the - // common controlled shutdown case, the metadata will reach the new brokers faster - sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) + // common controlled shutdown case, the metadata will reach the new brokers faster. + sendUpdateMetadataRequest(newBrokers, controllerContext.partitionLeadershipInfo.keySet) // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet) @@ -421,7 +424,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti private def onBrokerUpdate(updatedBrokerId: Int) { info(s"Broker info update callback for $updatedBrokerId") - sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) + sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty) } /** @@ -458,10 +461,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion) } - // If replica failure did not require leader re-election, inform brokers of the offline replica + // If replica failure did not require leader re-election, inform brokers of the offline brokers // Note that during leader re-election, brokers update their metadata if (partitionsWithoutLeader.isEmpty) { - sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) + sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty) } } @@ -887,7 +890,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti * * @param brokers The brokers that the update metadata request should be sent to */ - private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition] = Set.empty[TopicPartition]) { + private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]) { try { brokerRequestBatch.newBatch() brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions) diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 31146636ae0..3fefc7b84ef 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -214,13 +214,6 @@ class MetadataCache(brokerId: Int) extends Logging { def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = { inWriteLock(partitionMetadataLock) { - //since kafka may do partial metadata updates, we start by copying the previous state - val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]](metadataSnapshot.partitionStates.size) - metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) => - val copy = new mutable.LongMap[UpdateMetadataRequest.PartitionState](oldPartitionStates.size) - copy ++= oldPartitionStates - partitionStates += (topic -> copy) - } val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size) val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size) val controllerId = updateMetadataRequest.controllerId match { @@ -248,21 +241,32 @@ class MetadataCache(brokerId: Int) extends Logging { } val deletedPartitions = new mutable.ArrayBuffer[TopicPartition] - updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) => - val controllerId = updateMetadataRequest.controllerId - val controllerEpoch = updateMetadataRequest.controllerEpoch - if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) { - removePartitionInfo(partitionStates, tp.topic, tp.partition) - stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " + - s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") - deletedPartitions += tp - } else { - addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info) - stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " + - s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") + if (updateMetadataRequest.partitionStates().isEmpty) { + metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerId, aliveBrokers, aliveNodes) + } else { + //since kafka may do partial metadata updates, we start by copying the previous state + val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]](metadataSnapshot.partitionStates.size) + metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) => + val copy = new mutable.LongMap[UpdateMetadataRequest.PartitionState](oldPartitionStates.size) + copy ++= oldPartitionStates + partitionStates += (topic -> copy) + } + updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) => + val controllerId = updateMetadataRequest.controllerId + val controllerEpoch = updateMetadataRequest.controllerEpoch + if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) { + removePartitionInfo(partitionStates, tp.topic, tp.partition) + stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " + + s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") + deletedPartitions += tp + } else { + addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info) + stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " + + s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") + } } + metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes) } - metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes) deletedPartitions } } diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index e2f50b9d603..c0eda3dffef 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -83,6 +83,59 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { waitUntilControllerEpoch(firstControllerEpoch + 1, "controller epoch was not incremented after controller move") } + // This test case is used to ensure that there will be no correctness issue after we avoid sending out full + // UpdateMetadataRequest to all brokers in the cluster + @Test + def testMetadataPropagationOnBrokerChange(): Unit = { + servers = makeServers(3) + TestUtils.waitUntilBrokerMetadataIsPropagated(servers) + val controllerId = TestUtils.waitUntilControllerElected(zkClient) + // Need to make sure the broker we shutdown and startup are not the controller. Otherwise we will send out + // full UpdateMetadataReuqest to all brokers during controller failover. + val testBroker = servers.filter(e => e.config.brokerId != controllerId).head + val remainingBrokers = servers.filter(_.config.brokerId != testBroker.config.brokerId) + val topic = "topic1" + // Make sure shutdown the test broker will not require any leadership change to test avoid sending out full + // UpdateMetadataRequest on broker failure + val assignment = Map( + 0 -> Seq(remainingBrokers(0).config.brokerId, testBroker.config.brokerId), + 1 -> remainingBrokers.map(_.config.brokerId)) + + // Create topic + TestUtils.createTopic(zkClient, topic, assignment, servers) + + // Shutdown the broker + testBroker.shutdown() + testBroker.awaitShutdown() + TestUtils.waitUntilBrokerMetadataIsPropagated(remainingBrokers) + remainingBrokers.foreach { server => + val offlineReplicaPartitionInfo = server.metadataCache.getPartitionInfo(topic, 0).get + assertEquals(1, offlineReplicaPartitionInfo.offlineReplicas.size()) + assertEquals(testBroker.config.brokerId, offlineReplicaPartitionInfo.offlineReplicas.get(0)) + assertEquals(assignment(0).asJava, offlineReplicaPartitionInfo.basePartitionState.replicas) + assertEquals(Seq(remainingBrokers.head.config.brokerId).asJava, offlineReplicaPartitionInfo.basePartitionState.isr) + val onlinePartitionInfo = server.metadataCache.getPartitionInfo(topic, 1).get + assertEquals(assignment(1).asJava, onlinePartitionInfo.basePartitionState.replicas) + assertTrue(onlinePartitionInfo.offlineReplicas.isEmpty) + } + + // Startup the broker + testBroker.startup() + TestUtils.waitUntilTrue( () => { + !servers.exists { server => + assignment.exists { case (partitionId, replicas) => + val partitionInfoOpt = server.metadataCache.getPartitionInfo(topic, partitionId) + if (partitionInfoOpt.isDefined) { + val partitionInfo = partitionInfoOpt.get + !partitionInfo.offlineReplicas.isEmpty || !partitionInfo.basePartitionState.replicas.asScala.equals(replicas) + } else { + true + } + } + } + }, "Inconsistent metadata after broker startup") + } + @Test def testTopicCreation(): Unit = { servers = makeServers(1)