Browse Source

update leaderAndISR in ZK conditionally; patched by Yang Ye; reviewed by Jun Rao; KAFKA-428

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1377157 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Jun Rao 12 years ago
parent
commit
93e9dbd026
  1. 109
      core/src/main/scala/kafka/cluster/Partition.scala
  2. 4
      core/src/main/scala/kafka/server/ReplicaManager.scala

109
core/src/main/scala/kafka/cluster/Partition.scala

@ -39,6 +39,9 @@ class Partition(val topic: String,
var inSyncReplicas: Set[Replica] = Set.empty[Replica] var inSyncReplicas: Set[Replica] = Set.empty[Replica]
private val assignedReplicaMap = new Pool[Int,Replica] private val assignedReplicaMap = new Pool[Int,Replica]
private val leaderISRUpdateLock = new Object private val leaderISRUpdateLock = new Object
private var zkVersion: Int = LeaderAndISR.initialZKVersion
private var leaderEpoch: Int = LeaderAndISR.initialLeaderEpoch - 1
this.logIdent = "Partition [%s, %d] on broker %d, ".format(topic, partitionId, localBrokerId)
private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId) private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
@ -90,71 +93,70 @@ class Partition(val topic: String,
assignedReplicaMap.values.toSet assignedReplicaMap.values.toSet
} }
/**
* If the leaderEpoch of the incoming request is higher than locally cached epoch, make it the new leader of follower to the new leader.
*/
def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR, isMakingLeader: Boolean): Boolean = {
leaderISRUpdateLock synchronized {
if (leaderEpoch >= leaderAndISR.leaderEpoch){
info("Current leaderEpoch [%d] is larger or equal to the requested leaderEpoch [%d], discard the become %s request"
.format(leaderEpoch, leaderAndISR.leaderEpoch, if(isMakingLeader) "leader" else "follower"))
return false
}
if(isMakingLeader)
makeLeader(topic, partitionId, leaderAndISR)
else
makeFollower(topic, partitionId, leaderAndISR)
true
}
}
/** /**
* If the local replica is not the leader, make the local replica the leader in the following steps. * If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps.
* 1. stop the existing replica fetcher * 1. stop the existing replica fetcher
* 2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available) * 2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available)
* 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time) * 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
* 4. set the new leader and ISR * 4. set the new leader and ISR
*/ */
def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR): Boolean = { private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) {
leaderISRUpdateLock synchronized { trace("Started to become leader at the request %s".format(leaderAndISR.toString()))
val shouldBecomeLeader = leaderReplicaIdOpt match { // stop replica fetcher thread, if any
case Some(leaderReplicaId) => !isReplicaLocal(leaderReplicaId) replicaFetcherManager.removeFetcher(topic, partitionId)
case None => true
}
if (shouldBecomeLeader) {
info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
// stop replica fetcher thread, if any
replicaFetcherManager.removeFetcher(topic, partitionId)
val newInSyncReplicas = leaderAndISR.ISR.map(r => getOrCreateReplica(r)).toSet val newInSyncReplicas = leaderAndISR.ISR.map(r => getOrCreateReplica(r)).toSet
// reset LogEndOffset for remote replicas // reset LogEndOffset for remote replicas
assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset) assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
inSyncReplicas = newInSyncReplicas inSyncReplicas = newInSyncReplicas
leaderReplicaIdOpt = Some(localBrokerId) leaderEpoch = leaderAndISR.leaderEpoch
true zkVersion = leaderAndISR.zkVersion
} else leaderReplicaIdOpt = Some(localBrokerId)
false
}
} }
/** /**
* If the local replica is not already following the new leader, make it follow the new leader.
* 1. stop any existing fetcher on this partition from the local replica * 1. stop any existing fetcher on this partition from the local replica
* 2. make sure local replica exists and truncate the log to high watermark * 2. make sure local replica exists and truncate the log to high watermark
* 3. set the leader and set ISR to empty * 3. set the leader and set ISR to empty
* 4. start a fetcher to the new leader * 4. start a fetcher to the new leader
*/ */
def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR): Boolean = { private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = {
leaderISRUpdateLock synchronized { trace("Started to become follower at the request %s".format(leaderAndISR.toString()))
val newLeaderBrokerId: Int = leaderAndISR.leader val newLeaderBrokerId: Int = leaderAndISR.leader
info("Starting the follower state transition to follow leader %d for topic %s partition %d" info("Starting the follower state transition to follow leader %d for topic %s partition %d"
.format(newLeaderBrokerId, topic, partitionId)) .format(newLeaderBrokerId, topic, partitionId))
val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(newLeaderBrokerId)).head val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(newLeaderBrokerId)).head
val currentLeaderBrokerIdOpt = replicaFetcherManager.fetcherSourceBroker(topic, partitionId) // stop fetcher thread to previous leader
// become follower only if it is not already following the same leader replicaFetcherManager.removeFetcher(topic, partitionId)
val shouldBecomeFollower = currentLeaderBrokerIdOpt match {
case Some(currentLeaderBrokerId) => currentLeaderBrokerId != newLeaderBrokerId
case None => true
}
if(shouldBecomeFollower) {
info("Becoming follower to leader %d for topic %s partition %d".format(newLeaderBrokerId, topic, partitionId))
// stop fetcher thread to previous leader
replicaFetcherManager.removeFetcher(topic, partitionId)
// make sure local replica exists // make sure local replica exists
val localReplica = getOrCreateReplica() val localReplica = getOrCreateReplica()
localReplica.log.get.truncateTo(localReplica.highWatermark) localReplica.log.get.truncateTo(localReplica.highWatermark)
inSyncReplicas = Set.empty[Replica] inSyncReplicas = Set.empty[Replica]
leaderReplicaIdOpt = Some(newLeaderBrokerId) leaderEpoch = leaderAndISR.leaderEpoch
zkVersion = leaderAndISR.zkVersion
// start fetcher thread to current leader leaderReplicaIdOpt = Some(newLeaderBrokerId)
replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker) // start fetcher thread to current leader
true replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
} else
false
}
} }
def updateLeaderHWAndMaybeExpandISR(replicaId: Int, offset: Long) { def updateLeaderHWAndMaybeExpandISR(replicaId: Int, offset: Long) {
@ -260,14 +262,15 @@ class Partition(val topic: String,
private def updateISR(newISR: Set[Replica]) { private def updateISR(newISR: Set[Replica]) {
info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(","))) info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(",")))
inSyncReplicas = newISR val newLeaderAndISR = new LeaderAndISR(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion)
val curLeaderAndISR = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partitionId) val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
curLeaderAndISR match { ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), newLeaderAndISR.toString, zkVersion)
case None => if (updateSucceeded){
throw new IllegalStateException("The leaderAndISR info for partition [%s, %s] is not in Zookeeper".format(topic, partitionId)) inSyncReplicas = newISR
case Some(m) => zkVersion = newVersion
m.ISR = newISR.map(r => r.brokerId).toList trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newISR.mkString(","), zkVersion))
ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), m.toString) } else {
info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
} }
} }

4
core/src/main/scala/kafka/server/ReplicaManager.scala

@ -154,7 +154,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = { private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = {
info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId)) info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
val partition = getOrCreatePartition(topic, partitionId) val partition = getOrCreatePartition(topic, partitionId)
if (partition.makeLeader(topic, partitionId, leaderAndISR)) { if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, true)) {
// also add this partition to the list of partitions for which the leader is the current broker // also add this partition to the list of partitions for which the leader is the current broker
leaderPartitionsLock synchronized { leaderPartitionsLock synchronized {
leaderPartitions += partition leaderPartitions += partition
@ -169,7 +169,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
.format(leaderBrokerId, topic, partitionId)) .format(leaderBrokerId, topic, partitionId))
val partition = getOrCreatePartition(topic, partitionId) val partition = getOrCreatePartition(topic, partitionId)
if (partition.makeFollower(topic, partitionId, leaderAndISR)) { if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, false)) {
// remove this replica's partition from the ISR expiration queue // remove this replica's partition from the ISR expiration queue
leaderPartitionsLock synchronized { leaderPartitionsLock synchronized {
leaderPartitions -= partition leaderPartitions -= partition

Loading…
Cancel
Save