
KAFKA-708 ISR becomes empty while marking a partition offline; reviewed by Jun Rao

0.8.0-beta1-candidate1
Neha Narkhede 12 years ago
commit eb0d5a7f81
  1. core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (2 changed lines)
  2. core/src/main/scala/kafka/controller/KafkaController.scala (26 changed lines)
  3. core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (47 changed lines)
  4. core/src/main/scala/kafka/controller/PartitionStateMachine.scala (26 changed lines)
  5. core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (71 changed lines)
  6. core/src/main/scala/kafka/utils/Logging.scala (2 changed lines)
  7. core/src/main/scala/kafka/utils/Utils.scala (2 changed lines)
  8. core/src/main/scala/kafka/utils/ZkUtils.scala (11 changed lines)
  9. core/src/test/scala/unit/kafka/utils/UtilsTest.scala (12 changed lines)

core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (2 changed lines)

@@ -48,7 +48,7 @@ object PartitionStateInfo {
val leader = buffer.getInt
val leaderEpoch = buffer.getInt
val isrString = readShortString(buffer)
val isr = isrString.split(",").map(_.toInt).toList
val isr = Utils.parseCsvList(isrString).map(_.toInt).toList
val zkVersion = buffer.getInt
val replicationFactor = buffer.getInt
PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr, zkVersion), controllerEpoch),
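
The hunk above is the read path that motivated this fix: when a partition's ISR shrinks to empty, its serialized form is the empty string, and the old split produced a single bogus "" token that cannot be parsed as an int. A minimal, hypothetical demo of that failure mode (EmptyIsrDemo is illustrative, not part of the commit):

    object EmptyIsrDemo {
      // mirrors the new Utils.parseCsvList: null or empty input yields an empty Seq
      def parseCsvList(csvList: String): Seq[String] =
        if (csvList == null || csvList.isEmpty) Seq.empty[String]
        else csvList.split("\\s*,\\s*").filter(v => !v.equals(""))

      def main(args: Array[String]) {
        val isrString = ""                       // empty ISR as stored in zookeeper
        println(isrString.split(",").length)     // 1 -- one bogus empty token
        try {
          isrString.split(",").map(_.toInt).toList   // old path
        } catch {
          case e: NumberFormatException => println("old parsing fails: " + e)
        }
        println(parseCsvList(isrString).map(_.toInt).toList) // new path: List()
      }
    }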

core/src/main/scala/kafka/controller/KafkaController.scala (26 changed lines)

@@ -671,7 +671,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
"means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
"controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
if (leaderAndIsr.isr.contains(replicaId)) {
val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
// if the replica to be removed from the ISR is also the leader, set the new leader value to -1
val newLeader = if(replicaId == leaderAndIsr.leader) -1 else leaderAndIsr.leader
val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1,
leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
// update the new leadership decision in zookeeper or retry
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
@@ -683,8 +685,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
if (updateSucceeded)
info("New leader and ISR for partition [%s, %d] is %s"
.format(topic, partition, newLeaderAndIsr.toString()))
info("New leader and ISR for partition %s is %s".format(topicAndPartition, newLeaderAndIsr.toString()))
updateSucceeded
} else {
warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s"
@@ -721,7 +722,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
partitionStateMachine.shutdown()
replicaStateMachine.shutdown()
if(controllerContext.controllerChannelManager != null) {
info("session expires, clean up the state")
controllerContext.controllerChannelManager.shutdown()
controllerContext.controllerChannelManager = null
}
@@ -766,13 +766,11 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
assignedReplicasOpt match {
case Some(assignedReplicas) =>
if(assignedReplicas == newReplicas) {
throw new KafkaException("Partition %s to be reassigned is already assigned to replicas"
.format(topicAndPartition) +
throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
" %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
} else {
if(aliveNewReplicas == newReplicas) {
info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition,
newReplicas.mkString(",")))
info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
val context = createReassignmentContextForPartition(topic, partition, newReplicas)
controllerContext.partitionsBeingReassigned.put(topicAndPartition, context)
controller.onPartitionReassignment(topicAndPartition, context)
@@ -851,18 +849,18 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
if(caughtUpReplicas == reassignedReplicas) {
// resume the partition reassignment process
info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
.format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
.format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
"Resuming partition reassignment")
controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
}
else {
info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
.format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
.format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
"Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
}
case None => error("Error handling reassignment of partition [%s, %d] to replicas %s as it was never created"
.format(topic, partition, reassignedReplicas.mkString(",")))
case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
.format(topicAndPartition, reassignedReplicas.mkString(",")))
}
case None =>
}
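
A self-contained, hypothetical sketch of the rule added to removeReplicaFromIsr above: when the replica leaving the ISR is also the current leader, the new leader is recorded as -1 (no leader) instead of being left pointing at the removed replica. ShrinkIsrDemo and its local LeaderAndIsr case class are illustrative stand-ins, not the Kafka classes:

    object ShrinkIsrDemo {
      case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

      // drop replicaId from the ISR, bumping epoch and zkVersion; demote the leader if needed
      def shrinkIsr(current: LeaderAndIsr, replicaId: Int): LeaderAndIsr = {
        val newLeader = if (replicaId == current.leader) -1 else current.leader
        LeaderAndIsr(newLeader, current.leaderEpoch + 1,
                     current.isr.filter(b => b != replicaId), current.zkVersion + 1)
      }

      def main(args: Array[String]) {
        // dropping a follower keeps the leader; dropping the leader marks it as -1
        println(shrinkIsr(LeaderAndIsr(1, 5, List(1, 2), 7), 2)) // LeaderAndIsr(1,6,List(1),8)
        println(shrinkIsr(LeaderAndIsr(1, 5, List(1, 2), 7), 1)) // LeaderAndIsr(-1,6,List(2),8)
      }
    }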

core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (47 changed lines)

@@ -23,15 +23,14 @@ import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOff
trait PartitionLeaderSelector {
/**
* @param topic The topic of the partition whose leader needs to be elected
* @param partition The partition whose leader needs to be elected
* @param topicAndPartition The topic and partition whose leader needs to be elected
* @param currentLeaderAndIsr The current leader and isr of input partition read from zookeeper
* @throws PartitionOfflineException If no replica in the assigned replicas list is alive
* @return The leader and isr request, with the newly selected leader info, to send to the brokers
* Also, returns the list of replicas the returned leader and isr request should be sent to
* This API selects a new leader for the input partition
*/
def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}
@@ -45,8 +44,8 @@ trait PartitionLeaderSelector {
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
this.logIdent = "[OfflinePartitionLeaderSelector]: "
def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
controllerContext.partitionReplicaAssignment.get(TopicAndPartition(topic, partition)) match {
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
case Some(assignedReplicas) =>
val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
@@ -60,7 +59,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
case true =>
ControllerStats.offlinePartitionRate.mark()
throw new PartitionOfflineException(("No replica for partition " +
"([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" Assigned replicas are: [%s]".format(assignedReplicas))
case false =>
ControllerStats.uncleanLeaderElectionRate.mark()
@@ -74,13 +73,11 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader)
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
}
info("Selected new leader and ISR %s for offline partition [%s, %d]".format(newLeaderAndIsr.toString(), topic,
partition))
info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
(newLeaderAndIsr, liveAssignedReplicasToThisPartition)
case None =>
ControllerStats.offlinePartitionRate.mark()
throw new PartitionOfflineException("Partition [%s, %d] doesn't have".format(topic, partition) +
"replicas assigned to it")
throw new PartitionOfflineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it")
}
}
}
@@ -91,8 +88,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
this.logIdent = "[ReassignedPartitionLeaderSelector]: "
def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val reassignedReplicas = controllerContext.partitionsBeingReassigned(TopicAndPartition(topic, partition)).newReplicas
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val reassignedReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
// pick any replica from the newly assigned replicas list that is in the ISR
@@ -105,10 +102,10 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
reassignedReplicas.size match {
case 0 =>
throw new StateChangeFailedException("List of reassigned replicas for partition " +
"([%s, %d]) is empty. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
" %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
case _ =>
throw new StateChangeFailedException("None of the reassigned replicas for partition " +
"([%s, %d]) are alive. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
"%s are alive. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
}
}
}
@@ -123,17 +120,16 @@ class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerConte
with Logging {
this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "
def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val topicAndPartition = TopicAndPartition(topic, partition)
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
val preferredReplica = assignedReplicas.head
// check if preferred replica is the current leader
val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
if(currentLeader == preferredReplica) {
throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition [%s,%d]"
.format(preferredReplica, topic, partition))
throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition %s"
.format(preferredReplica, topicAndPartition))
} else {
info("Current leader %d for partition [%s,%d] is not the preferred replica.".format(currentLeader, topic, partition) +
info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
" Trigerring preferred replica leader election")
// check if preferred replica is not the current leader and is alive and in the isr
if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
@@ -141,7 +137,7 @@ with Logging {
currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
} else {
throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
"[%s,%d] is either not alive or not in the isr. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
"%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
}
}
}
@@ -157,13 +153,13 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
this.logIdent = "[ControlledShutdownLeaderSelector]: "
def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
val currentLeader = currentLeaderAndIsr.leader
val assignedReplicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
@@ -172,12 +168,13 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
val newLeaderOpt = newIsr.headOption
newLeaderOpt match {
case Some(newLeader) =>
debug("Partition [%s,%d] : current leader = %d, new leader = %d"
.format(topic, partition, currentLeader, newLeader))
debug("Partition %s : current leader = %d, new leader = %d"
.format(topicAndPartition, currentLeader, newLeader))
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
liveAssignedReplicas)
case None =>
throw new StateChangeFailedException("No other replicas in ISR for %s-%s.".format(topic, partition))
throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides current leader %d and" +
" shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
}
}
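
The selectors in this file now key everything off a single TopicAndPartition, and the ControlledShutdownLeaderSelector at the end promotes any surviving ISR member. A hypothetical sketch of that selection, assuming newIsr is the current ISR minus the brokers being shut down (ControlledShutdownSelectionDemo is illustrative only):

    object ControlledShutdownSelectionDemo {
      // returns the new leader and shrunk ISR, or None if every ISR member is shutting down
      def selectLeader(isr: List[Int], shuttingDown: Set[Int]): Option[(Int, List[Int])] = {
        val newIsr = isr.filter(r => !shuttingDown.contains(r))
        newIsr.headOption.map(newLeader => (newLeader, newIsr))
      }

      def main(args: Array[String]) {
        println(selectLeader(List(1, 2, 3), Set(1))) // Some((2,List(2, 3)))
        println(selectLeader(List(1), Set(1)))       // None -> StateChangeFailedException
      }
    }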

core/src/main/scala/kafka/controller/PartitionStateMachine.scala (26 changed lines)

@@ -130,7 +130,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
assignReplicasToPartitions(topic, partition)
partitionState.put(topicAndPartition, NewPartition)
info("Partition [%s, %d] state changed from NotExists to New with assigned replicas ".format(topic, partition) +
info("Partition %s state changed from NotExists to New with assigned replicas ".format(topicAndPartition) +
"%s".format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")))
case OnlinePartition =>
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
@@ -144,7 +144,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
electLeaderForPartition(topic, partition, leaderSelector)
case _ => // should never come here since illegal previous states are checked above
}
info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition,
info("Partition %s state changed from %s to OnlinePartition with leader %d".format(topicAndPartition,
partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader))
partitionState.put(topicAndPartition, OnlinePartition)
// post: partition has a leader
@@ -152,18 +152,18 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
// pre: partition should be in Online state
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
// should be called when the leader for a partition is no longer alive
info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
info("Partition %s state changed from Online to Offline".format(topicAndPartition))
partitionState.put(topicAndPartition, OfflinePartition)
// post: partition has no alive leader
case NonExistentPartition =>
// pre: partition could be in either of the above states
assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
info("Partition [%s, %d] state changed from Offline to NotExists".format(topic, partition))
info("Partition %s state changed from Offline to NotExists".format(topicAndPartition))
partitionState.put(topicAndPartition, NonExistentPartition)
// post: partition state is deleted from all brokers and zookeeper
}
} catch {
case t: Throwable => error("State change for partition [%s, %d] ".format(topic, partition) +
case t: Throwable => error("State change for partition %s ".format(topicAndPartition) +
"from %s to %s failed".format(currState, targetState), t)
}
}
@@ -266,8 +266,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
* @param leaderSelector Specific leader selector (e.g., offline/reassigned/etc.)
*/
def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
val topicAndPartition = TopicAndPartition(topic, partition)
// handle leader election for the partitions whose leader is no longer alive
info("Electing leader for partition [%s, %d]".format(topic, partition))
info("Electing leader for partition %s".format(topicAndPartition))
try {
var zookeeperPathUpdateSucceeded: Boolean = false
var newLeaderAndIsr: LeaderAndIsr = null
@@ -281,7 +282,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
"means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) +
"controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
// elect new leader or throw exception
val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topic, partition, currentLeaderAndIsr)
val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion)
@@ -293,15 +294,15 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
// update the leader cache
controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
info("Elected leader %d for Offline partition %s".format(newLeaderAndIsr.leader, topicAndPartition))
// store new leader and isr info in cache
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
} catch {
case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
.format(topic, partition) + " Marking this partition offline", poe)
case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition %s are dead."
.format(topicAndPartition) + " Marking this partition offline", poe)
case sce => throw new StateChangeFailedException(("Error while electing leader for partition " +
" [%s, %d] due to: %s.").format(topic, partition, sce.getMessage), sce)
" %s due to: %s.").format(topicAndPartition, sce.getMessage), sce)
}
debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
}
@@ -315,11 +316,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
val topicAndPartition = TopicAndPartition(topic, partition)
ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
case None =>
throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " +
"[%s, %d] in %s state".format(topic, partition, partitionState(TopicAndPartition(topic, partition))))
"%s in %s state".format(topicAndPartition, partitionState(topicAndPartition)))
}
}
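
The election path above writes the newly selected leader with ZkUtils.conditionalUpdatePersistentPath, which succeeds only if the caller's zkVersion still matches the value stored in zookeeper; otherwise the controller re-reads the leader and ISR and retries. A simplified, hypothetical model of that compare-and-set behaviour (conditionalUpdate stands in for the real ZkUtils call):

    object ConditionalUpdateDemo {
      var storedVersion = 3

      // accept the write only when the caller's expected version matches the stored one
      def conditionalUpdate(expectedVersion: Int): (Boolean, Int) =
        if (expectedVersion == storedVersion) {
          storedVersion += 1
          (true, storedVersion)
        } else
          (false, -1)

      def main(args: Array[String]) {
        println(conditionalUpdate(3)) // (true,4): version matched, update accepted
        println(conditionalUpdate(3)) // (false,-1): stale version, re-read and retry
      }
    }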

core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (71 changed lines)

@@ -110,14 +110,14 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) =>
if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica"
.format(replicaId, topic, partition) + "state as it is being requested to become leader")
throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
.format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
topic, partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
case None => // new leader request will be sent to this replica when one gets elected
}
replicaState.put((topic, partition, replicaId), NewReplica)
info("Replica %d for partition [%s, %d] state changed to NewReplica".format(replicaId, topic, partition))
info("Replica %d for partition %s state changed to NewReplica".format(replicaId, topicAndPartition))
case NonExistentReplica =>
assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
// send stop replica command
@@ -126,7 +126,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
controllerContext.partitionReplicaAssignment.put(topicAndPartition,
currentAssignedReplicas.filterNot(_ == replicaId))
info("Replica %d for partition [%s, %d] state changed to NonExistentReplica".format(replicaId, topic, partition))
info("Replica %d for partition %s state changed to NonExistentReplica".format(replicaId, topicAndPartition))
replicaState.remove((topic, partition, replicaId))
case OnlineReplica =>
assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
@@ -135,19 +135,19 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
// add this replica to the assigned replicas list for its partition
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
case _ =>
// check if the leader for this partition is alive or even exists
controllerContext.allLeaders.get(topicAndPartition) match {
case Some(leaderIsrAndControllerEpoch) =>
val leader = leaderIsrAndControllerEpoch.leaderAndIsr.leader
if (controllerContext.liveOrShuttingDownBrokerIds.contains(leader)) {
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
topic, partition, leaderIsrAndControllerEpoch,
replicaAssignment.size)
replicaState.put((topic, partition, replicaId), OnlineReplica)
info("Replica %d for partition [%s, %d] state changed to OnlineReplica"
.format(replicaId, topic, partition))
controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
case true => // leader is alive
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
topic, partition, leaderIsrAndControllerEpoch,
replicaAssignment.size)
replicaState.put((topic, partition, replicaId), OnlineReplica)
info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
case false => // ignore partitions whose leader is not alive
}
case None => // ignore partitions who don't have a leader yet
}
@@ -156,30 +156,31 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
case OfflineReplica =>
assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
// As an optimization, the controller removes dead replicas from the ISR
val leaderAndIsrIsEmpty: Boolean = controllerContext.allLeaders.get(topicAndPartition) match {
case Some(currLeaderIsrAndControllerEpoch) =>
if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
controller.removeReplicaFromIsr(topic, partition, replicaId) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
// send the shrunk ISR state change request only to the leader
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
topic, partition, updatedLeaderIsrAndControllerEpoch,
replicaAssignment.size)
replicaState.put((topic, partition, replicaId), OfflineReplica)
info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))
false
case None =>
true
}
else false
case None =>
true
}
val leaderAndIsrIsEmpty: Boolean =
controllerContext.allLeaders.get(topicAndPartition) match {
case Some(currLeaderIsrAndControllerEpoch) =>
if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
controller.removeReplicaFromIsr(topic, partition, replicaId) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
// send the shrunk ISR state change request only to the leader
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
topic, partition, updatedLeaderIsrAndControllerEpoch,
replicaAssignment.size)
replicaState.put((topic, partition, replicaId), OfflineReplica)
info("Replica %d for partition %s state changed to OfflineReplica".format(replicaId, topicAndPartition))
info("Removed offline replica %d from ISR for partition %s".format(replicaId, topicAndPartition))
false
case None =>
true
}
else false
case None =>
true
}
if (leaderAndIsrIsEmpty)
throw new StateChangeFailedException(
"Failed to change state of replica %d for partition [%s, %d] since the leader and isr path in zookeeper is empty"
.format(replicaId, topic, partition))
"Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
.format(replicaId, topicAndPartition))
}
}
catch {
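
A hypothetical sketch of the OfflineReplica decision above: a dead replica is dropped from the cached ISR when leader and ISR state exists, and the state change fails when the leader and isr path in zookeeper is empty. OfflineReplicaDemo is illustrative only, with IllegalStateException standing in for Kafka's StateChangeFailedException:

    object OfflineReplicaDemo {
      def markReplicaOffline(cachedIsr: Option[List[Int]], replicaId: Int): List[Int] =
        cachedIsr match {
          case Some(isr) if isr.contains(replicaId) => isr.filter(_ != replicaId)
          case Some(isr) => isr // replica already out of the ISR, nothing to shrink
          case None => throw new IllegalStateException(
            "leader and isr path in zookeeper is empty for replica " + replicaId)
        }

      def main(args: Array[String]) {
        println(markReplicaOffline(Some(List(1, 2, 3)), 3)) // List(1, 2)
        println(markReplicaOffline(Some(List(1, 2)), 3))    // List(1, 2) -- unchanged
      }
    }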

core/src/main/scala/kafka/utils/Logging.scala (2 changed lines)

@@ -28,7 +28,7 @@ trait Logging {
// Force initialization to register Log4jControllerMBean
private val log4jController = Log4jController
private def msgWithLogIdent(msg: String) = "%s%s".format(logIdent, msg)
private def msgWithLogIdent(msg: String) = logIdent + msg
def trace(msg: => String): Unit = {
if (logger.isTraceEnabled())

core/src/main/scala/kafka/utils/Utils.scala (2 changed lines)

@@ -442,7 +442,7 @@ object Utils extends Logging {
* Whitespace surrounding the comma will be removed.
*/
def parseCsvList(csvList: String): Seq[String] = {
if(csvList == null)
if(csvList == null || csvList.isEmpty)
Seq.empty[String]
else {
csvList.split("\\s*,\\s*").filter(v => !v.equals(""))

core/src/main/scala/kafka/utils/ZkUtils.scala (11 changed lines)

@@ -95,10 +95,11 @@ object ZkUtils extends Logging {
: Option[LeaderIsrAndControllerEpoch] = {
Json.parseFull(leaderAndIsrStr) match {
case Some(m) =>
val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, String]]
val leader = leaderIsrAndEpochInfo.get("leader").get.toInt
val epoch = leaderIsrAndEpochInfo.get("leaderEpoch").get.toInt
val isrString = leaderIsrAndEpochInfo.get("ISR").get
val controllerEpoch = leaderIsrAndEpochInfo.get("controllerEpoch").get.toInt
val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
val zkPathVersion = stat.getVersion
debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
@@ -201,7 +202,7 @@ object ZkUtils extends Logging {
val jsonDataMap = new HashMap[String, String]
jsonDataMap.put("leader", leaderAndIsr.leader.toString)
jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
jsonDataMap.put("ISR", if(leaderAndIsr.isr.isEmpty) "" else leaderAndIsr.isr.mkString(","))
jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
Utils.stringMapToJson(jsonDataMap)
}
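
Together with the Utils.parseCsvList change, this hunk lets the ISR encoding round-trip cleanly through zookeeper: an empty ISR is written as the empty string and read back as an empty list instead of a single bogus "" entry. A hypothetical round-trip sketch (IsrRoundTripDemo is illustrative, not the Kafka code):

    object IsrRoundTripDemo {
      // write side: empty ISR becomes the empty string, as in leaderAndIsrZkData above
      def encodeIsr(isr: List[Int]): String =
        if (isr.isEmpty) "" else isr.mkString(",")

      // read side: parseCsvList-style parsing, so "" yields an empty list
      def decodeIsr(isrString: String): List[Int] =
        if (isrString == null || isrString.isEmpty) List.empty[Int]
        else isrString.split("\\s*,\\s*").filter(s => !s.isEmpty).map(_.toInt).toList

      def main(args: Array[String]) {
        println(decodeIsr(encodeIsr(List())))        // List()
        println(decodeIsr(encodeIsr(List(1, 2, 3)))) // List(1, 2, 3)
      }
    }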

core/src/test/scala/unit/kafka/utils/UtilsTest.scala (12 changed lines)

@@ -62,4 +62,16 @@ class UtilsTest extends JUnitSuite {
}
}
@Test
def testCsvList() {
val emptyString:String = ""
val nullString:String = null
val emptyList = Utils.parseCsvList(emptyString)
val emptyListFromNullString = Utils.parseCsvList(nullString)
val emptyStringList = Seq.empty[String]
assertTrue(emptyList!=null)
assertTrue(emptyListFromNullString!=null)
assertTrue(emptyStringList.equals(emptyListFromNullString))
assertTrue(emptyStringList.equals(emptyList))
}
}
