Browse Source

draft patch

0.8.0-beta1-candidate1
Neha Narkhede 12 years ago
parent
commit
aa1546b090
  1. 3
      core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
  2. 9
      core/src/main/scala/kafka/controller/PartitionStateMachine.scala
  3. 11
      core/src/main/scala/kafka/utils/ZkUtils.scala

3
core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala

@ -177,7 +177,8 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) @@ -177,7 +177,8 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
liveAssignedReplicas)
case None =>
throw new StateChangeFailedException("No other replicas in ISR for %s-%s.".format(topic, partition))
throw new StateChangeFailedException(("No other replicas in ISR %s for [%s,%d] besides current leader %d and" +
" shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topic, partition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
}
}

9
core/src/main/scala/kafka/controller/PartitionStateMachine.scala

@ -151,6 +151,15 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { @@ -151,6 +151,15 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
case OfflinePartition =>
// pre: partition should be in Online state
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
// mark the partition offline by setting the leader to -1
// read the current leader and isr path
val leaderIsrAndControllerEpoch = controller.controllerContext.allLeaders(topicAndPartition)
leaderIsrAndControllerEpoch.leaderAndIsr.leader = -1
leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch += 1
leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion += 1
ZkUtils.updatePersistentPath(zkClient,
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
// should be called when the leader for a partition is no longer alive
info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
partitionState.put(topicAndPartition, OfflinePartition)

11
core/src/main/scala/kafka/utils/ZkUtils.scala

@ -95,10 +95,11 @@ object ZkUtils extends Logging { @@ -95,10 +95,11 @@ object ZkUtils extends Logging {
: Option[LeaderIsrAndControllerEpoch] = {
Json.parseFull(leaderAndIsrStr) match {
case Some(m) =>
val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, String]]
val leader = leaderIsrAndEpochInfo.get("leader").get.toInt
val epoch = leaderIsrAndEpochInfo.get("leaderEpoch").get.toInt
val isrString = leaderIsrAndEpochInfo.get("ISR").get
val controllerEpoch = leaderIsrAndEpochInfo.get("controllerEpoch").get.toInt
val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
val zkPathVersion = stat.getVersion
debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
@ -201,7 +202,7 @@ object ZkUtils extends Logging { @@ -201,7 +202,7 @@ object ZkUtils extends Logging {
val jsonDataMap = new HashMap[String, String]
jsonDataMap.put("leader", leaderAndIsr.leader.toString)
jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
jsonDataMap.put("ISR", if(leaderAndIsr.isr.isEmpty) "" else leaderAndIsr.isr.mkString(","))
jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
Utils.stringMapToJson(jsonDataMap)
}

Loading…
Cancel
Save