From 8118b6c9f9c3a8cc77dca2f6a516254f03a43d5e Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Tue, 13 Oct 2020 17:53:47 -0700
Subject: [PATCH] KAFKA-10521; Skip partition watch registration when
 `AlterIsr` is expected (#9353)

Before `AlterIsr` which was introduced in KIP-497, the controller would
register watches in Zookeeper for each reassigning partition so that it
could be notified immediately when the ISR was expanded and the
reassignment could be completed. This notification is not needed with the
latest IBP when `AlterIsr` is enabled because the controller will execute
all ISR changes itself. There is one subtle detail. If we are in the middle
of a roll in order to bump the IBP, then it is possible for the controller
to be on the latest IBP while some of the brokers are still on the older
one. In this case, the brokers on the older IBP will not send `AlterIsr`,
but we can still rely on the delayed notification through the
`isr_notifications` path to complete reassignments. This seems like a
reasonable tradeoff since it should be a short window before the roll is
completed.

Reviewers: David Jacot, Jun Rao
---
 .../kafka/controller/KafkaController.scala    | 65 ++++++++++++----
 .../scala/kafka/server/ReplicaManager.scala   | 36 ++++++---
 .../ReassignPartitionsIntegrationTest.scala   | 78 +++++++++++++++----
 3 files changed, 139 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 47b52c6b19b..cc660600382 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -84,6 +84,7 @@ class KafkaController(val config: KafkaConfig,
   @volatile private var brokerInfo = initialBrokerInfo
   @volatile private var _brokerEpoch = initialBrokerEpoch
 
+  private val isAlterIsrEnabled = config.interBrokerProtocolVersion >= KAFKA_2_7_IV2
   private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
   val controllerContext = new ControllerContext
   var controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
@@ -789,8 +790,10 @@ class KafkaController(val config: KafkaConfig,
       stopRemovedReplicasOfReassignedPartition(topicPartition, unneededReplicas)
     }
 
-    val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
-    zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
+    if (!isAlterIsrEnabled) {
+      val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
+      zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
+    }
 
     controllerContext.partitionsBeingReassigned.add(topicPartition)
   }
@@ -1089,17 +1092,21 @@ class KafkaController(val config: KafkaConfig,
   }
 
   private def unregisterPartitionReassignmentIsrChangeHandlers(): Unit = {
-    controllerContext.partitionsBeingReassigned.foreach { tp =>
-      val path = TopicPartitionStateZNode.path(tp)
-      zkClient.unregisterZNodeChangeHandler(path)
+    if (!isAlterIsrEnabled) {
+      controllerContext.partitionsBeingReassigned.foreach { tp =>
+        val path = TopicPartitionStateZNode.path(tp)
+        zkClient.unregisterZNodeChangeHandler(path)
+      }
     }
   }
 
   private def removePartitionFromReassigningPartitions(topicPartition: TopicPartition,
                                                        assignment: ReplicaAssignment): Unit = {
     if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
-      val path = TopicPartitionStateZNode.path(topicPartition)
-      zkClient.unregisterZNodeChangeHandler(path)
+      if (!isAlterIsrEnabled) {
+        val path = TopicPartitionStateZNode.path(topicPartition)
+        zkClient.unregisterZNodeChangeHandler(path)
+      }
       maybeRemoveFromZkReassignment((tp, replicas) => tp == topicPartition && replicas == assignment.replicas)
       controllerContext.partitionsBeingReassigned.remove(topicPartition)
     } else {
@@ -1830,13 +1837,17 @@ class KafkaController(val config: KafkaConfig,
     if (!isActive) return
 
     if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
-      val reassignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
-      if (isReassignmentComplete(topicPartition, reassignment)) {
-        // resume the partition reassignment process
-        info(s"Target replicas ${reassignment.targetReplicas} have all caught up with the leader for " +
-          s"reassigning partition $topicPartition")
-        onPartitionReassignment(topicPartition, reassignment)
-      }
+      maybeCompleteReassignment(topicPartition)
+    }
+  }
+
+  private def maybeCompleteReassignment(topicPartition: TopicPartition): Unit = {
+    val reassignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
+    if (isReassignmentComplete(topicPartition, reassignment)) {
+      // resume the partition reassignment process
+      info(s"Target replicas ${reassignment.targetReplicas} have all caught up with the leader for " +
+        s"reassigning partition $topicPartition")
+      onPartitionReassignment(topicPartition, reassignment)
     }
   }
 
@@ -2073,6 +2084,16 @@ class KafkaController(val config: KafkaConfig,
       if (partitions.nonEmpty) {
         updateLeaderAndIsrCache(partitions)
         processUpdateNotifications(partitions)
+
+        // During a partial upgrade, the controller may be on an IBP which assumes
+        // ISR changes through the `AlterIsr` API while some brokers are on an older
+        // IBP which assumes notification through Zookeeper. In this case, since the
+        // controller will not have registered watches for reassigning partitions, we
+        // can still rely on the batch ISR change notification path in order to
+        // complete the reassignment.
+        partitions.filter(controllerContext.partitionsBeingReassigned.contains).foreach { topicPartition =>
+          maybeCompleteReassignment(topicPartition)
+        }
       }
     } finally {
       // delete the notifications
@@ -2227,7 +2248,8 @@ class KafkaController(val config: KafkaConfig,
     eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
   }
 
-  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long,
+                              isrsToAlter: Map[TopicPartition, LeaderAndIsr],
                               callback: AlterIsrCallback): Unit = {
 
     // Handle a few short-circuits
@@ -2315,6 +2337,19 @@ class KafkaController(val config: KafkaConfig,
     }
 
     callback.apply(response)
+
+    // After we have returned the result of the `AlterIsr` request, we should check whether
+    // there are any reassignments which can be completed by a successful ISR expansion.
+    response.left.foreach { alterIsrResponses =>
+      alterIsrResponses.forKeyValue { (topicPartition, partitionResponse) =>
+        if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
+          val isSuccessfulUpdate = partitionResponse.isRight
+          if (isSuccessfulUpdate) {
+            maybeCompleteReassignment(topicPartition)
+          }
+        }
+      }
+    }
   }
 
   private def processControllerChange(): Unit = {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c080d6af4e6..9c7307b2f0f 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -164,10 +164,26 @@ object HostedPartition {
   final object Offline extends HostedPartition
 }
 
+case class IsrChangePropagationConfig(
+  // How often to check for ISR
+  checkIntervalMs: Long,
+
+  // Maximum time that an ISR change may be delayed before sending the notification
+  maxDelayMs: Long,
+
+  // Maximum time to await additional changes before sending the notification
+  lingerMs: Long
+)
+
 object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
-  val IsrChangePropagationBackoff = 5000L
-  val IsrChangePropagationInterval = 60000L
+
+  // This field is mutable to allow overriding change notification behavior in test cases
+  @volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = IsrChangePropagationConfig(
+    checkIntervalMs = 2500,
+    lingerMs = 5000,
+    maxDelayMs = 60000
+  )
 }
 
 class ReplicaManager(val config: KafkaConfig,
@@ -233,9 +249,10 @@ class ReplicaManager(val config: KafkaConfig,
   this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
 
   private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
+  private val isrChangeNotificationConfig = ReplicaManager.DefaultIsrPropagationConfig
   private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
-  private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
-  private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
+  private val lastIsrChangeMs = new AtomicLong(time.milliseconds())
+  private val lastIsrPropagationMs = new AtomicLong(time.milliseconds())
 
   private var logDirFailureHandler: LogDirFailureHandler = null
 
@@ -278,7 +295,7 @@ class ReplicaManager(val config: KafkaConfig,
   def recordIsrChange(topicPartition: TopicPartition): Unit = {
     isrChangeSet synchronized {
       isrChangeSet += topicPartition
-      lastIsrChangeMs.set(System.currentTimeMillis())
+      lastIsrChangeMs.set(time.milliseconds())
     }
   }
   /**
@@ -289,11 +306,11 @@ class ReplicaManager(val config: KafkaConfig,
    * other brokers when large amount of ISR change occurs.
    */
   def maybePropagateIsrChanges(): Unit = {
-    val now = System.currentTimeMillis()
+    val now = time.milliseconds()
     isrChangeSet synchronized {
       if (isrChangeSet.nonEmpty &&
-        (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBackoff < now ||
-          lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
+        (lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
+          lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs < now)) {
         zkClient.propagateIsrChanges(isrChangeSet)
         isrChangeSet.clear()
         lastIsrPropagationMs.set(now)
@@ -324,7 +341,8 @@ class ReplicaManager(val config: KafkaConfig,
     scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
     // If using AlterIsr, we don't need the znode ISR propagation
     if (config.interBrokerProtocolVersion < KAFKA_2_7_IV2) {
-      scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
+      scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _,
+        period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
     } else {
       alterIsrManager.start()
     }
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index aeb5ac7cd04..6d5d02d3cac 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -21,23 +21,24 @@ import java.io.Closeable
 import java.util.{Collections, HashMap, List}
 
 import kafka.admin.ReassignPartitionsCommand._
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.api.KAFKA_2_7_IV1
+import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ReplicaManager}
+import kafka.utils.Implicits._
 import kafka.utils.TestUtils
 import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry, DescribeLogDirsResult, NewTopic}
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
-import org.junit.rules.Timeout
+import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.rules.Timeout
 import org.junit.{After, Rule, Test}
 
-import scala.collection.Map
+import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
-import scala.collection.{Seq, mutable}
 
 class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
   @Rule
@@ -45,10 +46,6 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
 
   var cluster: ReassignPartitionsTestCluster = null
 
-  def generateConfigs: Seq[KafkaConfig] = {
-    TestUtils.createBrokerConfigs(5, zkConnect).map(KafkaConfig.fromProps)
-  }
-
   @After
   override def tearDown(): Unit = {
     Utils.closeQuietly(cluster, "ReassignPartitionsTestCluster")
@@ -60,13 +57,53 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
       brokerId -> brokerLevelThrottles.map(throttle => (throttle, -1L)).toMap
     }.toMap
 
-  /**
-   * Test running a quick reassignment.
-   */
   @Test
   def testReassignment(): Unit = {
     cluster = new ReassignPartitionsTestCluster(zkConnect)
     cluster.setup()
+    executeAndVerifyReassignment()
+  }
+
+  @Test
+  def testReassignmentWithAlterIsrDisabled(): Unit = {
+    // Test reassignment when the IBP is on an older version which does not use
+    // the `AlterIsr` API. In this case, the controller will register individual
+    // watches for each reassigning partition so that the reassignment can be
+    // completed as soon as the ISR is expanded.
+    val configOverrides = Map(KafkaConfig.InterBrokerProtocolVersionProp -> KAFKA_2_7_IV1.version)
+    cluster = new ReassignPartitionsTestCluster(zkConnect, configOverrides = configOverrides)
+    cluster.setup()
+    executeAndVerifyReassignment()
+  }
+
+  @Test
+  def testReassignmentCompletionDuringPartialUpgrade(): Unit = {
+    // Test reassignment during a partial upgrade when some brokers are relying on
+    // `AlterIsr` and some rely on the old notification logic through Zookeeper.
+    // In this test case, broker 0 starts up first on the latest IBP and is typically
+    // elected as controller. The three remaining brokers start up on the older IBP.
+    // We want to ensure that reassignment can still complete through the ISR change
+    // notification path even though the controller expects `AlterIsr`.
+
+    // Override change notification settings so that test is not delayed by ISR
+    // change notification delay
+    ReplicaManager.DefaultIsrPropagationConfig = IsrChangePropagationConfig(
+      checkIntervalMs = 500,
+      lingerMs = 100,
+      maxDelayMs = 500
+    )
+
+    val oldIbpConfig = Map(KafkaConfig.InterBrokerProtocolVersionProp -> KAFKA_2_7_IV1.version)
+    val brokerConfigOverrides = Map(1 -> oldIbpConfig, 2 -> oldIbpConfig, 3 -> oldIbpConfig)
+
+    cluster = new ReassignPartitionsTestCluster(zkConnect, brokerConfigOverrides = brokerConfigOverrides)
+    cluster.setup()
+
+    executeAndVerifyReassignment()
+  }
+
+  def executeAndVerifyReassignment(): Unit = {
     val assignment = """{"version":1,"partitions":""" +
       """[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},""" +
       """{"topic":"bar","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]}""" +
@@ -594,7 +631,11 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
     }
   }
 
-  class ReassignPartitionsTestCluster(val zkConnect: String) extends Closeable {
+  class ReassignPartitionsTestCluster(
+    val zkConnect: String,
+    configOverrides: Map[String, String] = Map.empty,
+    brokerConfigOverrides: Map[Int, Map[String, String]] = Map.empty
+  ) extends Closeable {
     val brokers = Map(
       0 -> "rack0",
       1 -> "rack0",
@@ -622,6 +663,12 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
       // Don't move partition leaders automatically.
       config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
       config.setProperty(KafkaConfig.ReplicaLagTimeMaxMsProp, "1000")
+      configOverrides.forKeyValue(config.setProperty)
+
+      brokerConfigOverrides.get(brokerId).foreach { overrides =>
+        overrides.forKeyValue(config.setProperty)
+      }
+
       config
     }.toBuffer
 
@@ -637,9 +684,8 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
     }
 
     def createServers(): Unit = {
-      brokers.keySet.foreach {
-        case brokerId =>
-          servers += TestUtils.createServer(KafkaConfig(brokerConfigs(brokerId)))
+      brokers.keySet.foreach { brokerId =>
+        servers += TestUtils.createServer(KafkaConfig(brokerConfigs(brokerId)))
       }
     }
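
The timing rule that the new `IsrChangePropagationConfig` controls can be read directly off the condition in `maybePropagateIsrChanges` above: the batched ISR change notification is written once the pending ISR changes have been quiet for `lingerMs`, or once `maxDelayMs` has elapsed since the last notification, with the check running every `checkIntervalMs`. The standalone Scala sketch below restates that condition outside of `ReplicaManager`; the `PropagationConfig` case class, the `shouldPropagate` helper, and the sample timestamps are illustrative stand-ins and not part of the patch.

object IsrChangePropagationSketch {
  // Simplified stand-in for the IsrChangePropagationConfig added by this patch.
  final case class PropagationConfig(checkIntervalMs: Long, lingerMs: Long, maxDelayMs: Long)

  // Mirrors the condition in maybePropagateIsrChanges: notify once the pending ISR
  // changes have been quiet for lingerMs, or unconditionally once maxDelayMs has
  // passed since the last notification, whichever comes first.
  def shouldPropagate(pendingChanges: Int,
                      lastChangeMs: Long,
                      lastPropagationMs: Long,
                      nowMs: Long,
                      config: PropagationConfig): Boolean =
    pendingChanges > 0 &&
      (lastChangeMs + config.lingerMs < nowMs ||
        lastPropagationMs + config.maxDelayMs < nowMs)

  def main(args: Array[String]): Unit = {
    val defaults = PropagationConfig(checkIntervalMs = 2500, lingerMs = 5000, maxDelayMs = 60000)
    // One second after the last ISR change: still inside the linger window, no notification.
    println(shouldPropagate(pendingChanges = 3, lastChangeMs = 9000, lastPropagationMs = 0, nowMs = 10000, config = defaults))
    // 6.5 seconds after the last ISR change: the linger window has expired, so notify.
    println(shouldPropagate(pendingChanges = 3, lastChangeMs = 9000, lastPropagationMs = 0, nowMs = 15500, config = defaults))
  }
}

With the default values this matches the previous 5 second backoff and 60 second propagation interval, while testReassignmentCompletionDuringPartialUpgrade overrides ReplicaManager.DefaultIsrPropagationConfig with much smaller values so the controller sees the notification quickly.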