diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index e055d67e3f1..86979630156 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -22,42 +22,15 @@ fi base_dir=$(dirname $0)/.. +SCALA_VERSION=2.8.0 -USER_HOME=$(eval echo ~${USER}) -ivyPath=$(echo "$USER_HOME/.ivy2/cache") - -snappy=$(echo "$ivyPath/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.4.1.jar") -CLASSPATH=$CLASSPATH:$snappy - -library=$(echo "$ivyPath/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar") -CLASSPATH=$CLASSPATH:$library - -compiler=~$(echo "$ivyPath/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar") -CLASSPATH=$CLASSPATH:$compiler - -log4j=$(echo "$ivyPath/log4j/log4j/jars/log4j-1.2.15.jar") -CLASSPATH=$CLASSPATH:$log4j - -slf=$(echo "$ivyPath/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar") -CLASSPATH=$CLASSPATH:$slf - -zookeeper=$(echo "$ivyPath/org.apache.zookeeper/zookeeper/jars/zookeeper-3.3.4.jar") -CLASSPATH=$CLASSPATH:$zookeeper - -jopt=$(echo "$ivyPath/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar") -CLASSPATH=$CLASSPATH:$jopt - +# assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency" for file in $base_dir/core/target/scala-2.8.0/*.jar; do CLASSPATH=$CLASSPATH:$file done -for file in $base_dir/core/lib/*.jar; -do - CLASSPATH=$CLASSPATH:$file -done - -for file in $base_dir/perf/target/scala-2.8.0/kafka*.jar; +for file in $base_dir/perf/target/scala-${SCALA_VERSION}/kafka*.jar; do CLASSPATH=$CLASSPATH:$file done diff --git a/core/build.sbt b/core/build.sbt index 211aaf9b7e4..405ea5524e8 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -1,5 +1,6 @@ import sbt._ import Keys._ +import AssemblyKeys._ name := "kafka" @@ -11,8 +12,10 @@ libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _ ) libraryDependencies ++= Seq( "org.apache.zookeeper" % "zookeeper" % "3.3.4", - "com.github.sgroschupf" % "zkclient" % "0.1", + "com.101tec" % "zkclient" % "0.2", "org.xerial.snappy" % "snappy-java" % "1.0.4.1", + "com.yammer.metrics" % "metrics-core" % "2.2.0", + "com.yammer.metrics" % "metrics-annotation" % "2.2.0", "org.easymock" % "easymock" % "3.0" % "test", "junit" % "junit" % "4.1" % "test" ) @@ -24,4 +27,5 @@ libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) => }) } +assemblySettings diff --git a/core/lib/metrics-annotation-3.0.0-c0c8be71.jar b/core/lib/metrics-annotation-3.0.0-c0c8be71.jar deleted file mode 100644 index dba9d2b932f..00000000000 Binary files a/core/lib/metrics-annotation-3.0.0-c0c8be71.jar and /dev/null differ diff --git a/core/lib/metrics-core-3.0.0-c0c8be71.jar b/core/lib/metrics-core-3.0.0-c0c8be71.jar deleted file mode 100644 index 529a69baf19..00000000000 Binary files a/core/lib/metrics-core-3.0.0-c0c8be71.jar and /dev/null differ diff --git a/core/lib/zkclient-20120522.jar b/core/lib/zkclient-20120522.jar deleted file mode 100644 index 225e97e2160..00000000000 Binary files a/core/lib/zkclient-20120522.jar and /dev/null differ diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 6146c6e9149..cbac5d08604 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -60,7 +60,7 @@ class Partition(val topic: String, newGauge( topic + "-" + partitionId + "-UnderReplicated", new Gauge[Int] { - def getValue = { + def value = { if (isUnderReplicated) 1 else 0 } } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 9a5fbfeefdc..398618f8c14 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -650,7 +650,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, newGauge( config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize", new Gauge[Int] { - def getValue = q.size + def value = q.size } ) }) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 65def033e36..02510bdf3ac 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -97,14 +97,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg newGauge( "ActiveControllerCount", new Gauge[Int] { - def getValue() = if (isActive) 1 else 0 + def value() = if (isActive) 1 else 0 } ) newGauge( "OfflinePartitionsCount", new Gauge[Int] { - def getValue: Int = { + def value(): Int = { controllerContext.controllerLock synchronized { controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveBrokerIds.contains(p._2.leaderAndIsr.leader)) } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 6ac6545b189..e38b95c0649 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -130,10 +130,10 @@ private[kafka] class Log(val dir: File, info("Completed load of log %s with log end offset %d".format(name, logEndOffset)) newGauge(name + "-" + "NumLogSegments", - new Gauge[Int] { def getValue = numberOfSegments }) + new Gauge[Int] { def value = numberOfSegments }) newGauge(name + "-" + "LogEndOffset", - new Gauge[Long] { def getValue = logEndOffset }) + new Gauge[Long] { def value = logEndOffset }) /* The name of this log */ def name = dir.getName() diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 209fdfa92a5..c0e0dfce960 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -99,7 +99,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe newGauge( "RequestQueueSize", new Gauge[Int] { - def getValue = requestQueue.size + def value = requestQueue.size } ) diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala index 6691147084c..090400d35e2 100644 --- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala +++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala @@ -36,7 +36,7 @@ class ProducerSendThread[K,V](val threadName: String, newGauge(clientId + "-ProducerQueueSize", new Gauge[Int] { - def getValue = queue.size + def value = queue.size }) override def run { diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index cfa7747becc..b6845e468a8 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -201,7 +201,7 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet newGauge( metricId + "-ConsumerLag", new Gauge[Long] { - def getValue = lagVal.get + def value = lagVal.get } ) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 8b0f797fe89..0d39a57ba12 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -57,7 +57,7 @@ class ReplicaManager(val config: KafkaConfig, newGauge( "LeaderCount", new Gauge[Int] { - def getValue = { + def value = { leaderPartitionsLock synchronized { leaderPartitions.size } @@ -67,13 +67,13 @@ class ReplicaManager(val config: KafkaConfig, newGauge( "PartitionCount", new Gauge[Int] { - def getValue = allPartitions.size + def value = allPartitions.size } ) newGauge( "UnderReplicatedPartitions", new Gauge[Int] { - def getValue = { + def value = { leaderPartitionsLock synchronized { leaderPartitions.count(_.isUnderReplicated) } diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index afe9e223f95..c064c5c4cf1 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -72,14 +72,14 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge newGauge( "PurgatorySize", new Gauge[Int] { - def getValue = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests + def value = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests } ) newGauge( "NumDelayedRequests", new Gauge[Int] { - def getValue = expiredRequestReaper.unsatisfied.get() + def value = expiredRequestReaper.unsatisfied.get() } ) diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala index a3f85cf8b1b..fe5bc09e122 100644 --- a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala @@ -35,20 +35,20 @@ class KafkaTimerTest extends JUnit3Suite { timer.time { clock.addMillis(1000) } - assertEquals(1, metric.getCount()) - assertTrue((metric.getMax() - 1000).abs <= Double.Epsilon) - assertTrue((metric.getMin() - 1000).abs <= Double.Epsilon) + assertEquals(1, metric.count()) + assertTrue((metric.max() - 1000).abs <= Double.Epsilon) + assertTrue((metric.min() - 1000).abs <= Double.Epsilon) } private class ManualClock extends Clock { private var ticksInNanos = 0L - override def getTick() = { + override def tick() = { ticksInNanos } - override def getTime() = { + override def time() = { TimeUnit.NANOSECONDS.toMillis(ticksInNanos) } diff --git a/project/Build.scala b/project/Build.scala index 4bbdfee904a..3918579f80a 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -17,7 +17,6 @@ import sbt._ import Keys._ -import java.io.File import scala.xml.{Node, Elem} import scala.xml.transform.{RewriteRule, RuleTransformer} @@ -78,17 +77,13 @@ object KafkaBuild extends Build { ) - val coreSettings = Seq( - pomPostProcess := { (pom: Node) => MetricsDepAdder(ZkClientDepAdder(pom)) } - ) - val runRat = TaskKey[Unit]("run-rat-task", "Runs Apache rat on Kafka") val runRatTask = runRat := { "bin/run-rat.sh" ! } lazy val kafka = Project(id = "Kafka", base = file(".")).aggregate(core, examples, contrib, perf).settings((commonSettings ++ runRatTask): _*) - lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*).settings(coreSettings: _*) + lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*) lazy val examples = Project(id = "java-examples", base = file("examples")).settings(commonSettings :_*) dependsOn (core) lazy val perf = Project(id = "perf", base = file("perf")).settings((Seq(name := "kafka-perf") ++ commonSettings):_*) dependsOn (core) @@ -96,48 +91,4 @@ object KafkaBuild extends Build { lazy val hadoopProducer = Project(id = "hadoop-producer", base = file("contrib/hadoop-producer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core) lazy val hadoopConsumer = Project(id = "hadoop-consumer", base = file("contrib/hadoop-consumer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core) - - // POM Tweaking for core: - def zkClientDep = - - zkclient - zkclient - 20120522 - compile - - - def metricsDeps = - - - com.yammer.metrics - metrics-core - 3.0.0-c0c8be71 - compile - - - com.yammer.metrics - metrics-annotations - 3.0.0-c0c8be71 - compile - - - - object ZkClientDepAdder extends RuleTransformer(new RewriteRule() { - override def transform(node: Node): Seq[Node] = node match { - case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => { - Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep:_*) - } - case other => other - } - }) - - object MetricsDepAdder extends RuleTransformer(new RewriteRule() { - override def transform(node: Node): Seq[Node] = node match { - case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => { - Elem(prefix, "dependencies", attribs, scope, deps ++ metricsDeps:_*) - } - case other => other - } - }) - } diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala index fac723a6f2a..cd406c18868 100644 --- a/project/build/KafkaProject.scala +++ b/project/build/KafkaProject.scala @@ -62,52 +62,6 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje - def zkClientDep = - - com.101tec - zkclient - 0.2 - compile - - - def metricsDepsCore = - - com.yammer.metrics - metrics-core - 3.0.0-SNAPSHOT - compile - - - def metricsDepsAnnotations = - - com.yammer.metrics - metrics-annotation - 3.0.0-SNAPSHOT - compile - - - object ZkClientDepAdder extends RuleTransformer(new RewriteRule() { - override def transform(node: Node): Seq[Node] = node match { - case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => { - Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep:_*) - } - case other => other - } - }) - - object MetricsDepAdder extends RuleTransformer(new RewriteRule() { - override def transform(node: Node): Seq[Node] = node match { - case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => { - Elem(prefix, "dependencies", attribs, scope, deps ++ metricsDepsCore ++ metricsDepsAnnotations:_*) - } - case other => other - } - }) - - override def pomPostProcess(pom: Node): Node = { - MetricsDepAdder(ZkClientDepAdder(pom)) - } - override def organization = "org.apache" override def filterScalaJars = false diff --git a/project/plugins.sbt b/project/plugins.sbt index 48d44c81f44..e8c3e53b00e 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,5 +1,5 @@ resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns) -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.5") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.8") addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0")