Compare commits

...

8 Commits
trunk ... 0.7.1

Author SHA1 Message Date
Gavin McDonald f9e2a01f48 Kafka has become a TLP 12 years ago
Joe Stein 8460405e4b reverting changes from bad commit on previous release while starting next release 12 years ago
Joe Stein 497f2b7141 0.7.2 Branch of latest trunk 12 years ago
Joe Stein 0ceac63592 KAFKA-368 use the pig core jar from maven instead of distributing it patch by Joe Stein reviewed by Jun Rao and Neha Narkhede 12 years ago
Joe Stein 6cbeab01c2 project version of branch should be 0.7.1 oversight in missing this change when first branching for this release 12 years ago
Joe Stein 09b0abe1ab KAFKA-366 add jmx beans in broker to track # bytes in consumer patch by Jun Rao reviewed by Joel Koshy merged into the 0.7.1 release 12 years ago
Joe Stein e88171ae88 KAFKA-365 change copyright in NOTICE to current year, reviewed by Jun Rao 12 years ago
Joe Stein 0d015f9386 created branch for the 0.7.1 release 12 years ago
  1. 2
      NOTICE
  2. BIN
      contrib/hadoop-consumer/lib/pig-0.8.0-core.jar
  3. BIN
      contrib/hadoop-producer/lib/pig-0.8.0-core.jar
  4. 1
      core/src/main/scala/kafka/consumer/ConsumerIterator.scala
  5. 10
      core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
  6. 2
      core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
  7. 2
      project/build.properties
  8. 6
      project/build/KafkaProject.scala

2
NOTICE

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
Apache Kafka
Copyright 2011 The Apache Software Foundation.
Copyright 2012 The Apache Software Foundation.
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

BIN
contrib/hadoop-consumer/lib/pig-0.8.0-core.jar

Binary file not shown.

BIN
contrib/hadoop-producer/lib/pig-0.8.0-core.jar

Binary file not shown.

1
core/src/main/scala/kafka/consumer/ConsumerIterator.scala

@ -47,6 +47,7 @@ class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk], @@ -47,6 +47,7 @@ class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
val topic = currentTopicInfo.topic
trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
ConsumerTopicStat.getConsumerAllTopicStat().recordMessagesPerTopic(1)
item
}

10
core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala

@ -22,19 +22,29 @@ import kafka.utils.{Pool, Utils, threadsafe, Logging} @@ -22,19 +22,29 @@ import kafka.utils.{Pool, Utils, threadsafe, Logging}
trait ConsumerTopicStatMBean {
def getMessagesPerTopic: Long
def getBytesPerTopic: Long
}
@threadsafe
class ConsumerTopicStat extends ConsumerTopicStatMBean {
private val numCumulatedMessagesPerTopic = new AtomicLong(0)
private val numCumulatedBytesPerTopic = new AtomicLong(0)
def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get
def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
def getBytesPerTopic: Long = numCumulatedBytesPerTopic.get
def recordBytesPerTopic(nBytes: Long) = numCumulatedBytesPerTopic.getAndAdd(nBytes)
}
object ConsumerTopicStat extends Logging {
private val stats = new Pool[String, ConsumerTopicStat]
private val allTopicStat = new ConsumerTopicStat
Utils.registerMBean(allTopicStat, "kafka:type=kafka.ConsumerAllTopicStat")
def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat
def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
var stat = stats.get(topic)

2
core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala

@ -61,6 +61,8 @@ private[consumer] class PartitionTopicInfo(val topic: String, @@ -61,6 +61,8 @@ private[consumer] class PartitionTopicInfo(val topic: String,
chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
val newOffset = fetchedOffset.addAndGet(size)
debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size)
ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size)
}
size
}

2
project/build.properties

@ -16,7 +16,7 @@ @@ -16,7 +16,7 @@
#Mon Feb 28 11:55:49 PST 2011
project.name=Kafka
sbt.version=0.7.5
project.version=0.7.0
project.version=0.7.1
build.scala.versions=2.8.0
contrib.root.dir=contrib
lib.dir=lib

6
project/build/KafkaProject.scala

@ -191,6 +191,9 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje @@ -191,6 +191,9 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
<dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2">
<exclude module="junit"/>
</dependency>
<dependency org="org.apache.pig" name="pig" rev="0.8.0">
<exclude module="junit"/>
</dependency>
</dependencies>
}
@ -211,6 +214,9 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje @@ -211,6 +214,9 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
<dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2">
<exclude module="junit"/>
</dependency>
<dependency org="org.apache.pig" name="pig" rev="0.8.0">
<exclude module="junit"/>
</dependency>
</dependencies>
val jodaTime = "joda-time" % "joda-time" % "1.6"

Loading…
Cancel
Save