From 0d6a4dff097d5e46d659db7f7ef922c91ede24e0 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Thu, 23 Aug 2012 17:15:17 +0000 Subject: [PATCH] TopicCount.constructTopicCount isn't thread-safe; patched by Jun Rao; reviewed by Joel Koshy and Neha Narkhede; kafka-379 git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1376598 13f79535-47bb-0310-9956-ffa450edef68 --- .../scala/kafka/consumer/TopicCount.scala | 9 +-- core/src/main/scala/kafka/utils/Utils.scala | 21 +++++++ core/src/main/scala/kafka/utils/ZkUtils.scala | 59 +++---------------- .../kafka/log4j/KafkaLog4jAppenderTest.scala | 4 +- 4 files changed, 32 insertions(+), 61 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index f3d1e7b5f65..06bce24d662 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -18,11 +18,9 @@ package kafka.consumer import scala.collection._ -import scala.util.parsing.json.JSON import org.I0Itec.zkclient.ZkClient import java.util.regex.Pattern -import kafka.utils.{ZKGroupDirs, ZkUtils, Logging} - +import kafka.utils.{SyncJSON, ZKGroupDirs, ZkUtils, Logging} private[kafka] trait TopicCount { def getConsumerThreadIdsPerTopic: Map[String, Set[String]] @@ -59,9 +57,6 @@ private[kafka] object TopicCount extends Logging { private val BLACKLIST_PATTERN = Pattern.compile("""!(\p{Digit}+)!(.*)""") - val myConversionFunc = {input : String => input.toInt} - JSON.globalNumberParser = myConversionFunc - def constructTopicCount(group: String, consumerId: String, zkClient: ZkClient) : TopicCount = { @@ -93,7 +88,7 @@ private[kafka] object TopicCount extends Logging { else { var topMap : Map[String,Int] = null try { - JSON.parseFull(topicCountString) match { + SyncJSON.parseFull(topicCountString) match { case Some(m) => topMap = m.asInstanceOf[Map[String,Int]] case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString) } diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 28a7d03a048..c007145fb3f 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -32,6 +32,7 @@ import java.util.{Random, Properties} import joptsimple.{OptionSpec, OptionSet, OptionParser} import kafka.common.KafkaException import kafka.cluster.Broker +import util.parsing.json.JSON /** @@ -906,4 +907,24 @@ class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * def durationMs: Double = (end.get - start) / (1000.0 * 1000.0) } +} + +/** + * A wrapper that synchronizes JSON in scala, which is not threadsafe. + */ +object SyncJSON extends Logging { + val myConversionFunc = {input : String => input.toInt} + JSON.globalNumberParser = myConversionFunc + val lock = new Object + + def parseFull(input: String): Option[Any] = { + lock synchronized { + try { + JSON.parseFull(input) + } catch { + case t => + throw new KafkaException("Can't parse json string: %s".format(input), t) + } + } + } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index d8323b43167..8029c5d5862 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -24,7 +24,6 @@ import org.I0Itec.zkclient.{IZkDataListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError} import org.I0Itec.zkclient.serialize.ZkSerializer import scala.collection._ -import util.parsing.json.JSON import kafka.api.LeaderAndISR import kafka.common.NoEpochForPartitionException import org.apache.zookeeper.data.Stat @@ -78,7 +77,7 @@ object ZkUtils extends Logging { val stat = ret._2 if(leaderAndISRStr == null) None else { - JSON.parseFull(leaderAndISRStr) match { + SyncJSON.parseFull(leaderAndISRStr) match { case Some(m) => val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt @@ -97,7 +96,7 @@ object ZkUtils extends Logging { val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1 if(leaderAndISR == null) None else { - JSON.parseFull(leaderAndISR) match { + SyncJSON.parseFull(leaderAndISR) match { case Some(m) => Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt) case None => None @@ -113,7 +112,7 @@ object ZkUtils extends Logging { def getEpochForPartition(zkClient: ZkClient, topic: String, partition: Int): Int = { val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1 if(leaderAndISR != null) { - val epoch = JSON.parseFull(leaderAndISR) match { + val epoch = SyncJSON.parseFull(leaderAndISR) match { case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition)) case Some(m) => m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt @@ -131,7 +130,7 @@ object ZkUtils extends Logging { val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1 if(leaderAndISR == null) Seq.empty[Int] else { - JSON.parseFull(leaderAndISR) match { + SyncJSON.parseFull(leaderAndISR) match { case Some(m) => val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get Utils.getCSVList(ISRString).map(r => r.toInt) @@ -148,7 +147,7 @@ object ZkUtils extends Logging { val assignedReplicas = if (jsonPartitionMap == null) { Seq.empty[Int] } else { - JSON.parseFull(jsonPartitionMap) match { + SyncJSON.parseFull(jsonPartitionMap) match { case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match { case None => Seq.empty[Int] case Some(seq) => seq.map(_.toInt) @@ -165,27 +164,6 @@ object ZkUtils extends Logging { replicas.contains(brokerId.toString) } - def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int): Int = { - // read previous epoch, increment it and write it to the leader path and the ISR path. - val epoch = try { - Some(getEpochForPartition(client, topic, partition)) - }catch { - case e: NoEpochForPartitionException => None - case e1 => throw e1 - } - - val newEpoch = epoch match { - case Some(partitionEpoch) => - debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch)) - partitionEpoch + 1 - case None => - // this is the first time leader is elected for this partition. So set epoch to 1 - debug("First epoch is 1 for topic %s partition %d".format(topic, partition)) - 1 - } - newEpoch - } - def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id val broker = new Broker(id, creator, host, port) @@ -424,7 +402,7 @@ object ZkUtils extends Logging { topics.foreach{ topic => val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1 if (jsonPartitionMap != null) { - JSON.parseFull(jsonPartitionMap) match { + SyncJSON.parseFull(jsonPartitionMap) match { case Some(m) => val replicaMap = m.asInstanceOf[Map[String, Seq[String]]] for((partition, replicas) <- replicaMap){ @@ -458,7 +436,7 @@ object ZkUtils extends Logging { val partitionMap = if (jsonPartitionMap == null) { Map[Int, Seq[Int]]() } else { - JSON.parseFull(jsonPartitionMap) match { + SyncJSON.parseFull(jsonPartitionMap) match { case Some(m) => val m1 = m.asInstanceOf[Map[String, Seq[String]]] m1.map(p => (p._1.toInt, p._2.map(_.toInt))) @@ -552,31 +530,8 @@ object ZkUtils extends Logging { if(topics == null) Seq.empty[String] else topics } - - def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int) = { - // read previous epoch, increment it and write it to the leader path and the ISR path. - val epoch = try { - Some(getEpochForPartition(client, topic, partition)) - }catch { - case e: NoEpochForPartitionException => None - case e1 => throw e1 - } - val newEpoch = epoch match { - case Some(partitionEpoch) => - debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch)) - partitionEpoch + 1 - case None => - // this is the first time leader is elected for this partition. So set epoch to 1 - debug("First epoch is 1 for topic %s partition %d".format(topic, partition)) - LeaderAndISR.initialLeaderEpoch - } - newEpoch - } } - - - class LeaderExistsOrChangedListener(topic: String, partition: Int, leaderLock: ReentrantLock, diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index 2f2fba344c1..39bf5773165 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -140,7 +140,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with @Test def testLog4jAppends() { - PropertyConfigurator.configure(getLog4jConfigWithZkConnect) + PropertyConfigurator.configure(getLog4jConfig) for(i <- 1 to 5) info("test") @@ -156,7 +156,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with assertEquals(5, count) } - private def getLog4jConfigWithZkConnect: Properties = { + private def getLog4jConfig: Properties = { var props = new Properties() props.put("log4j.rootLogger", "INFO") props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")