TopicCount.constructTopicCount isn't thread-safe; patched by Jun Rao; reviewed by Joel Koshy and Neha Narkhede; kafka-379

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1376598 13f79535-47bb-0310-9956-ffa450edef68
Jun Rao, 12 years ago
commit 0d6a4dff09 (tag: 0.8.0-beta1-candidate1)
4 changed files:

  1. core/src/main/scala/kafka/consumer/TopicCount.scala (9 lines changed)
  2. core/src/main/scala/kafka/utils/Utils.scala (21 lines changed)
  3. core/src/main/scala/kafka/utils/ZkUtils.scala (59 lines changed)
  4. core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (4 lines changed)

core/src/main/scala/kafka/consumer/TopicCount.scala (9 lines changed)

@@ -18,11 +18,9 @@
package kafka.consumer
import scala.collection._
-import scala.util.parsing.json.JSON
import org.I0Itec.zkclient.ZkClient
import java.util.regex.Pattern
-import kafka.utils.{ZKGroupDirs, ZkUtils, Logging}
+import kafka.utils.{SyncJSON, ZKGroupDirs, ZkUtils, Logging}
private[kafka] trait TopicCount {
def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
@@ -59,9 +57,6 @@ private[kafka] object TopicCount extends Logging {
private val BLACKLIST_PATTERN =
Pattern.compile("""!(\p{Digit}+)!(.*)""")
-val myConversionFunc = {input : String => input.toInt}
-JSON.globalNumberParser = myConversionFunc
def constructTopicCount(group: String,
consumerId: String,
zkClient: ZkClient) : TopicCount = {
@@ -93,7 +88,7 @@ private[kafka] object TopicCount extends Logging {
else {
var topMap : Map[String,Int] = null
try {
-JSON.parseFull(topicCountString) match {
+SyncJSON.parseFull(topicCountString) match {
case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString)
}

core/src/main/scala/kafka/utils/Utils.scala (21 lines changed)

@@ -32,6 +32,7 @@ import java.util.{Random, Properties}
import joptsimple.{OptionSpec, OptionSet, OptionParser}
import kafka.common.KafkaException
import kafka.cluster.Broker
+import util.parsing.json.JSON
/**
@@ -906,4 +907,24 @@ class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L *
def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
}
}
+/**
+ * A wrapper that synchronizes JSON in scala, which is not threadsafe.
+ */
+object SyncJSON extends Logging {
+  val myConversionFunc = {input : String => input.toInt}
+  JSON.globalNumberParser = myConversionFunc
+  val lock = new Object
+
+  def parseFull(input: String): Option[Any] = {
+    lock synchronized {
+      try {
+        JSON.parseFull(input)
+      } catch {
+        case t =>
+          throw new KafkaException("Can't parse json string: %s".format(input), t)
+      }
+    }
+  }
+}
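Every former JSON.parseFull call site now funnels through this wrapper, so all JSON parsing in the process is serialized on a single lock; that trades throughput for correctness, which is acceptable since these paths (consumer registration, leader/ISR reads) run rarely. A usage sketch, assuming only the SyncJSON object added above (the sample string is hypothetical):

    import kafka.utils.SyncJSON

    // globalNumberParser is set to toInt, so numeric values come back as Int.
    val topicCountString = """{"topic1" : 2, "topic2" : 1}"""
    SyncJSON.parseFull(topicCountString) match {
      case Some(m) => println(m.asInstanceOf[Map[String, Int]])  // Map(topic1 -> 2, topic2 -> 1)
      case None    => println("malformed topic count: " + topicCountString)
    }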

core/src/main/scala/kafka/utils/ZkUtils.scala (59 lines changed)

@@ -24,7 +24,6 @@ import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
import org.I0Itec.zkclient.serialize.ZkSerializer
import scala.collection._
-import util.parsing.json.JSON
import kafka.api.LeaderAndISR
import kafka.common.NoEpochForPartitionException
import org.apache.zookeeper.data.Stat
@@ -78,7 +77,7 @@ object ZkUtils extends Logging {
val stat = ret._2
if(leaderAndISRStr == null) None
else {
-JSON.parseFull(leaderAndISRStr) match {
+SyncJSON.parseFull(leaderAndISRStr) match {
case Some(m) =>
val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
@@ -97,7 +96,7 @@ object ZkUtils extends Logging {
val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
if(leaderAndISR == null) None
else {
-JSON.parseFull(leaderAndISR) match {
+SyncJSON.parseFull(leaderAndISR) match {
case Some(m) =>
Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt)
case None => None
@@ -113,7 +112,7 @@ object ZkUtils extends Logging {
def getEpochForPartition(zkClient: ZkClient, topic: String, partition: Int): Int = {
val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
if(leaderAndISR != null) {
-val epoch = JSON.parseFull(leaderAndISR) match {
+val epoch = SyncJSON.parseFull(leaderAndISR) match {
case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition))
case Some(m) =>
m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
@@ -131,7 +130,7 @@ object ZkUtils extends Logging {
val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
if(leaderAndISR == null) Seq.empty[Int]
else {
-JSON.parseFull(leaderAndISR) match {
+SyncJSON.parseFull(leaderAndISR) match {
case Some(m) =>
val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
Utils.getCSVList(ISRString).map(r => r.toInt)
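Judging from the casts in these hunks, the leader-and-ISR node holds a flat string-to-string JSON map. A hedged sketch of the read path (the field names come from this diff; the sample value and its exact layout are assumptions):

    import kafka.utils.SyncJSON

    // Hypothetical leaderAndISR value, shaped after the casts in ZkUtils above.
    val leaderAndISRStr = """{"leader" : "1", "leaderEpoch" : "3", "ISR" : "1,2,3"}"""
    SyncJSON.parseFull(leaderAndISRStr) match {
      case Some(m) =>
        val fields = m.asInstanceOf[Map[String, String]]
        val leader = fields("leader").toInt
        val epoch  = fields("leaderEpoch").toInt
        val isr    = fields("ISR").split(',').map(_.trim.toInt).toList
        println((leader, epoch, isr))
      case None =>
        println("invalid leaderAndISR: " + leaderAndISRStr)
    }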
@@ -148,7 +147,7 @@ object ZkUtils extends Logging {
val assignedReplicas = if (jsonPartitionMap == null) {
Seq.empty[Int]
} else {
-JSON.parseFull(jsonPartitionMap) match {
+SyncJSON.parseFull(jsonPartitionMap) match {
case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match {
case None => Seq.empty[Int]
case Some(seq) => seq.map(_.toInt)
@@ -165,27 +164,6 @@ object ZkUtils extends Logging {
replicas.contains(brokerId.toString)
}
-  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int): Int = {
-    // read previous epoch, increment it and write it to the leader path and the ISR path.
-    val epoch = try {
-      Some(getEpochForPartition(client, topic, partition))
-    }catch {
-      case e: NoEpochForPartitionException => None
-      case e1 => throw e1
-    }
-
-    val newEpoch = epoch match {
-      case Some(partitionEpoch) =>
-        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
-        partitionEpoch + 1
-      case None =>
-        // this is the first time leader is elected for this partition. So set epoch to 1
-        debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
-        1
-    }
-    newEpoch
-  }
def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
val broker = new Broker(id, creator, host, port)
@@ -424,7 +402,7 @@ object ZkUtils extends Logging {
topics.foreach{ topic =>
val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
if (jsonPartitionMap != null) {
-JSON.parseFull(jsonPartitionMap) match {
+SyncJSON.parseFull(jsonPartitionMap) match {
case Some(m) =>
val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
for((partition, replicas) <- replicaMap){
@@ -458,7 +436,7 @@ object ZkUtils extends Logging {
val partitionMap = if (jsonPartitionMap == null) {
Map[Int, Seq[Int]]()
} else {
-JSON.parseFull(jsonPartitionMap) match {
+SyncJSON.parseFull(jsonPartitionMap) match {
case Some(m) =>
val m1 = m.asInstanceOf[Map[String, Seq[String]]]
m1.map(p => (p._1.toInt, p._2.map(_.toInt)))
@@ -552,31 +530,8 @@ object ZkUtils extends Logging {
if(topics == null) Seq.empty[String]
else topics
}
-  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int) = {
-    // read previous epoch, increment it and write it to the leader path and the ISR path.
-    val epoch = try {
-      Some(getEpochForPartition(client, topic, partition))
-    }catch {
-      case e: NoEpochForPartitionException => None
-      case e1 => throw e1
-    }
-
-    val newEpoch = epoch match {
-      case Some(partitionEpoch) =>
-        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
-        partitionEpoch + 1
-      case None =>
-        // this is the first time leader is elected for this partition. So set epoch to 1
-        debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
-        LeaderAndISR.initialLeaderEpoch
-    }
-    newEpoch
-  }
}
class LeaderExistsOrChangedListener(topic: String,
partition: Int,
leaderLock: ReentrantLock,

core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (4 lines changed)

@@ -140,7 +140,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
@Test
def testLog4jAppends() {
-PropertyConfigurator.configure(getLog4jConfigWithZkConnect)
+PropertyConfigurator.configure(getLog4jConfig)
for(i <- 1 to 5)
info("test")
@@ -156,7 +156,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
assertEquals(5, count)
}
-private def getLog4jConfigWithZkConnect: Properties = {
+private def getLog4jConfig: Properties = {
var props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
