From 0f6cc0a058bbd2736f1e638448fc64b58a9f6b41 Mon Sep 17 00:00:00 2001 From: Eric Olander Date: Sun, 25 Jan 2015 19:27:54 -0800 Subject: [PATCH] KAFKA-1818 KAFKA-1818 clean up code to more idiomatic scala usage; reviewed by Neha Narkhede and Gwen Shapira --- .../scala/kafka/utils/ReplicationUtils.scala | 32 +++++++------------ .../kafka/utils/ReplicationUtilsTest.scala | 10 ++++++ 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala index 715767380f7..60687332b4c 100644 --- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala +++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala @@ -61,30 +61,22 @@ object ReplicationUtils extends Logging { def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = { val leaderAndIsrPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition) - val leaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient, leaderAndIsrPath) - val leaderAndIsrOpt = leaderAndIsrInfo._1 - val stat = leaderAndIsrInfo._2 - leaderAndIsrOpt match { - case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat) - case None => None - } + val (leaderAndIsrOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, leaderAndIsrPath) + leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat)) } private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat) : Option[LeaderIsrAndControllerEpoch] = { - Json.parseFull(leaderAndIsrStr) match { - case Some(m) => - val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]] - val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int] - val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int] - val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]] - val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int] - val zkPathVersion = stat.getVersion - debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for leaderAndIsrPath %s".format(leader, epoch, - isr.toString(), zkPathVersion, path)) - Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch)) - case None => None - } + Json.parseFull(leaderAndIsrStr).flatMap {m => + val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]] + val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int] + val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int] + val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]] + val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int] + val zkPathVersion = stat.getVersion + debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for leaderAndIsrPath %s".format(leader, epoch, + isr.toString(), zkPathVersion, path)) + Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))} } } diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala index 84e08557de5..305498adf41 100644 --- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala @@ -17,6 +17,7 @@ package kafka.utils +import kafka.controller.LeaderIsrAndControllerEpoch import kafka.server.{ReplicaFetcherManager, KafkaConfig} import kafka.api.LeaderAndIsr import kafka.zk.ZooKeeperTestHarness @@ -42,6 +43,8 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness { val topicDataMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1, "versions" -> 2, "leader_epoch" -> 2,"isr" -> List(1,2))) + val topicDataLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(1,leaderEpoch,List(1,2),0), controllerEpoch) + override def setUp() { super.setUp() @@ -92,4 +95,11 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(newZkVersion3,-1) } + @Test + def testGetLeaderIsrAndEpochForPartition() { + val leaderIsrAndControllerEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partitionId) + assertEquals(topicDataLeaderIsrAndControllerEpoch, leaderIsrAndControllerEpoch.get) + assertEquals(None, ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partitionId + 1)) + } + }