
KAFKA-574 KafkaController unnecessarily reads leaderAndIsr info from ZK; patched by Prashanth; reviewed by Jun and Neha

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1409618 13f79535-47bb-0310-9956-ffa450edef68
Tag: 0.8.0-beta1-candidate1
Committed by Neha Narkhede, 12 years ago
Parent commit: 22e032fdd4
  1. core/src/main/scala/kafka/controller/KafkaController.scala (139 changed lines)
  2. core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (2 changed lines)
  3. core/src/main/scala/kafka/controller/PartitionStateMachine.scala (38 changed lines)
  4. core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (27 changed lines)
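
The crux of the patch is the type change in ControllerContext.allLeaders (the first two hunks below): the controller now caches the full LeaderAndIsr per partition rather than just the leader id, so the state machines can answer leader/ISR questions from the cache instead of re-reading the leaderAndIsr path from ZooKeeper. A minimal, self-contained Scala sketch of that access pattern follows; ControllerLeaderCache, leaderOf and isrOf are illustrative names only, not part of the patch, and the case classes are simplified stand-ins for kafka.api.LeaderAndIsr and kafka.common.TopicAndPartition.

import scala.collection.mutable

// Simplified stand-ins for kafka.api.LeaderAndIsr and kafka.common.TopicAndPartition.
case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)
case class TopicAndPartition(topic: String, partition: Int)

class ControllerLeaderCache {
  // Before the patch: mutable.Map[TopicAndPartition, Int] (leader id only).
  // After the patch:  mutable.Map[TopicAndPartition, LeaderAndIsr] (full leader + ISR state).
  val allLeaders = mutable.Map.empty[TopicAndPartition, LeaderAndIsr]

  // Seeded from ZooKeeper once (updateLeaderAndIsrCache in the real controller), then
  // refreshed by the controller itself whenever it writes a new leader/ISR decision.
  def put(tp: TopicAndPartition, leaderAndIsr: LeaderAndIsr): Unit = allLeaders.put(tp, leaderAndIsr)

  // Callers that previously needed a ZkUtils.getLeaderAndIsrForPartition read can now do a lookup.
  def leaderOf(tp: TopicAndPartition): Option[Int] = allLeaders.get(tp).map(_.leader)
  def isrOf(tp: TopicAndPartition): Option[List[Int]] = allLeaders.get(tp).map(_.isr)
}

The remaining hunks in KafkaController.scala, PartitionStateMachine.scala and ReplicaStateMachine.scala are the call sites switching from ZooKeeper reads (or bare leader ids) to this richer cache.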

core/src/main/scala/kafka/controller/KafkaController.scala (139 changed lines)

@@ -18,20 +18,20 @@ package kafka.controller
 import collection._
 import collection.immutable.Set
-import kafka.cluster.Broker
+import com.yammer.metrics.core.Gauge
+import java.lang.{IllegalStateException, Object}
+import java.util.concurrent.TimeUnit
+import kafka.admin.PreferredReplicaLeaderElectionCommand
 import kafka.api._
+import kafka.cluster.Broker
+import kafka.common.{BrokerNotAvailableException, TopicAndPartition, KafkaException}
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import kafka.utils.ZkUtils._
+import kafka.utils.{Utils, ZkUtils, Logging}
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
-import java.util.concurrent.TimeUnit
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
-import com.yammer.metrics.core.Gauge
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
-import kafka.utils.{Utils, ZkUtils, Logging}
 import org.I0Itec.zkclient.exception.ZkNoNodeException
-import java.lang.{IllegalStateException, Object}
-import kafka.admin.PreferredReplicaLeaderElectionCommand
-import kafka.common.{BrokerNotAvailableException, TopicAndPartition, KafkaException}

 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
@@ -40,7 +40,7 @@ class ControllerContext(val zkClient: ZkClient,
                         val brokerShutdownLock: Object = new Object,
                         var allTopics: Set[String] = Set.empty,
                         var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty,
-                        var allLeaders: mutable.Map[TopicAndPartition, Int] = mutable.Map.empty,
+                        var allLeaders: mutable.Map[TopicAndPartition, LeaderAndIsr] = mutable.Map.empty,
                         var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
                           new mutable.HashMap,
                         var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
@@ -128,33 +128,32 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       trace("All leaders = " + controllerContext.allLeaders.mkString(","))
       controllerContext.allLeaders.filter {
         case (topicAndPartition, leader) =>
-          leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
+          leader.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
       }.map(_._1)
     }

     val partitionsToMove = replicatedPartitionsBrokerLeads().toSet
     debug("Partitions to move leadership from broker %d: %s".format(id, partitionsToMove.mkString(",")))
-    partitionsToMove.foreach(topicAndPartition => {
+    partitionsToMove.foreach{ topicAndPartition =>
       val (topic, partition) = topicAndPartition.asTuple
       // move leadership serially to relinquish lock.
       controllerContext.controllerLock synchronized {
-        controllerContext.allLeaders.get(topicAndPartition).foreach(currLeader => {
-          if (currLeader == id) {
+        controllerContext.allLeaders.get(topicAndPartition).foreach{ currLeaderAndIsr =>
+          if (currLeaderAndIsr.leader == id) {
             partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
               controlledShutdownPartitionLeaderSelector)
-            val newLeader = controllerContext.allLeaders(topicAndPartition)
+            val newLeaderAndIsr = controllerContext.allLeaders(topicAndPartition)
             // mark replica offline only if leadership was moved successfully
-            if (newLeader != currLeader)
+            if (newLeaderAndIsr.leader != currLeaderAndIsr.leader)
               replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, partition, id)), OfflineReplica)
-          }
-          else
+          } else
             debug("Partition %s moved from leader %d to new leader %d during shutdown."
-              .format(topicAndPartition, id, currLeader))
-        })
+              .format(topicAndPartition, id, currLeaderAndIsr.leader))
+        }
       }
-    })
+    }

     /*
      * Force the shutting down broker out of the ISR of partitions that it
@@ -166,7 +165,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       allPartitionsAndReplicationFactorOnBroker foreach {
         case(topicAndPartition, replicationFactor) =>
           val (topic, partition) = topicAndPartition.asTuple
-          if (controllerContext.allLeaders(topicAndPartition) != id) {
+          if (controllerContext.allLeaders(topicAndPartition).leader != id) {
            brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
            removeReplicaFromIsr(topic, partition, id) match {
              case Some(updatedLeaderAndIsr) =>
@@ -222,34 +221,42 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   /**
    * This callback is invoked by the replica state machine's broker change listener, with the list of newly started
    * brokers as input. It does the following -
-   * 1. Updates the leader and ISR cache. We have to do this since we don't register zookeeper listeners to update
-   *    leader and ISR for every partition as they take place
-   * 2. Triggers the OnlinePartition state change for all new/offline partitions
-   * 3. Invokes the OnlineReplica state change on the input list of newly started brokers
+   * 1. Triggers the OnlinePartition state change for all new/offline partitions
+   * 2. It checks whether there are reassigned replicas assigned to any newly started brokers. If
+   *    so, it performs the reassignment logic for each topic/partition.
+   *
+   * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point for two reasons:
+   * 1. The partition state machine, when triggering online state change, will refresh leader and ISR for only those
+   *    partitions currently new or offline (rather than every partition this controller is aware of)
+   * 2. Even if we do refresh the cache, there is no guarantee that by the time the leader and ISR request reaches
+   *    every broker that it is still valid. Brokers check the leader epoch to determine validity of the request.
    */
   def onBrokerStartup(newBrokers: Seq[Int]) {
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
-    // update leader and isr cache for broker
-    updateLeaderAndIsrCache()
+    val newBrokersSet = newBrokers.toSet
     // update partition state machine
     partitionStateMachine.triggerOnlinePartitionStateChange()
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers),
-      OnlineReplica)
+    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), OnlineReplica)
     // check if reassignment of some partitions need to be restarted
-    val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter(p =>
-      p._2.newReplicas.foldLeft(false)((a, replica) => newBrokers.contains(replica) || a))
+    val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter{
+      case (topicAndPartition, reassignmentContext) =>
+        reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
+    }
     partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
   }

   /**
    * This callback is invoked by the replica state machine's broker change listener with the list of failed brokers
    * as input. It does the following -
-   * 1. Updates the leader and ISR cache. We have to do this since we don't register zookeeper listeners to update
-   *    leader and ISR for every partition as they take place
-   * 2. Mark partitions with dead leaders offline
-   * 3. Triggers the OnlinePartition state change for all new/offline partitions
-   * 4. Invokes the OfflineReplica state change on the input list of newly started brokers
+   * 1. Mark partitions with dead leaders as offline
+   * 2. Triggers the OnlinePartition state change for all new/offline partitions
+   * 3. Invokes the OfflineReplica state change on the input list of newly started brokers
+   *
+   * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point. This is because
+   * the partition state machine will refresh our cache for us when performing leader election for all new/offline
+   * partitions coming online.
    */
   def onBrokerFailure(deadBrokers: Seq[Int]) {
     info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
@@ -258,17 +265,15 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
     info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
-    // update leader and isr cache for broker
-    updateLeaderAndIsrCache()
+    val deadBrokersSet = deadBrokers.toSet
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
     val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader =>
-      deadBrokers.contains(partitionAndLeader._2)).keySet
+      deadBrokersSet.contains(partitionAndLeader._2.leader)).keySet
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // handle dead replicas
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers),
-      OfflineReplica)
+    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica)
   }

   /**
@@ -391,9 +396,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private def initializeControllerContext() {
     controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
     controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
-    controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient,
-      controllerContext.allTopics.toSeq)
-    controllerContext.allLeaders = new mutable.HashMap[TopicAndPartition, Int]
+    controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
+    controllerContext.allLeaders = new mutable.HashMap[TopicAndPartition, LeaderAndIsr]
     // update the leader and isr cache for all existing partitions from Zookeeper
     updateLeaderAndIsrCache()
     // start the channel manager
@@ -425,7 +429,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
     // check if they are already completed
     val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter(partition =>
-      controllerContext.allLeaders(partition) == controllerContext.partitionReplicaAssignment(partition).head)
+      controllerContext.allLeaders(partition).leader == controllerContext.partitionReplicaAssignment(partition).head)
     controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
     info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
@@ -445,7 +449,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
         controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
           case true =>
-            controllerContext.allLeaders.put(topicPartition, leaderAndIsr.leader)
+            controllerContext.allLeaders.put(topicPartition, leaderAndIsr)
           case false =>
             debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderAndIsr.leader) +
               "partition %s is dead, just ignore it".format(topicPartition))
@@ -465,14 +469,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
                                                       reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val currentLeader = controllerContext.allLeaders(topicAndPartition)
+    val currentLeader = controllerContext.allLeaders(topicAndPartition).leader
     if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
       info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
         "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
       // move the leader to one of the alive and caught up new replicas
       partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
-    }
-    else {
+    } else {
       // check if the leader is alive or not
       controllerContext.liveBrokerIds.contains(currentLeader) match {
         case true =>
@@ -565,14 +568,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }

   def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
-    partitionsToBeRemoved.foreach { partition =>
+    for(partition <- partitionsToBeRemoved) {
       // check the status
-      val currentLeader = controllerContext.allLeaders(partition)
+      val currentLeader = controllerContext.allLeaders(partition).leader
       val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
       if(currentLeader == preferredReplica) {
         info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
-      }
-      else {
+      } else {
         warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
       }
     }
@@ -608,23 +610,22 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes
           if (leaderAndIsr.isr.contains(replicaId)) {
             val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
               leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
             // update the new leadership decision in zookeeper or retry
-            val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
-              zkClient,
-              ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(),
-              leaderAndIsr.zkVersion)
+            val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+              ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(), leaderAndIsr.zkVersion)
             newLeaderAndIsr.zkVersion = newVersion
             finalLeaderAndIsr = Some(newLeaderAndIsr)
-            if (updateSucceeded)
-              info("New leader and ISR for partition [%s, %d] is %s"
-                .format(topic, partition, newLeaderAndIsr.toString()))
+            if (updateSucceeded) {
+              // we've successfully written to ZK, let's refresh our cache
+              info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
+              controllerContext.allLeaders.put(topicAndPartition, newLeaderAndIsr)
+            }
             updateSucceeded
-          }
-          else {
+          } else {
             warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s"
               .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr))
             finalLeaderAndIsr = Some(leaderAndIsr)
             true
           }
@@ -705,16 +706,14 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
             throw new KafkaException("Partition %s to be reassigned is already assigned to replicas"
               .format(topicAndPartition) +
               " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
-          }
-          else {
+          } else {
             if(aliveNewReplicas == newReplicas) {
               info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition,
                 newReplicas.mkString(",")))
               val context = createReassignmentContextForPartition(topic, partition, newReplicas)
               controllerContext.partitionsBeingReassigned.put(topicAndPartition, context)
               controller.onPartitionReassignment(topicAndPartition, context)
-            }
-            else {
+            } else {
               // some replica in RAR is not alive. Fail partition reassignment
               throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
                 " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
@@ -724,7 +723,7 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
       case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
         .format(topicAndPartition))
     }
-    }catch {
+    } catch {
       case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
         // remove the partition from the admin path to unblock the admin client
         controller.removePartitionFromReassignedPartitions(topicAndPartition)
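
The @@ -608 hunk above is what makes dropping the extra ZooKeeper reads safe: removeReplicaFromIsr performs a conditional (version-checked) write of the shrunk ISR to ZooKeeper and, only when that write succeeds, puts the new LeaderAndIsr back into allLeaders, so the cache always reflects what the controller last wrote. Below is a hypothetical, self-contained sketch of that write-then-cache step; conditionalUpdate and IsrShrinkSketch are stand-ins invented for illustration (the real code uses ZkUtils.conditionalUpdatePersistentPath).

import scala.collection.mutable

case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], var zkVersion: Int)
case class TopicAndPartition(topic: String, partition: Int)

object IsrShrinkSketch {
  val allLeaders = mutable.Map.empty[TopicAndPartition, LeaderAndIsr]

  // Pretend ZooKeeper znode: a compare-and-set on the version, returning
  // (updateSucceeded, newVersion) the way the real conditional update helper does.
  private var znodeVersion = 0
  private def conditionalUpdate(data: String, expectedVersion: Int): (Boolean, Int) =
    if (expectedVersion == znodeVersion) { znodeVersion += 1; (true, znodeVersion) }
    else (false, znodeVersion)

  def removeReplicaFromIsr(tp: TopicAndPartition, replicaId: Int,
                           current: LeaderAndIsr): Option[LeaderAndIsr] = {
    // bump the leader epoch and drop the dead replica from the ISR
    val shrunk = LeaderAndIsr(current.leader, current.leaderEpoch + 1,
      current.isr.filter(_ != replicaId), current.zkVersion + 1)
    val (updateSucceeded, newVersion) = conditionalUpdate(shrunk.toString, current.zkVersion)
    if (updateSucceeded) {
      shrunk.zkVersion = newVersion
      allLeaders.put(tp, shrunk) // refresh the cache immediately after the successful ZK write
      Some(shrunk)
    } else {
      None
    }
  }
}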

core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (2 changed lines)

@@ -128,7 +128,7 @@ with Logging {
     val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val preferredReplica = assignedReplicas.head
     // check if preferred replica is the current leader
-    val currentLeader = controllerContext.allLeaders(topicAndPartition)
+    val currentLeader = controllerContext.allLeaders(topicAndPartition).leader
     if(currentLeader == preferredReplica) {
       throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition [%s,%d]"
         .format(preferredReplica, topic, partition))

core/src/main/scala/kafka/controller/PartitionStateMachine.scala (38 changed lines)

@@ -17,13 +17,13 @@
 package kafka.controller

 import collection._
+import collection.JavaConversions._
+import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
+import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException}
 import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.IZkChildListener
-import collection.JavaConversions._
-import java.util.concurrent.atomic.AtomicBoolean
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException}

 /**
  * This class represents the state machine for partitions. It defines the states that a partition can be in, and
@@ -44,7 +44,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
-  private var isShuttingDown = new AtomicBoolean(false)
+  private val isShuttingDown = new AtomicBoolean(false)

   /**
    * Invoked on successful controller election. First registers a topic change listener since that triggers all
@@ -81,13 +81,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     try {
       brokerRequestBatch.newBatch()
       // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state
-      partitionState.filter(partitionAndState =>
-        partitionAndState._2.equals(OfflinePartition) || partitionAndState._2.equals(NewPartition)).foreach {
-        partitionAndState => handleStateChange(partitionAndState._1.topic, partitionAndState._1.partition, OnlinePartition,
-          offlinePartitionSelector)
+      for((topicAndPartition, partitionState) <- partitionState) {
+        if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
+          handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, offlinePartitionSelector)
       }
       brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
-    }catch {
+    } catch {
       case e => error("Error while moving some partitions to the online state", e)
     }
   }
@@ -106,7 +105,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
       }
       brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
-    }catch {
+    } catch {
       case e => error("Error while moving some partitions to %s state".format(targetState), e)
     }
   }
@@ -145,7 +144,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           case _ => // should never come here since illegal previous states are checked above
         }
         info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition,
-          partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition)))
+          partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leader))
         partitionState.put(topicAndPartition, OnlinePartition)
         // post: partition has a leader
       case OfflinePartition =>
@@ -162,7 +161,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         partitionState.put(topicAndPartition, NonExistentPartition)
         // post: partition state is deleted from all brokers and zookeeper
       }
-    }catch {
+    } catch {
       case t: Throwable => error("State change for partition [%s, %d] ".format(topic, partition) +
         "from %s to %s failed".format(currState, targetState), t)
     }
@@ -241,9 +240,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           // GC pause
           brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
             topicAndPartition.partition, leaderAndIsr, replicaAssignment.size)
-          controllerContext.allLeaders.put(topicAndPartition, leaderAndIsr.leader)
+          controllerContext.allLeaders.put(topicAndPartition, leaderAndIsr)
           partitionState.put(topicAndPartition, OnlinePartition)
-        }catch {
+        } catch {
           case e: ZkNodeExistsException =>
             ControllerStat.offlinePartitionRate.mark()
             throw new StateChangeFailedException("Error while changing partition %s's state from New to Online"
@@ -260,7 +259,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * @param leaderSelector Specific leader selector (e.g., offline/reassigned/etc.)
    */
   def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
-    /** handle leader election for the partitions whose leader is no longer alive **/
+    // handle leader election for the partitions whose leader is no longer alive
     info("Electing leader for partition [%s, %d]".format(topic, partition))
     try {
       var zookeeperPathUpdateSucceeded: Boolean = false
@@ -278,13 +277,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         replicasForThisPartition = replicas
       }
       // update the leader cache
-      controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderAndIsr.leader)
+      controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderAndIsr)
       info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
-      // store new leader and isr info in cache
-      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition,
-        topic, partition, newLeaderAndIsr,
+      // notify all replicas of the new leader
+      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderAndIsr,
         controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
-    }catch {
+    } catch {
       case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
         .format(topic, partition) + " Marking this partition offline", poe)
       case sce => throw new StateChangeFailedException(("Error while electing leader for partition " +

core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (27 changed lines)

@@ -17,11 +17,11 @@
 package kafka.controller

 import collection._
-import kafka.utils.{ZkUtils, Logging}
 import collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicBoolean
-import org.I0Itec.zkclient.IZkChildListener
 import kafka.common.{TopicAndPartition, StateChangeFailedException}
+import kafka.utils.{ZkUtils, Logging}
+import org.I0Itec.zkclient.IZkChildListener

 /**
  * This class represents the state machine for replicas. It defines the states that a replica can be in, and
@@ -113,7 +113,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica"
                 .format(replicaId, topic, partition) + "state as it is being requested to become leader")
             brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
               topic, partition, leaderAndIsr, replicaAssignment.size)
           case None => // new leader request will be sent to this replica when one gets elected
         }
         replicaState.put((topic, partition, replicaId), NewReplica)
@@ -137,16 +137,13 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
             controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
             info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
           case _ =>
-            // check if the leader for this partition is alive or even exists
-            // NOTE: technically, we could get the leader from the allLeaders cache, but we need to read zookeeper
-            // for the ISR anyways
-            val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
-            leaderAndIsrOpt match {
+            // if the leader for this replica exists and is alive, send the leader and ISR
+            controllerContext.allLeaders.get(topicAndPartition) match {
              case Some(leaderAndIsr) =>
                controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
                  case true => // leader is alive
                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                      topic, partition, leaderAndIsr, replicaAssignment.size)
                    replicaState.put((topic, partition, replicaId), OnlineReplica)
                    info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
                  case false => // ignore partitions whose leader is not alive
@@ -158,21 +155,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           case OfflineReplica =>
             assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
             // As an optimization, the controller removes dead replicas from the ISR
-            val currLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
-            val leaderAndIsrIsEmpty: Boolean = currLeaderAndIsrOpt match {
+            val leaderAndIsrIsEmpty = controllerContext.allLeaders.get(topicAndPartition) match {
               case Some(currLeaderAndIsr) =>
                 if (currLeaderAndIsr.isr.contains(replicaId))
                   controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                     case Some(updatedLeaderAndIsr) =>
                       // send the shrunk ISR state change request only to the leader
                       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderAndIsr.leader),
-                        topic, partition, updatedLeaderAndIsr,
-                        replicaAssignment.size)
+                        topic, partition, updatedLeaderAndIsr, replicaAssignment.size)
                       replicaState.put((topic, partition, replicaId), OfflineReplica)
-                      info("Replica %d for partition [%s, %d] state changed to OfflineReplica"
-                        .format(replicaId, topic, partition))
-                      info("Removed offline replica %d from ISR for partition [%s, %d]"
-                        .format(replicaId, topic, partition))
+                      info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
+                      info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))
                       false
                     case None =>
                       true
