diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 99af002cb46..6955433d90d 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -48,7 +48,7 @@ object PartitionStateInfo {
     val leader = buffer.getInt
     val leaderEpoch = buffer.getInt
     val isrString = readShortString(buffer)
-    val isr = isrString.split(",").map(_.toInt).toList
+    val isr = Utils.parseCsvList(isrString).map(_.toInt).toList
     val zkVersion = buffer.getInt
     val replicationFactor = buffer.getInt
     PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr, zkVersion), controllerEpoch),
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 565c53a7e2b..f1a12c01ad2 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -671,7 +671,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
           "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
           "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
         if (leaderAndIsr.isr.contains(replicaId)) {
-          val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
+          // if the replica to be removed from the ISR is also the leader, set the new leader value to -1
+          val newLeader = if(replicaId == leaderAndIsr.leader) -1 else leaderAndIsr.leader
+          val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1,
             leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
           // update the new leadership decision in zookeeper or retry
           val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
@@ -683,8 +685,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg

           finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
           if (updateSucceeded)
-            info("New leader and ISR for partition [%s, %d] is %s"
-              .format(topic, partition, newLeaderAndIsr.toString()))
+            info("New leader and ISR for partition %s is %s".format(topicAndPartition, newLeaderAndIsr.toString()))
           updateSucceeded
         } else {
           warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s"
@@ -721,7 +722,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     partitionStateMachine.shutdown()
     replicaStateMachine.shutdown()
     if(controllerContext.controllerChannelManager != null) {
-      info("session expires, clean up the state")
       controllerContext.controllerChannelManager.shutdown()
       controllerContext.controllerChannelManager = null
     }
@@ -766,13 +766,11 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
     assignedReplicasOpt match {
       case Some(assignedReplicas) =>
         if(assignedReplicas == newReplicas) {
-          throw new KafkaException("Partition %s to be reassigned is already assigned to replicas"
-            .format(topicAndPartition) +
+          throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
             " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
         } else {
           if(aliveNewReplicas == newReplicas) {
-            info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition,
-              newReplicas.mkString(",")))
+            info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
             val context = createReassignmentContextForPartition(topic, partition, newReplicas)
             controllerContext.partitionsBeingReassigned.put(topicAndPartition, context)
             controller.onPartitionReassignment(topicAndPartition, context)
@@ -851,18 +849,18 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
           val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
           if(caughtUpReplicas == reassignedReplicas) {
             // resume the partition reassignment process
-            info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
-              .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
+            info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+              .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
               "Resuming partition reassignment")
             controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
           } else {
-            info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
-              .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
+            info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+              .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
               "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
           }
-        case None => error("Error handling reassignment of partition [%s, %d] to replicas %s as it was never created"
-          .format(topic, partition, reassignedReplicas.mkString(",")))
+        case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
+          .format(topicAndPartition, reassignedReplicas.mkString(",")))
       }
     case None =>
   }
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 3eb23cd7d67..3ed9b7e49a4 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -23,15 +23,14 @@ import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOff

 trait PartitionLeaderSelector {

   /**
-   * @param topic The topic of the partition whose leader needs to be elected
-   * @param partition The partition whose leader needs to be elected
+   * @param topicAndPartition The topic and partition whose leader needs to be elected
    * @param currentLeaderAndIsr The current leader and isr of input partition read from zookeeper
    * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
    * @return The leader and isr request, with the newly selected leader info, to send to the brokers
    * Also, returns the list of replicas the returned leader and isr request should be sent to
    * This API selects a new leader for the input partition
    */
-  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
+  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
 }

@@ -45,8 +44,8 @@ trait PartitionLeaderSelector {
 class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
   this.logIdent = "[OfflinePartitionLeaderSelector]: "

-  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    controllerContext.partitionReplicaAssignment.get(TopicAndPartition(topic, partition)) match {
+  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+    controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
       case Some(assignedReplicas) =>
         val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
         val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
@@ -60,7 +59,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
           case true =>
             ControllerStats.offlinePartitionRate.mark()
             throw new PartitionOfflineException(("No replica for partition " +
-              "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) +
+              "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
               " Assigned replicas are: [%s]".format(assignedReplicas))
           case false =>
             ControllerStats.uncleanLeaderElectionRate.mark()
@@ -74,13 +73,11 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
             debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader)
             new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
         }
-        info("Selected new leader and ISR %s for offline partition [%s, %d]".format(newLeaderAndIsr.toString(), topic,
-          partition))
+        info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
         (newLeaderAndIsr, liveAssignedReplicasToThisPartition)
       case None =>
         ControllerStats.offlinePartitionRate.mark()
-        throw new PartitionOfflineException("Partition [%s, %d] doesn't have".format(topic, partition) +
-          "replicas assigned to it")
+        throw new PartitionOfflineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it")
     }
   }
 }
