
KAFKA-10521; Skip partition watch registration when `AlterIsr` is expected (#9353)

Before `AlterIsr`, which was introduced in KIP-497, the controller would register watches in Zookeeper for each reassigning partition so that it could be notified immediately when the ISR was expanded and the reassignment could be completed. This notification is not needed on the latest IBP when `AlterIsr` is enabled, because the controller executes all ISR changes itself.
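
As a rough sketch of the controller-side gating (condensed from the `KafkaController` diff below, so `isAlterIsrEnabled`, `PartitionReassignmentIsrChangeHandler`, and `KAFKA_2_7_IV2` all come from the patch rather than being new names):

    private val isAlterIsrEnabled = config.interBrokerProtocolVersion >= KAFKA_2_7_IV2

    // With AlterIsr, the controller applies ISR changes itself, so the per-partition
    // watch on the partition state znode is only registered on older IBPs.
    if (!isAlterIsrEnabled) {
      val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
      zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
    }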

There is one subtle detail. If we are in the middle of a roll to bump the IBP, the controller may be on the latest IBP while some of the brokers are still on the older one. In that case, the brokers on the older IBP will not send `AlterIsr`, but we can still rely on the delayed notification through the `isr_change_notification` path to complete reassignments. This seems like a reasonable tradeoff since the window before the roll completes should be short.
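
In code terms, the fallback is roughly the ISR change notification handling added below: when a batched notification from an older-IBP broker is processed, the controller re-checks any partitions that are still being reassigned (a sketch condensed from the diff, not a separate API):

    // After applying a batched ISR change notification, also try to complete any
    // in-flight reassignments for the affected partitions.
    partitions.filter(controllerContext.partitionsBeingReassigned.contains).foreach { topicPartition =>
      maybeCompleteReassignment(topicPartition)
    }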

Reviewers: David Jacot <djacot@confluent.io>, Jun Rao <junrao@gmail.com>
Author: Jason Gustafson
Commit: 8118b6c9f9
Changed files:
  1. core/src/main/scala/kafka/controller/KafkaController.scala (65 changed lines)
  2. core/src/main/scala/kafka/server/ReplicaManager.scala (36 changed lines)
  3. core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala (78 changed lines)

core/src/main/scala/kafka/controller/KafkaController.scala (65 changed lines)

@@ -84,6 +84,7 @@ class KafkaController(val config: KafkaConfig,
@volatile private var brokerInfo = initialBrokerInfo
@volatile private var _brokerEpoch = initialBrokerEpoch
private val isAlterIsrEnabled = config.interBrokerProtocolVersion >= KAFKA_2_7_IV2
private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
val controllerContext = new ControllerContext
var controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
@@ -789,8 +790,10 @@ class KafkaController(val config: KafkaConfig,
stopRemovedReplicasOfReassignedPartition(topicPartition, unneededReplicas)
}
val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
if (!isAlterIsrEnabled) {
val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
}
controllerContext.partitionsBeingReassigned.add(topicPartition)
}
@@ -1089,17 +1092,21 @@ class KafkaController(val config: KafkaConfig,
}
private def unregisterPartitionReassignmentIsrChangeHandlers(): Unit = {
controllerContext.partitionsBeingReassigned.foreach { tp =>
val path = TopicPartitionStateZNode.path(tp)
zkClient.unregisterZNodeChangeHandler(path)
if (!isAlterIsrEnabled) {
controllerContext.partitionsBeingReassigned.foreach { tp =>
val path = TopicPartitionStateZNode.path(tp)
zkClient.unregisterZNodeChangeHandler(path)
}
}
}
private def removePartitionFromReassigningPartitions(topicPartition: TopicPartition,
assignment: ReplicaAssignment): Unit = {
if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
val path = TopicPartitionStateZNode.path(topicPartition)
zkClient.unregisterZNodeChangeHandler(path)
if (!isAlterIsrEnabled) {
val path = TopicPartitionStateZNode.path(topicPartition)
zkClient.unregisterZNodeChangeHandler(path)
}
maybeRemoveFromZkReassignment((tp, replicas) => tp == topicPartition && replicas == assignment.replicas)
controllerContext.partitionsBeingReassigned.remove(topicPartition)
} else {
@@ -1830,13 +1837,17 @@ class KafkaController(val config: KafkaConfig,
if (!isActive) return
if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
val reassignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
if (isReassignmentComplete(topicPartition, reassignment)) {
// resume the partition reassignment process
info(s"Target replicas ${reassignment.targetReplicas} have all caught up with the leader for " +
s"reassigning partition $topicPartition")
onPartitionReassignment(topicPartition, reassignment)
}
maybeCompleteReassignment(topicPartition)
}
}
private def maybeCompleteReassignment(topicPartition: TopicPartition): Unit = {
val reassignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
if (isReassignmentComplete(topicPartition, reassignment)) {
// resume the partition reassignment process
info(s"Target replicas ${reassignment.targetReplicas} have all caught up with the leader for " +
s"reassigning partition $topicPartition")
onPartitionReassignment(topicPartition, reassignment)
}
}
@@ -2073,6 +2084,16 @@ class KafkaController(val config: KafkaConfig,
if (partitions.nonEmpty) {
updateLeaderAndIsrCache(partitions)
processUpdateNotifications(partitions)
// During a partial upgrade, the controller may be on an IBP which assumes
// ISR changes through the `AlterIsr` API while some brokers are on an older
// IBP which assumes notification through Zookeeper. In this case, since the
// controller will not have registered watches for reassigning partitions, we
// can still rely on the batch ISR change notification path in order to
// complete the reassignment.
partitions.filter(controllerContext.partitionsBeingReassigned.contains).foreach { topicPartition =>
maybeCompleteReassignment(topicPartition)
}
}
} finally {
// delete the notifications
@@ -2227,7 +2248,8 @@ class KafkaController(val config: KafkaConfig,
eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
}
private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
private def processAlterIsr(brokerId: Int, brokerEpoch: Long,
isrsToAlter: Map[TopicPartition, LeaderAndIsr],
callback: AlterIsrCallback): Unit = {
// Handle a few short-circuits
@@ -2315,6 +2337,19 @@ class KafkaController(val config: KafkaConfig,
}
callback.apply(response)
// After we have returned the result of the `AlterIsr` request, we should check whether
// there are any reassignments which can be completed by a successful ISR expansion.
response.left.foreach { alterIsrResponses =>
alterIsrResponses.forKeyValue { (topicPartition, partitionResponse) =>
if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
val isSuccessfulUpdate = partitionResponse.isRight
if (isSuccessfulUpdate) {
maybeCompleteReassignment(topicPartition)
}
}
}
}
}
private def processControllerChange(): Unit = {

core/src/main/scala/kafka/server/ReplicaManager.scala (36 changed lines)

@@ -164,10 +164,26 @@ object HostedPartition {
final object Offline extends HostedPartition
}
case class IsrChangePropagationConfig(
// How often to check for ISR
checkIntervalMs: Long,
// Maximum time that an ISR change may be delayed before sending the notification
maxDelayMs: Long,
// Maximum time to await additional changes before sending the notification
lingerMs: Long
)
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
val IsrChangePropagationBackoff = 5000L
val IsrChangePropagationInterval = 60000L
// This field is mutable to allow overriding change notification behavior in test cases
@volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = IsrChangePropagationConfig(
checkIntervalMs = 2500,
lingerMs = 5000,
maxDelayMs = 60000,
)
}
class ReplicaManager(val config: KafkaConfig,
@@ -233,9 +249,10 @@ class ReplicaManager(val config: KafkaConfig,
this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
private val isrChangeNotificationConfig = ReplicaManager.DefaultIsrPropagationConfig
private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
private val lastIsrChangeMs = new AtomicLong(time.milliseconds())
private val lastIsrPropagationMs = new AtomicLong(time.milliseconds())
private var logDirFailureHandler: LogDirFailureHandler = null
@@ -278,7 +295,7 @@ class ReplicaManager(val config: KafkaConfig,
def recordIsrChange(topicPartition: TopicPartition): Unit = {
isrChangeSet synchronized {
isrChangeSet += topicPartition
lastIsrChangeMs.set(System.currentTimeMillis())
lastIsrChangeMs.set(time.milliseconds())
}
}
/**
@@ -289,11 +306,11 @@ class ReplicaManager(val config: KafkaConfig,
* other brokers when large amount of ISR change occurs.
*/
def maybePropagateIsrChanges(): Unit = {
val now = System.currentTimeMillis()
val now = time.milliseconds()
isrChangeSet synchronized {
if (isrChangeSet.nonEmpty &&
(lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBackoff < now ||
lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
(lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs < now)) {
zkClient.propagateIsrChanges(isrChangeSet)
isrChangeSet.clear()
lastIsrPropagationMs.set(now)
@@ -324,7 +341,8 @@ class ReplicaManager(val config: KafkaConfig,
scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
// If using AlterIsr, we don't need the znode ISR propagation
if (config.interBrokerProtocolVersion < KAFKA_2_7_IV2) {
scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _,
period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
} else {
alterIsrManager.start()
}

core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala (78 changed lines)

@@ -21,23 +21,24 @@ import java.io.Closeable
import java.util.{Collections, HashMap, List}
import kafka.admin.ReassignPartitionsCommand._
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.api.KAFKA_2_7_IV1
import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ReplicaManager}
import kafka.utils.Implicits._
import kafka.utils.TestUtils
import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry, DescribeLogDirsResult, NewTopic}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import org.junit.rules.Timeout
import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
import org.junit.rules.Timeout
import org.junit.{After, Rule, Test}
import scala.collection.Map
import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
import scala.collection.{Seq, mutable}
class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
@Rule
@@ -45,10 +46,6 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
var cluster: ReassignPartitionsTestCluster = null
def generateConfigs: Seq[KafkaConfig] = {
TestUtils.createBrokerConfigs(5, zkConnect).map(KafkaConfig.fromProps)
}
@After
override def tearDown(): Unit = {
Utils.closeQuietly(cluster, "ReassignPartitionsTestCluster")
@@ -60,13 +57,53 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
brokerId -> brokerLevelThrottles.map(throttle => (throttle, -1L)).toMap
}.toMap
/**
* Test running a quick reassignment.
*/
@Test
def testReassignment(): Unit = {
cluster = new ReassignPartitionsTestCluster(zkConnect)
cluster.setup()
executeAndVerifyReassignment()
}
@Test
def testReassignmentWithAlterIsrDisabled(): Unit = {
// Test reassignment when the IBP is on an older version which does not use
// the `AlterIsr` API. In this case, the controller will register individual
// watches for each reassigning partition so that the reassignment can be
// completed as soon as the ISR is expanded.
val configOverrides = Map(KafkaConfig.InterBrokerProtocolVersionProp -> KAFKA_2_7_IV1.version)
cluster = new ReassignPartitionsTestCluster(zkConnect, configOverrides = configOverrides)
cluster.setup()
executeAndVerifyReassignment()
}
@Test
def testReassignmentCompletionDuringPartialUpgrade(): Unit = {
// Test reassignment during a partial upgrade when some brokers are relying on
// `AlterIsr` and some rely on the old notification logic through Zookeeper.
// In this test case, broker 0 starts up first on the latest IBP and is typically
// elected as controller. The three remaining brokers start up on the older IBP.
// We want to ensure that reassignment can still complete through the ISR change
// notification path even though the controller expects `AlterIsr`.
// Override change notification settings so that test is not delayed by ISR
// change notification delay
ReplicaManager.DefaultIsrPropagationConfig = IsrChangePropagationConfig(
checkIntervalMs = 500,
lingerMs = 100,
maxDelayMs = 500
)
val oldIbpConfig = Map(KafkaConfig.InterBrokerProtocolVersionProp -> KAFKA_2_7_IV1.version)
val brokerConfigOverrides = Map(1 -> oldIbpConfig, 2 -> oldIbpConfig, 3 -> oldIbpConfig)
cluster = new ReassignPartitionsTestCluster(zkConnect, brokerConfigOverrides = brokerConfigOverrides)
cluster.setup()
executeAndVerifyReassignment()
}
def executeAndVerifyReassignment(): Unit = {
val assignment = """{"version":1,"partitions":""" +
"""[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},""" +
"""{"topic":"bar","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]}""" +
@@ -594,7 +631,11 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
}
}
class ReassignPartitionsTestCluster(val zkConnect: String) extends Closeable {
class ReassignPartitionsTestCluster(
val zkConnect: String,
configOverrides: Map[String, String] = Map.empty,
brokerConfigOverrides: Map[Int, Map[String, String]] = Map.empty
) extends Closeable {
val brokers = Map(
0 -> "rack0",
1 -> "rack0",
@@ -622,6 +663,12 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
// Don't move partition leaders automatically.
config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
config.setProperty(KafkaConfig.ReplicaLagTimeMaxMsProp, "1000")
configOverrides.forKeyValue(config.setProperty)
brokerConfigOverrides.get(brokerId).foreach { overrides =>
overrides.forKeyValue(config.setProperty)
}
config
}.toBuffer
@@ -637,9 +684,8 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
}
def createServers(): Unit = {
brokers.keySet.foreach {
case brokerId =>
servers += TestUtils.createServer(KafkaConfig(brokerConfigs(brokerId)))
brokers.keySet.foreach { brokerId =>
servers += TestUtils.createServer(KafkaConfig(brokerConfigs(brokerId)))
}
}
