Browse Source

KAFKA-1818 KAFKA-1818 clean up code to more idiomatic scala usage; reviewed by Neha Narkhede and Gwen Shapira

pull/1442/head
Eric Olander 10 years ago committed by Neha Narkhede
parent
commit
0f6cc0a058
  1. 32
      core/src/main/scala/kafka/utils/ReplicationUtils.scala
  2. 10
      core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala

32
core/src/main/scala/kafka/utils/ReplicationUtils.scala

@ -61,30 +61,22 @@ object ReplicationUtils extends Logging { @@ -61,30 +61,22 @@ object ReplicationUtils extends Logging {
def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = {
val leaderAndIsrPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition)
val leaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient, leaderAndIsrPath)
val leaderAndIsrOpt = leaderAndIsrInfo._1
val stat = leaderAndIsrInfo._2
leaderAndIsrOpt match {
case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat)
case None => None
}
val (leaderAndIsrOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, leaderAndIsrPath)
leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
}
private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat)
: Option[LeaderIsrAndControllerEpoch] = {
Json.parseFull(leaderAndIsrStr) match {
case Some(m) =>
val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
val zkPathVersion = stat.getVersion
debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for leaderAndIsrPath %s".format(leader, epoch,
isr.toString(), zkPathVersion, path))
Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))
case None => None
}
Json.parseFull(leaderAndIsrStr).flatMap {m =>
val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
val zkPathVersion = stat.getVersion
debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for leaderAndIsrPath %s".format(leader, epoch,
isr.toString(), zkPathVersion, path))
Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))}
}
}

10
core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package kafka.utils
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.server.{ReplicaFetcherManager, KafkaConfig}
import kafka.api.LeaderAndIsr
import kafka.zk.ZooKeeperTestHarness
@ -42,6 +43,8 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -42,6 +43,8 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
val topicDataMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
"versions" -> 2, "leader_epoch" -> 2,"isr" -> List(1,2)))
val topicDataLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(1,leaderEpoch,List(1,2),0), controllerEpoch)
override def setUp() {
super.setUp()
@ -92,4 +95,11 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -92,4 +95,11 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(newZkVersion3,-1)
}
@Test
def testGetLeaderIsrAndEpochForPartition() {
val leaderIsrAndControllerEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partitionId)
assertEquals(topicDataLeaderIsrAndControllerEpoch, leaderIsrAndControllerEpoch.get)
assertEquals(None, ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partitionId + 1))
}
}

Loading…
Cancel
Save