@@ -91,8 +88,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
 class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
   this.logIdent = "[ReassignedPartitionLeaderSelector]: "

-  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    val reassignedReplicas = controllerContext.partitionsBeingReassigned(TopicAndPartition(topic, partition)).newReplicas
+  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+    val reassignedReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
     val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
     val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
     // pick any replica from the newly assigned replicas list that is in the ISR
@@ -105,10 +102,10 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
         reassignedReplicas.size match {
           case 0 =>
             throw new StateChangeFailedException("List of reassigned replicas for partition " +
-              "([%s, %d]) is empty. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
+              " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
           case _ =>
             throw new StateChangeFailedException("None of the reassigned replicas for partition " +
-              "([%s, %d]) are alive. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
+              "%s are alive. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
         }
     }
   }
@@ -123,17 +120,16 @@ class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerConte
 with Logging {
   this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "

-  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    val topicAndPartition = TopicAndPartition(topic, partition)
+  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
     val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val preferredReplica = assignedReplicas.head
     // check if preferred replica is the current leader
     val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
     if(currentLeader == preferredReplica) {
-      throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition [%s,%d]"
-        .format(preferredReplica, topic, partition))
+      throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition %s"
+        .format(preferredReplica, topicAndPartition))
     } else {
-      info("Current leader %d for partition [%s,%d] is not the preferred replica.".format(currentLeader, topic, partition) +
+      info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
         " Trigerring preferred replica leader election")
       // check if preferred replica is not the current leader and is alive and in the isr
       if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
@@ -141,7 +137,7 @@ with Logging {
           currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
       } else {
         throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
-          "[%s,%d] is either not alive or not in the isr. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
+          "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
       }
     }
   }
@@ -157,13 +153,13 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)

   this.logIdent = "[ControlledShutdownLeaderSelector]: "

-  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
     val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
     val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
     val currentLeader = currentLeaderAndIsr.leader

-    val assignedReplicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
+    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)

     val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
     val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))

@@ -172,12 +168,13 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
     val newLeaderOpt = newIsr.headOption
     newLeaderOpt match {
       case Some(newLeader) =>
-        debug("Partition [%s,%d] : current leader = %d, new leader = %d"
-          .format(topic, partition, currentLeader, newLeader))
+        debug("Partition %s : current leader = %d, new leader = %d"
+          .format(topicAndPartition, currentLeader, newLeader))
         (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
          liveAssignedReplicas)
       case None =>
-        throw new StateChangeFailedException("No other replicas in ISR for %s-%s.".format(topic, partition))
+        throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides current leader %d and" +
+          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
     }
   }
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 372793bd8fe..9bb318cb456 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -130,7 +130,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
           assignReplicasToPartitions(topic, partition)
           partitionState.put(topicAndPartition, NewPartition)
-          info("Partition [%s, %d] state changed from NotExists to New with assigned replicas ".format(topic, partition) +
+          info("Partition %s state changed from NotExists to New with assigned replicas ".format(topicAndPartition) +
            "%s".format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")))
         case OnlinePartition =>
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
@@ -144,7 +144,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
              electLeaderForPartition(topic, partition, leaderSelector)
            case _ => // should never come here since illegal previous states are checked above
          }
-          info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition,
+          info("Partition %s state changed from %s to OnlinePartition with leader %d".format(topicAndPartition,
            partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader))
          partitionState.put(topicAndPartition, OnlinePartition)
           // post: partition has a leader
@@ -152,18 +152,18 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           // pre: partition should be in Online state
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
           // should be called when the leader for a partition is no longer alive
-          info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
+          info("Partition %s state changed from Online to Offline".format(topicAndPartition))
           partitionState.put(topicAndPartition, OfflinePartition)
           // post: partition has no alive leader
         case NonExistentPartition =>
           // pre: partition could be in either of the above states
           assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
-          info("Partition [%s, %d] state changed from Offline to NotExists".format(topic, partition))
+          info("Partition %s state changed from Offline to NotExists".format(topicAndPartition))
           partitionState.put(topicAndPartition, NonExistentPartition)
           // post: partition state is deleted from all brokers and zookeeper
       }
     } catch {
-      case t: Throwable => error("State change for partition [%s, %d] ".format(topic, partition) +
+      case t: Throwable => error("State change for partition %s ".format(topicAndPartition) +
         "from %s to %s failed".format(currState, targetState), t)
     }
   }
@@ -266,8 +266,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * @param leaderSelector Specific leader selector (e.g., offline/reassigned/etc.)
    */
   def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
