
KAFKA-7537: Avoid sending full UpdateMetadataRequest to existing brokers in the cluster on broker changes to reduce controller memory footprint (#5869)

This PR avoids sending out a full UpdateMetadataRequest in the following scenarios:

1. On broker startup: send a full UpdateMetadataRequest to the newly added brokers, and send only an UpdateMetadataRequest with empty partition states to the existing brokers.
2. On broker failure: if no leader election is required, include only the states of the partitions hosted by the dead broker(s) in the UpdateMetadataRequest, instead of all partition states.

This PR also introduces a minor optimization in the MetadataCache update to avoid copying the previous partition states upon receiving an UpdateMetadataRequest with no partition states.
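In sketch form, the startup path splits into two sends (condensed from the KafkaController diff below; `sendUpdateMetadataRequest` now always takes an explicit partition set):

// Inside onBrokerStartup (condensed from the diff below):
val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds -- newBrokers
// Existing brokers only need to learn about the new brokers, so no partition states are attached.
sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
// New brokers need the full partition state to initialize their metadata caches.
sendUpdateMetadataRequest(newBrokers, controllerContext.partitionLeadershipInfo.keySet)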

Reviewers: Jun Rao <junrao@gmail.com>
Zhanxiang (Patrick) Huang authored 6 years ago, committed by Jun Rao
Commit 7b5ffa0a07
Changed files:
1. core/src/main/scala/kafka/controller/ControllerChannelManager.scala (7 changed lines)
2. core/src/main/scala/kafka/controller/KafkaController.scala (21 changed lines)
3. core/src/main/scala/kafka/server/MetadataCache.scala (44 changed lines)
4. core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala (53 changed lines)

core/src/main/scala/kafka/controller/ControllerChannelManager.scala (7 changed lines)

@@ -383,13 +383,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
       }
     }
-    val givenPartitions = if (partitions.isEmpty)
-      controllerContext.partitionLeadershipInfo.keySet
-    else
-      partitions
     updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
-    givenPartitions.foreach(partition => updateMetadataRequestPartitionInfo(partition,
+    partitions.foreach(partition => updateMetadataRequestPartitionInfo(partition,
       beingDeleted = controller.topicDeletionManager.topicsToBeDeleted.contains(partition.topic)))
   }
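The removed `givenPartitions` fallback is what used to silently expand an empty partition set into the full set of partitions with leadership info. After this change an empty set genuinely means "attach no partition states", so the batch can carry a brokers-only update. A sketch of such a call sequence, assuming the surrounding controller names (`epoch`, `brokerIds`) from this file:

brokerRequestBatch.newBatch()
// Empty set now means "no partition states", not "all partitions": the receiving
// brokers only get the updated live-broker list.
brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokerIds, Set.empty)
brokerRequestBatch.sendRequestsToBrokers(epoch)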

core/src/main/scala/kafka/controller/KafkaController.scala (21 changed lines)

@@ -254,7 +254,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
     // partitionStateMachine.startup().
     info("Sending update metadata request")
-    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
     replicaStateMachine.startup()
     partitionStateMachine.startup()
@@ -357,11 +357,14 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     info(s"New broker startup callback for ${newBrokers.mkString(",")}")
     newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
     val newBrokersSet = newBrokers.toSet
-    // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
-    // broker via this update.
+    val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds -- newBrokers
+    // Send update metadata request to all the existing brokers in the cluster so that they know about the new brokers
+    // via this update. No need to include any partition states in the request since there are no partition state changes.
+    sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
+    // Send update metadata request to all the new brokers in the cluster with a full set of partition states for initialization.
     // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
-    // common controlled shutdown case, the metadata will reach the new brokers faster
-    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    // common controlled shutdown case, the metadata will reach the new brokers faster.
+    sendUpdateMetadataRequest(newBrokers, controllerContext.partitionLeadershipInfo.keySet)
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
     val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
@@ -421,7 +424,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   private def onBrokerUpdate(updatedBrokerId: Int) {
     info(s"Broker info update callback for $updatedBrokerId")
-    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
   }
/**
@@ -458,10 +461,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
     }
-    // If replica failure did not require leader re-election, inform brokers of the offline replica
+    // If replica failure did not require leader re-election, inform brokers of the offline brokers
     // Note that during leader re-election, brokers update their metadata
     if (partitionsWithoutLeader.isEmpty) {
-      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
     }
   }
@@ -887,7 +890,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    *
    * @param brokers The brokers that the update metadata request should be sent to
    */
-  private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition] = Set.empty[TopicPartition]) {
+  private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]) {
     try {
       brokerRequestBatch.newBatch()
       brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)

core/src/main/scala/kafka/server/MetadataCache.scala (44 changed lines)

@@ -214,13 +214,6 @@ class MetadataCache(brokerId: Int) extends Logging {
   def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
     inWriteLock(partitionMetadataLock) {
-      //since kafka may do partial metadata updates, we start by copying the previous state
-      val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]](metadataSnapshot.partitionStates.size)
-      metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>
-        val copy = new mutable.LongMap[UpdateMetadataRequest.PartitionState](oldPartitionStates.size)
-        copy ++= oldPartitionStates
-        partitionStates += (topic -> copy)
-      }
       val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
       val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)
       val controllerId = updateMetadataRequest.controllerId match {
@@ -248,21 +241,32 @@ class MetadataCache(brokerId: Int) extends Logging {
       }
       val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
-      updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
-        val controllerId = updateMetadataRequest.controllerId
-        val controllerEpoch = updateMetadataRequest.controllerEpoch
-        if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) {
-          removePartitionInfo(partitionStates, tp.topic, tp.partition)
-          stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
-            s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
-          deletedPartitions += tp
-        } else {
-          addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info)
-          stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +
-            s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
+      if (updateMetadataRequest.partitionStates().isEmpty) {
+        metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerId, aliveBrokers, aliveNodes)
+      } else {
+        //since kafka may do partial metadata updates, we start by copying the previous state
+        val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]](metadataSnapshot.partitionStates.size)
+        metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>
+          val copy = new mutable.LongMap[UpdateMetadataRequest.PartitionState](oldPartitionStates.size)
+          copy ++= oldPartitionStates
+          partitionStates += (topic -> copy)
+        }
+        updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
+          val controllerId = updateMetadataRequest.controllerId
+          val controllerEpoch = updateMetadataRequest.controllerEpoch
+          if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) {
+            removePartitionInfo(partitionStates, tp.topic, tp.partition)
+            stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
+              s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
+            deletedPartitions += tp
+          } else {
+            addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info)
+            stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +
+              s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
+          }
         }
+        metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes)
       }
-      metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes)
       deletedPartitions
     }
   }
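This optimization works because the cache publishes immutable snapshot values under the write lock: when the request carries no partition states, the new snapshot can simply share the previous `partitionStates` map instead of deep-copying it. A sketch of the snapshot shape (field types assumed from the diff above, not verified against the full file):

case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]],
                            controllerId: Option[Int],
                            aliveBrokers: mutable.LongMap[Broker],
                            aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]])

// No partition changes: reuse the old map rather than copying it. This is safe
// because a published snapshot is never mutated afterwards, only replaced.
metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerId, aliveBrokers, aliveNodes)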

core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala (53 changed lines)

@@ -83,6 +83,59 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     waitUntilControllerEpoch(firstControllerEpoch + 1, "controller epoch was not incremented after controller move")
   }
 
+  // This test case is used to ensure that there will be no correctness issue after we avoid sending out full
+  // UpdateMetadataRequest to all brokers in the cluster
+  @Test
+  def testMetadataPropagationOnBrokerChange(): Unit = {
+    servers = makeServers(3)
+    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    // Make sure the broker we shut down and restart is not the controller; otherwise a full
+    // UpdateMetadataRequest would be sent to all brokers during controller failover.
+    val testBroker = servers.filter(e => e.config.brokerId != controllerId).head
+    val remainingBrokers = servers.filter(_.config.brokerId != testBroker.config.brokerId)
+    val topic = "topic1"
+    // Assign partitions so that shutting down the test broker requires no leadership change,
+    // which exercises the path that avoids sending a full UpdateMetadataRequest on broker failure.
+    val assignment = Map(
+      0 -> Seq(remainingBrokers(0).config.brokerId, testBroker.config.brokerId),
+      1 -> remainingBrokers.map(_.config.brokerId))
+
+    // Create topic
+    TestUtils.createTopic(zkClient, topic, assignment, servers)
+
+    // Shut down the broker
+    testBroker.shutdown()
+    testBroker.awaitShutdown()
+    TestUtils.waitUntilBrokerMetadataIsPropagated(remainingBrokers)
+    remainingBrokers.foreach { server =>
+      val offlineReplicaPartitionInfo = server.metadataCache.getPartitionInfo(topic, 0).get
+      assertEquals(1, offlineReplicaPartitionInfo.offlineReplicas.size())
+      assertEquals(testBroker.config.brokerId, offlineReplicaPartitionInfo.offlineReplicas.get(0))
+      assertEquals(assignment(0).asJava, offlineReplicaPartitionInfo.basePartitionState.replicas)
+      assertEquals(Seq(remainingBrokers.head.config.brokerId).asJava, offlineReplicaPartitionInfo.basePartitionState.isr)
+      val onlinePartitionInfo = server.metadataCache.getPartitionInfo(topic, 1).get
+      assertEquals(assignment(1).asJava, onlinePartitionInfo.basePartitionState.replicas)
+      assertTrue(onlinePartitionInfo.offlineReplicas.isEmpty)
+    }
+
+    // Start the broker again
+    testBroker.startup()
+    TestUtils.waitUntilTrue(() => {
+      !servers.exists { server =>
+        assignment.exists { case (partitionId, replicas) =>
+          val partitionInfoOpt = server.metadataCache.getPartitionInfo(topic, partitionId)
+          if (partitionInfoOpt.isDefined) {
+            val partitionInfo = partitionInfoOpt.get
+            !partitionInfo.offlineReplicas.isEmpty || !partitionInfo.basePartitionState.replicas.asScala.equals(replicas)
+          } else {
+            true
+          }
+        }
+      }
+    }, "Inconsistent metadata after broker startup")
+  }
+
   @Test
   def testTopicCreation(): Unit = {
     servers = makeServers(1)
