diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index dd99bf6be76..c76fd2b5011 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -39,6 +39,9 @@ class Partition(val topic: String, var inSyncReplicas: Set[Replica] = Set.empty[Replica] private val assignedReplicaMap = new Pool[Int,Replica] private val leaderISRUpdateLock = new Object + private var zkVersion: Int = LeaderAndISR.initialZKVersion + private var leaderEpoch: Int = LeaderAndISR.initialLeaderEpoch - 1 + this.logIdent = "Partition [%s, %d] on broker %d, ".format(topic, partitionId, localBrokerId) private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId) @@ -90,71 +93,70 @@ class Partition(val topic: String, assignedReplicaMap.values.toSet } + + /** + * If the leaderEpoch of the incoming request is higher than locally cached epoch, make it the new leader of follower to the new leader. + */ + def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR, isMakingLeader: Boolean): Boolean = { + leaderISRUpdateLock synchronized { + if (leaderEpoch >= leaderAndISR.leaderEpoch){ + info("Current leaderEpoch [%d] is larger or equal to the requested leaderEpoch [%d], discard the become %s request" + .format(leaderEpoch, leaderAndISR.leaderEpoch, if(isMakingLeader) "leader" else "follower")) + return false + } + if(isMakingLeader) + makeLeader(topic, partitionId, leaderAndISR) + else + makeFollower(topic, partitionId, leaderAndISR) + true + } + } + /** - * If the local replica is not the leader, make the local replica the leader in the following steps. + * If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps. * 1. stop the existing replica fetcher * 2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available) * 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time) * 4. set the new leader and ISR */ - def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR): Boolean = { - leaderISRUpdateLock synchronized { - val shouldBecomeLeader = leaderReplicaIdOpt match { - case Some(leaderReplicaId) => !isReplicaLocal(leaderReplicaId) - case None => true - } - if (shouldBecomeLeader) { - info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId)) - // stop replica fetcher thread, if any - replicaFetcherManager.removeFetcher(topic, partitionId) + private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) { + trace("Started to become leader at the request %s".format(leaderAndISR.toString())) + // stop replica fetcher thread, if any + replicaFetcherManager.removeFetcher(topic, partitionId) - val newInSyncReplicas = leaderAndISR.ISR.map(r => getOrCreateReplica(r)).toSet - // reset LogEndOffset for remote replicas - assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset) - inSyncReplicas = newInSyncReplicas - leaderReplicaIdOpt = Some(localBrokerId) - true - } else - false - } + val newInSyncReplicas = leaderAndISR.ISR.map(r => getOrCreateReplica(r)).toSet + // reset LogEndOffset for remote replicas + assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset) + inSyncReplicas = newInSyncReplicas + leaderEpoch = leaderAndISR.leaderEpoch + zkVersion = leaderAndISR.zkVersion + leaderReplicaIdOpt = Some(localBrokerId) } /** - * If the local replica is not already following the new leader, make it follow the new leader. * 1. stop any existing fetcher on this partition from the local replica * 2. make sure local replica exists and truncate the log to high watermark * 3. set the leader and set ISR to empty * 4. start a fetcher to the new leader */ - def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR): Boolean = { - leaderISRUpdateLock synchronized { + private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = { + trace("Started to become follower at the request %s".format(leaderAndISR.toString())) val newLeaderBrokerId: Int = leaderAndISR.leader info("Starting the follower state transition to follow leader %d for topic %s partition %d" .format(newLeaderBrokerId, topic, partitionId)) val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(newLeaderBrokerId)).head - val currentLeaderBrokerIdOpt = replicaFetcherManager.fetcherSourceBroker(topic, partitionId) - // become follower only if it is not already following the same leader - val shouldBecomeFollower = currentLeaderBrokerIdOpt match { - case Some(currentLeaderBrokerId) => currentLeaderBrokerId != newLeaderBrokerId - case None => true - } - if(shouldBecomeFollower) { - info("Becoming follower to leader %d for topic %s partition %d".format(newLeaderBrokerId, topic, partitionId)) - // stop fetcher thread to previous leader - replicaFetcherManager.removeFetcher(topic, partitionId) + // stop fetcher thread to previous leader + replicaFetcherManager.removeFetcher(topic, partitionId) - // make sure local replica exists - val localReplica = getOrCreateReplica() - localReplica.log.get.truncateTo(localReplica.highWatermark) - inSyncReplicas = Set.empty[Replica] - leaderReplicaIdOpt = Some(newLeaderBrokerId) - - // start fetcher thread to current leader - replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker) - true - } else - false - } + // make sure local replica exists + val localReplica = getOrCreateReplica() + localReplica.log.get.truncateTo(localReplica.highWatermark) + inSyncReplicas = Set.empty[Replica] + leaderEpoch = leaderAndISR.leaderEpoch + zkVersion = leaderAndISR.zkVersion + leaderReplicaIdOpt = Some(newLeaderBrokerId) + // start fetcher thread to current leader + replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker) } def updateLeaderHWAndMaybeExpandISR(replicaId: Int, offset: Long) { @@ -260,14 +262,15 @@ class Partition(val topic: String, private def updateISR(newISR: Set[Replica]) { info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(","))) - inSyncReplicas = newISR - val curLeaderAndISR = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partitionId) - curLeaderAndISR match { - case None => - throw new IllegalStateException("The leaderAndISR info for partition [%s, %s] is not in Zookeeper".format(topic, partitionId)) - case Some(m) => - m.ISR = newISR.map(r => r.brokerId).toList - ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), m.toString) + val newLeaderAndISR = new LeaderAndISR(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion) + val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, + ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), newLeaderAndISR.toString, zkVersion) + if (updateSucceeded){ + inSyncReplicas = newISR + zkVersion = newVersion + trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newISR.mkString(","), zkVersion)) + } else { + info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion)) } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 7a7ccbfcd0e..33ad9c9fd1d 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -154,7 +154,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = { info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId)) val partition = getOrCreatePartition(topic, partitionId) - if (partition.makeLeader(topic, partitionId, leaderAndISR)) { + if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, true)) { // also add this partition to the list of partitions for which the leader is the current broker leaderPartitionsLock synchronized { leaderPartitions += partition @@ -169,7 +169,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient .format(leaderBrokerId, topic, partitionId)) val partition = getOrCreatePartition(topic, partitionId) - if (partition.makeFollower(topic, partitionId, leaderAndISR)) { + if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, false)) { // remove this replica's partition from the ISR expiration queue leaderPartitionsLock synchronized { leaderPartitions -= partition