+    val topicAndPartition = TopicAndPartition(topic, partition)
     // handle leader election for the partitions whose leader is no longer alive
-    info("Electing leader for partition [%s, %d]".format(topic, partition))
+    info("Electing leader for partition %s".format(topicAndPartition))
     try {
       var zookeeperPathUpdateSucceeded: Boolean = false
       var newLeaderAndIsr: LeaderAndIsr = null
@@ -281,7 +282,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           "means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) +
           "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
         // elect new leader or throw exception
-        val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topic, partition, currentLeaderAndIsr)
+        val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
         val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
           ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
           ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion)
@@ -293,15 +294,15 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
       // update the leader cache
       controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
-      info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
+      info("Elected leader %d for Offline partition %s".format(newLeaderAndIsr.leader, topicAndPartition))
       // store new leader and isr info in cache
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
         newLeaderIsrAndControllerEpoch,
         controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
     } catch {
-      case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
-        .format(topic, partition) + " Marking this partition offline", poe)
+      case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition %s are dead."
+        .format(topicAndPartition) + " Marking this partition offline", poe)
       case sce => throw new StateChangeFailedException(("Error while electing leader for partition " +
-        " [%s, %d] due to: %s.").format(topic, partition, sce.getMessage), sce)
+        " %s due to: %s.").format(topicAndPartition, sce.getMessage), sce)
     }
     debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
   }
@@ -315,11 +316,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }

   private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
+    val topicAndPartition = TopicAndPartition(topic, partition)
     ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
       case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
       case None =>
         throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " +
-          "[%s, %d] in %s state".format(topic, partition, partitionState(TopicAndPartition(topic, partition))))
+          "%s in %s state".format(topicAndPartition, partitionState(topicAndPartition)))
     }
   }

diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 264285c871e..20d9c4fd9ed 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -110,14 +110,14 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           leaderIsrAndControllerEpochOpt match {
             case Some(leaderIsrAndControllerEpoch) =>
               if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
-                throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica"
-                  .format(replicaId, topic, partition) + "state as it is being requested to become leader")
+                throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
+                  .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
               brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                 replicaAssignment.size)
             case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put((topic, partition, replicaId), NewReplica)
-          info("Replica %d for partition [%s, %d] state changed to NewReplica".format(replicaId, topic, partition))
+          info("Replica %d for partition %s state changed to NewReplica".format(replicaId, topicAndPartition))
         case NonExistentReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
           // send stop replica command
@@ -126,7 +126,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
           controllerContext.partitionReplicaAssignment.put(topicAndPartition,
             currentAssignedReplicas.filterNot(_ == replicaId))
-          info("Replica %d for partition [%s, %d] state changed to NonExistentReplica".format(replicaId, topic, partition))
+          info("Replica %d for partition %s state changed to NonExistentReplica".format(replicaId, topicAndPartition))
           replicaState.remove((topic, partition, replicaId))
         case OnlineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
@@ -135,19 +135,19 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
              // add this replica to the assigned replicas list for its partition
              val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
              controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
-              info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
+              info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
            case _ =>
              // check if the leader for this partition is alive or even exists
              controllerContext.allLeaders.get(topicAndPartition) match {
                case Some(leaderIsrAndControllerEpoch) =>
-                  val leader = leaderIsrAndControllerEpoch.leaderAndIsr.leader
-                  if (controllerContext.liveOrShuttingDownBrokerIds.contains(leader)) {
-                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
-                      topic, partition, leaderIsrAndControllerEpoch,
-                      replicaAssignment.size)
-                    replicaState.put((topic, partition, replicaId), OnlineReplica)
-                    info("Replica %d for partition [%s, %d] state changed to OnlineReplica"
-                      .format(replicaId, topic, partition))
+                  controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
+                    case true => // leader is alive
+                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
+                        topic, partition, leaderIsrAndControllerEpoch,
+                        replicaAssignment.size)
+                      replicaState.put((topic, partition, replicaId), OnlineReplica)
+                      info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
+                    case false => // ignore partitions whose leader is not alive
                  }
                case None => // ignore partitions who don't have a leader yet
              }
@@ -156,30 +156,31 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
         case OfflineReplica =>
           assertValidPreviousStates(topic, partition, replicaId,
             List(NewReplica, OnlineReplica), targetState)
           // As an optimization, the controller removes dead replicas from the ISR
