diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index c434f9d75ea..ceac0c60104 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -18,20 +18,20 @@ package kafka.controller import collection._ import collection.immutable.Set -import kafka.cluster.Broker +import com.yammer.metrics.core.Gauge +import java.lang.{IllegalStateException, Object} +import java.util.concurrent.TimeUnit +import kafka.admin.PreferredReplicaLeaderElectionCommand import kafka.api._ +import kafka.cluster.Broker +import kafka.common.{BrokerNotAvailableException, TopicAndPartition, KafkaException} +import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} +import kafka.server.{ZookeeperLeaderElector, KafkaConfig} import kafka.utils.ZkUtils._ +import kafka.utils.{Utils, ZkUtils, Logging} import org.apache.zookeeper.Watcher.Event.KeeperState -import kafka.server.{ZookeeperLeaderElector, KafkaConfig} -import java.util.concurrent.TimeUnit -import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} -import com.yammer.metrics.core.Gauge import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} -import kafka.utils.{Utils, ZkUtils, Logging} import org.I0Itec.zkclient.exception.ZkNoNodeException -import java.lang.{IllegalStateException, Object} -import kafka.admin.PreferredReplicaLeaderElectionCommand -import kafka.common.{BrokerNotAvailableException, TopicAndPartition, KafkaException} class ControllerContext(val zkClient: ZkClient, var controllerChannelManager: ControllerChannelManager = null, @@ -40,7 +40,7 @@ class ControllerContext(val zkClient: ZkClient, val brokerShutdownLock: Object = new Object, var allTopics: Set[String] = Set.empty, var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty, - var allLeaders: mutable.Map[TopicAndPartition, Int] = mutable.Map.empty, + var allLeaders: 
mutable.Map[TopicAndPartition, LeaderAndIsr] = mutable.Map.empty, var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap, var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = @@ -128,33 +128,32 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg trace("All leaders = " + controllerContext.allLeaders.mkString(",")) controllerContext.allLeaders.filter { case (topicAndPartition, leader) => - leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1 + leader.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1 }.map(_._1) } val partitionsToMove = replicatedPartitionsBrokerLeads().toSet debug("Partitions to move leadership from broker %d: %s".format(id, partitionsToMove.mkString(","))) - partitionsToMove.foreach(topicAndPartition => { + partitionsToMove.foreach{ topicAndPartition => val (topic, partition) = topicAndPartition.asTuple // move leadership serially to relinquish lock. controllerContext.controllerLock synchronized { - controllerContext.allLeaders.get(topicAndPartition).foreach(currLeader => { - if (currLeader == id) { + controllerContext.allLeaders.get(topicAndPartition).foreach{ currLeaderAndIsr => + if (currLeaderAndIsr.leader == id) { partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, controlledShutdownPartitionLeaderSelector) - val newLeader = controllerContext.allLeaders(topicAndPartition) + val newLeaderAndIsr = controllerContext.allLeaders(topicAndPartition) // mark replica offline only if leadership was moved successfully - if (newLeader != currLeader) + if (newLeaderAndIsr.leader != currLeaderAndIsr.leader) replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, partition, id)), OfflineReplica) - } - else + } else debug("Partition %s moved from leader %d to new leader %d during shutdown." 
- .format(topicAndPartition, id, currLeader)) - }) + .format(topicAndPartition, id, currLeaderAndIsr.leader)) + } } - }) + } /* * Force the shutting down broker out of the ISR of partitions that it @@ -166,7 +165,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg allPartitionsAndReplicationFactorOnBroker foreach { case(topicAndPartition, replicationFactor) => val (topic, partition) = topicAndPartition.asTuple - if (controllerContext.allLeaders(topicAndPartition) != id) { + if (controllerContext.allLeaders(topicAndPartition).leader != id) { brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false) removeReplicaFromIsr(topic, partition, id) match { case Some(updatedLeaderAndIsr) => @@ -222,34 +221,42 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg /** * This callback is invoked by the replica state machine's broker change listener, with the list of newly started * brokers as input. It does the following - - * 1. Updates the leader and ISR cache. We have to do this since we don't register zookeeper listeners to update - * leader and ISR for every partition as they take place - * 2. Triggers the OnlinePartition state change for all new/offline partitions - * 3. Invokes the OnlineReplica state change on the input list of newly started brokers + * 1. Triggers the OnlinePartition state change for all new/offline partitions + * 2. It checks whether there are reassigned replicas assigned to any newly started brokers. If + * so, it performs the reassignment logic for each topic/partition. + * + * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point for two reasons: + * 1. The partition state machine, when triggering online state change, will refresh leader and ISR for only those + * partitions currently new or offline (rather than every partition this controller is aware of) + * 2. 
Even if we do refresh the cache, there is no guarantee that by the time the leader and ISR request reaches + * every broker that it is still valid. Brokers check the leader epoch to determine validity of the request. */ def onBrokerStartup(newBrokers: Seq[Int]) { info("New broker startup callback for %s".format(newBrokers.mkString(","))) - // update leader and isr cache for broker - updateLeaderAndIsrCache() + val newBrokersSet = newBrokers.toSet // update partition state machine partitionStateMachine.triggerOnlinePartitionStateChange() - replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), - OnlineReplica) + replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), OnlineReplica) + // check if reassignment of some partitions need to be restarted - val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter(p => - p._2.newReplicas.foldLeft(false)((a, replica) => newBrokers.contains(replica) || a)) + val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter{ + case (topicAndPartition, reassignmentContext) => + reassignmentContext.newReplicas.exists(newBrokersSet.contains(_)) + } partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2)) } /** * This callback is invoked by the replica state machine's broker change listener with the list of failed brokers * as input. It does the following - - * 1. Updates the leader and ISR cache. We have to do this since we don't register zookeeper listeners to update - * leader and ISR for every partition as they take place - * 2. Mark partitions with dead leaders offline - * 3. Triggers the OnlinePartition state change for all new/offline partitions - * 4. Invokes the OfflineReplica state change on the input list of newly started brokers + * 1. Mark partitions with dead leaders as offline + * 2. 
Triggers the OnlinePartition state change for all new/offline partitions + * 3. Invokes the OfflineReplica state change on the input list of failed brokers + * + * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point. This is because + * the partition state machine will refresh our cache for us when performing leader election for all new/offline + * partitions coming online. */ def onBrokerFailure(deadBrokers: Seq[Int]) { info("Broker failure callback for %s".format(deadBrokers.mkString(","))) @@ -258,17 +265,15 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id)) info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown)) - // update leader and isr cache for broker - updateLeaderAndIsrCache() + val deadBrokersSet = deadBrokers.toSet // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader => - deadBrokers.contains(partitionAndLeader._2)).keySet + deadBrokersSet.contains(partitionAndLeader._2.leader)).keySet partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition) // trigger OnlinePartition state changes for offline or new partitions partitionStateMachine.triggerOnlinePartitionStateChange() // handle dead replicas - replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), - OfflineReplica) + replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica) } /** @@ -391,9 +396,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg private def initializeControllerContext() { controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet 
controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet - controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, - controllerContext.allTopics.toSeq) - controllerContext.allLeaders = new mutable.HashMap[TopicAndPartition, Int] + controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq) + controllerContext.allLeaders = new mutable.HashMap[TopicAndPartition, LeaderAndIsr] // update the leader and isr cache for all existing partitions from Zookeeper updateLeaderAndIsrCache() // start the channel manager @@ -425,7 +429,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient) // check if they are already completed val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter(partition => - controllerContext.allLeaders(partition) == controllerContext.partitionReplicaAssignment(partition).head) + controllerContext.allLeaders(partition).leader == controllerContext.partitionReplicaAssignment(partition).head) controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(","))) @@ -445,7 +449,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match { case true => - controllerContext.allLeaders.put(topicPartition, leaderAndIsr.leader) + controllerContext.allLeaders.put(topicPartition, leaderAndIsr) 
case false => debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderAndIsr.leader) + "partition %s is dead, just ignore it".format(topicPartition)) @@ -465,14 +469,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) { val reassignedReplicas = reassignedPartitionContext.newReplicas - val currentLeader = controllerContext.allLeaders(topicAndPartition) + val currentLeader = controllerContext.allLeaders(topicAndPartition).leader if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) { info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) + "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(","))) // move the leader to one of the alive and caught up new replicas partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector) - } - else { + } else { // check if the leader is alive or not controllerContext.liveBrokerIds.contains(currentLeader) match { case true => @@ -565,14 +568,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) { - partitionsToBeRemoved.foreach { partition => + for(partition <- partitionsToBeRemoved) { // check the status - val currentLeader = controllerContext.allLeaders(partition) + val currentLeader = controllerContext.allLeaders(partition).leader val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head if(currentLeader == preferredReplica) { info("Partition %s completed preferred replica leader election. 
New leader is %d".format(partition, preferredReplica)) - } - else { + } else { warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader)) } } @@ -608,23 +610,22 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes if (leaderAndIsr.isr.contains(replicaId)) { val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1, - leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1) + leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1) // update the new leadership decision in zookeeper or retry - val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath( - zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(), - leaderAndIsr.zkVersion) + val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, + ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(), leaderAndIsr.zkVersion) newLeaderAndIsr.zkVersion = newVersion finalLeaderAndIsr = Some(newLeaderAndIsr) - if (updateSucceeded) - info("New leader and ISR for partition [%s, %d] is %s" - .format(topic, partition, newLeaderAndIsr.toString())) + if (updateSucceeded) { + // we've successfully written to ZK, let's refresh our cache + info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString())) + controllerContext.allLeaders.put(topicAndPartition, newLeaderAndIsr) + } updateSucceeded - } - else { + } else { warn("Cannot remove replica %d from ISR of %s. 
Leader = %d ; ISR = %s" - .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr)) + .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr)) finalLeaderAndIsr = Some(leaderAndIsr) true } @@ -705,16 +706,14 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL throw new KafkaException("Partition %s to be reassigned is already assigned to replicas" .format(topicAndPartition) + " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(","))) - } - else { + } else { if(aliveNewReplicas == newReplicas) { info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(","))) val context = createReassignmentContextForPartition(topic, partition, newReplicas) controllerContext.partitionsBeingReassigned.put(topicAndPartition, context) controller.onPartitionReassignment(topicAndPartition, context) - } - else { + } else { // some replica in RAR is not alive. Fail partition reassignment throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) + " %s for partition %s to be reassigned are alive. 
".format(newReplicas.mkString(","), topicAndPartition) + @@ -724,7 +723,7 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist" .format(topicAndPartition)) } - }catch { + } catch { case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e) // remove the partition from the admin path to unblock the admin client controller.removePartitionFromReassignedPartitions(topicAndPartition) diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index a378c5ef3cc..4ce15699eac 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -128,7 +128,7 @@ with Logging { val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) val preferredReplica = assignedReplicas.head // check if preferred replica is the current leader - val currentLeader = controllerContext.allLeaders(topicAndPartition) + val currentLeader = controllerContext.allLeaders(topicAndPartition).leader if(currentLeader == preferredReplica) { throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition [%s,%d]" .format(preferredReplica, topic, partition)) diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index ec3bee59c46..fd3a6fae81c 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -17,13 +17,13 @@ package kafka.controller import collection._ +import collection.JavaConversions._ +import java.util.concurrent.atomic.AtomicBoolean import kafka.api.LeaderAndIsr +import kafka.common.{TopicAndPartition, 
StateChangeFailedException, PartitionOfflineException} import kafka.utils.{Logging, ZkUtils} import org.I0Itec.zkclient.IZkChildListener -import collection.JavaConversions._ -import java.util.concurrent.atomic.AtomicBoolean import org.I0Itec.zkclient.exception.ZkNodeExistsException -import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException} /** * This class represents the state machine for partitions. It defines the states that a partition can be in, and @@ -44,7 +44,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest) val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext) - private var isShuttingDown = new AtomicBoolean(false) + private val isShuttingDown = new AtomicBoolean(false) /** * Invoked on successful controller election. 
First registers a topic change listener since that triggers all @@ -81,13 +81,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { try { brokerRequestBatch.newBatch() // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state - partitionState.filter(partitionAndState => - partitionAndState._2.equals(OfflinePartition) || partitionAndState._2.equals(NewPartition)).foreach { - partitionAndState => handleStateChange(partitionAndState._1.topic, partitionAndState._1.partition, OnlinePartition, - offlinePartitionSelector) + for((topicAndPartition, partitionState) <- partitionState) { + if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition)) + handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, offlinePartitionSelector) } brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers) - }catch { + } catch { case e => error("Error while moving some partitions to the online state", e) } } @@ -106,7 +105,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector) } brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers) - }catch { + } catch { case e => error("Error while moving some partitions to %s state".format(targetState), e) } } @@ -145,7 +144,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { case _ => // should never come here since illegal previous states are checked above } info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition, - partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition))) + partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leader)) partitionState.put(topicAndPartition, OnlinePartition) // post: partition has a leader case OfflinePartition => @@ 
-162,7 +161,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { partitionState.put(topicAndPartition, NonExistentPartition) // post: partition state is deleted from all brokers and zookeeper } - }catch { + } catch { case t: Throwable => error("State change for partition [%s, %d] ".format(topic, partition) + "from %s to %s failed".format(currState, targetState), t) } @@ -241,9 +240,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { // GC pause brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic, topicAndPartition.partition, leaderAndIsr, replicaAssignment.size) - controllerContext.allLeaders.put(topicAndPartition, leaderAndIsr.leader) + controllerContext.allLeaders.put(topicAndPartition, leaderAndIsr) partitionState.put(topicAndPartition, OnlinePartition) - }catch { + } catch { case e: ZkNodeExistsException => ControllerStat.offlinePartitionRate.mark() throw new StateChangeFailedException("Error while changing partition %s's state from New to Online" @@ -260,7 +259,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * @param leaderSelector Specific leader selector (e.g., offline/reassigned/etc.) 
*/ def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) { - /** handle leader election for the partitions whose leader is no longer alive **/ + // handle leader election for the partitions whose leader is no longer alive info("Electing leader for partition [%s, %d]".format(topic, partition)) try { var zookeeperPathUpdateSucceeded: Boolean = false @@ -278,13 +277,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { replicasForThisPartition = replicas } // update the leader cache - controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderAndIsr.leader) + controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderAndIsr) info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition)) - // store new leader and isr info in cache - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, - topic, partition, newLeaderAndIsr, + // notify all replicas of the new leader + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderAndIsr, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size) - }catch { + } catch { case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead." 
.format(topic, partition) + " Marking this partition offline", poe) case sce => throw new StateChangeFailedException(("Error while electing leader for partition " + diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index cfa771aaa2c..8dc148d8ada 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -17,11 +17,11 @@ package kafka.controller import collection._ -import kafka.utils.{ZkUtils, Logging} import collection.JavaConversions._ import java.util.concurrent.atomic.AtomicBoolean -import org.I0Itec.zkclient.IZkChildListener import kafka.common.{TopicAndPartition, StateChangeFailedException} +import kafka.utils.{ZkUtils, Logging} +import org.I0Itec.zkclient.IZkChildListener /** * This class represents the state machine for replicas. It defines the states that a replica can be in, and @@ -113,7 +113,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica" .format(replicaId, topic, partition) + "state as it is being requested to become leader") brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), - topic, partition, leaderAndIsr, replicaAssignment.size) + topic, partition, leaderAndIsr, replicaAssignment.size) case None => // new leader request will be sent to this replica when one gets elected } replicaState.put((topic, partition, replicaId), NewReplica) @@ -137,16 +137,13 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId) info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition)) case _ => - // check if the leader for this partition is alive or even exists - // NOTE: 
technically, we could get the leader from the allLeaders cache, but we need to read zookeeper - // for the ISR anyways - val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) - leaderAndIsrOpt match { + // if the leader for this replica exists and is alive, send the leader and ISR + controllerContext.allLeaders.get(topicAndPartition) match { case Some(leaderAndIsr) => controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match { case true => // leader is alive brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), - topic, partition, leaderAndIsr, replicaAssignment.size) + topic, partition, leaderAndIsr, replicaAssignment.size) replicaState.put((topic, partition, replicaId), OnlineReplica) info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition)) case false => // ignore partitions whose leader is not alive @@ -158,21 +155,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { case OfflineReplica => assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState) // As an optimization, the controller removes dead replicas from the ISR - val currLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) - val leaderAndIsrIsEmpty: Boolean = currLeaderAndIsrOpt match { + val leaderAndIsrIsEmpty = controllerContext.allLeaders.get(topicAndPartition) match { case Some(currLeaderAndIsr) => if (currLeaderAndIsr.isr.contains(replicaId)) controller.removeReplicaFromIsr(topic, partition, replicaId) match { case Some(updatedLeaderAndIsr) => // send the shrunk ISR state change request only to the leader brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderAndIsr.leader), - topic, partition, updatedLeaderAndIsr, - replicaAssignment.size) + topic, partition, updatedLeaderAndIsr, replicaAssignment.size) replicaState.put((topic, partition, replicaId), OfflineReplica) - 
info("Replica %d for partition [%s, %d] state changed to OfflineReplica" - .format(replicaId, topic, partition)) - info("Removed offline replica %d from ISR for partition [%s, %d]" - .format(replicaId, topic, partition)) + info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition)) + info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition)) false case None => true