-          val leaderAndIsrIsEmpty: Boolean = controllerContext.allLeaders.get(topicAndPartition) match {
-            case Some(currLeaderIsrAndControllerEpoch) =>
-              if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
-                controller.removeReplicaFromIsr(topic, partition, replicaId) match {
-                  case Some(updatedLeaderIsrAndControllerEpoch) =>
-                    // send the shrunk ISR state change request only to the leader
-                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
-                      topic, partition, updatedLeaderIsrAndControllerEpoch,
-                      replicaAssignment.size)
-                    replicaState.put((topic, partition, replicaId), OfflineReplica)
-                    info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
-                    info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))
-                    false
-                  case None =>
-                    true
-                }
-              else false
-            case None =>
-              true
-          }
+          val leaderAndIsrIsEmpty: Boolean =
+            controllerContext.allLeaders.get(topicAndPartition) match {
+              case Some(currLeaderIsrAndControllerEpoch) =>
+                if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
+                  controller.removeReplicaFromIsr(topic, partition, replicaId) match {
+                    case Some(updatedLeaderIsrAndControllerEpoch) =>
+                      // send the shrunk ISR state change request only to the leader
+                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
+                        topic, partition, updatedLeaderIsrAndControllerEpoch,
+                        replicaAssignment.size)
+                      replicaState.put((topic, partition, replicaId), OfflineReplica)
+                      info("Replica %d for partition %s state changed to OfflineReplica".format(replicaId, topicAndPartition))
+                      info("Removed offline replica %d from ISR for partition %s".format(replicaId, topicAndPartition))
+                      false
+                    case None =>
+                      true
+                  }
+                else false
+              case None =>
+                true
+            }
           if (leaderAndIsrIsEmpty)
             throw new StateChangeFailedException(
-              "Failed to change state of replica %d for partition [%s, %d] since the leader and isr path in zookeeper is empty"
-              .format(replicaId, topic, partition))
+              "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
+              .format(replicaId, topicAndPartition))
       }
     } catch {
diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala
index d36fe23f7a9..d9f010bfe7a 100644
--- a/core/src/main/scala/kafka/utils/Logging.scala
+++ b/core/src/main/scala/kafka/utils/Logging.scala
@@ -28,7 +28,7 @@ trait Logging {
   // Force initialization to register Log4jControllerMBean
   private val log4jController = Log4jController

-  private def msgWithLogIdent(msg: String) = "%s%s".format(logIdent, msg)
+  private def msgWithLogIdent(msg: String) = logIdent + msg

   def trace(msg: => String): Unit = {
     if (logger.isTraceEnabled())
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index c83752ff898..0185c14bd60 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -442,7 +442,7 @@ object Utils extends Logging {
    * Whitespace surrounding the comma will be removed.
    */
  def parseCsvList(csvList: String): Seq[String] = {
-    if(csvList == null)
+    if(csvList == null || csvList.isEmpty)
      Seq.empty[String]
    else {
      csvList.split("\\s*,\\s*").filter(v => !v.equals(""))
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index f59440449ec..113ad37a3ce 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -95,10 +95,11 @@ object ZkUtils extends Logging {
       : Option[LeaderIsrAndControllerEpoch] = {
     Json.parseFull(leaderAndIsrStr) match {
       case Some(m) =>
-        val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
-        val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
-        val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
-        val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
+        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, String]]
+        val leader = leaderIsrAndEpochInfo.get("leader").get.toInt
+        val epoch = leaderIsrAndEpochInfo.get("leaderEpoch").get.toInt
+        val isrString = leaderIsrAndEpochInfo.get("ISR").get
+        val controllerEpoch = leaderIsrAndEpochInfo.get("controllerEpoch").get.toInt
         val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
         val zkPathVersion = stat.getVersion
         debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
@@ -201,7 +202,7 @@ object ZkUtils extends Logging {
     val jsonDataMap = new HashMap[String, String]
     jsonDataMap.put("leader", leaderAndIsr.leader.toString)
     jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
-    jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
+    jsonDataMap.put("ISR", if(leaderAndIsr.isr.isEmpty) "" else leaderAndIsr.isr.mkString(","))
     jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
     Utils.stringMapToJson(jsonDataMap)
   }
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index a4d3a271718..cce6c8ea29f 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -62,4 +62,16 @@ class UtilsTest extends JUnitSuite {
     }
   }

+  @Test
+  def testCsvList() {
+    val emptyString:String = ""
+    val nullString:String = null
+    val emptyList = Utils.parseCsvList(emptyString)
+    val emptyListFromNullString = Utils.parseCsvList(nullString)
+    val emptyStringList = Seq.empty[String]
+    assertTrue(emptyList!=null)
+    assertTrue(emptyListFromNullString!=null)
+    assertTrue(emptyStringList.equals(emptyListFromNullString))
+    assertTrue(emptyStringList.equals(emptyList))
+  }
 }