
Consumer needs a pluggable decoder; patched by Joel Koshy; reviewed by Jun Rao; KAFKA-3

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1176671 13f79535-47bb-0310-9956-ffa450edef68
Tag: 0.7.0
Author: Jun Rao, 13 years ago
Parent commit: 4092ceeb5a
  1. core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (2 changes)
  2. core/src/main/scala/kafka/consumer/ConsumerConnector.scala (5 changes)
  3. core/src/main/scala/kafka/consumer/ConsumerIterator.scala (23 changes)
  4. core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (13 changes)
  5. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (17 changes)
  6. core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (10 changes)
  7. core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (20 changes)
  8. core/src/main/scala/kafka/tools/ConsumerShell.scala (7 changes)
  9. core/src/main/scala/kafka/tools/ReplayLogProducer.scala (2 changes)
  10. core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala (3 changes)
  11. core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (58 changes)
  12. core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (56 changes)
  13. examples/src/main/java/kafka/examples/Consumer.java (20 changes)
  14. project/build/KafkaProject.scala (2 changes)
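
The patch threads a generic Decoder[T] through the whole consumer chain, so a stream hands back decoded objects of type T rather than raw Messages. The kafka.serializer definitions themselves are not part of this diff; going by the decoder.toEvent(item.message) call added in ConsumerIterator and the imported names, a minimal sketch of what they presumably look like:

    package kafka.serializer

    import kafka.message.Message
    import kafka.utils.Utils

    // The hook the iterator calls once for every fetched message.
    trait Decoder[T] {
      def toEvent(message: Message): T
    }

    // Identity decoding: preserves the old behavior of handing out raw Messages.
    class DefaultDecoder extends Decoder[Message] {
      def toEvent(message: Message): Message = message
    }

    // Decodes the payload as a UTF-8 string, as ConsumerShell and the tests use.
    class StringDecoder extends Decoder[String] {
      def toEvent(message: Message): String = Utils.toString(message.payload, "UTF-8")
    }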

core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (2 changes)

@@ -119,7 +119,7 @@ object ConsoleConsumer {
       }
     })
-    var stream: KafkaMessageStream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
+    var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
     val iter =
       if(maxMessages >= 0)
         stream.slice(0, maxMessages)

core/src/main/scala/kafka/consumer/ConsumerConnector.scala (5 changes)

@@ -20,6 +20,7 @@ package kafka.consumer
 import scala.collection._
 import kafka.utils.Utils
 import org.apache.log4j.Logger
+import kafka.serializer.{DefaultDecoder, Decoder}

 /**
  * Main interface for consumer
@@ -32,7 +33,9 @@ trait ConsumerConnector {
    * @return a map of (topic, list of KafkaMessageStream) pair. The number of items in the
    *         list is #streams. Each KafkaMessageStream supports an iterator of messages.
    */
-  def createMessageStreams(topicCountMap: Map[String,Int]) : Map[String,List[KafkaMessageStream]]
+  def createMessageStreams[T](topicCountMap: Map[String,Int],
+                              decoder: Decoder[T] = new DefaultDecoder)
+    : Map[String,List[KafkaMessageStream[T]]]

  /**
   * Commit the offsets of all broker partitions connected by this connector.
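
Because the decoder parameter defaults to DefaultDecoder, existing Scala call sites compile unchanged while new callers opt into typed streams. A minimal usage sketch against this API (connection settings and topic name are placeholders):

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig, KafkaMessageStream}
    import kafka.serializer.StringDecoder

    val props = new Properties()
    props.put("zk.connect", "localhost:2181") // placeholder ZooKeeper address
    props.put("groupid", "example-group")     // placeholder consumer group

    val connector = Consumer.create(new ConsumerConfig(props))

    // Each stream element arrives already decoded to String.
    val streams: Map[String, List[KafkaMessageStream[String]]] =
      connector.createMessageStreams(Map("test-topic" -> 1), new StringDecoder)

    for (message <- streams("test-topic").head)
      println("consumed: " + message)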

core/src/main/scala/kafka/consumer/ConsumerIterator.scala (23 changes)

@@ -22,32 +22,35 @@ import org.apache.log4j.Logger
 import java.util.concurrent.{TimeUnit, BlockingQueue}
 import kafka.cluster.Partition
 import kafka.message.{MessageAndOffset, MessageSet, Message}
+import kafka.serializer.Decoder

 /**
  * An iterator that blocks until a value can be read from the supplied queue.
  * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
  *
  */
-class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int)
-  extends IteratorTemplate[Message] {
+class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
+                          consumerTimeoutMs: Int,
+                          private val decoder: Decoder[T])
+  extends IteratorTemplate[T] {

-  private val logger = Logger.getLogger(classOf[ConsumerIterator])
+  private val logger = Logger.getLogger(classOf[ConsumerIterator[T]])
   private var current: Iterator[MessageAndOffset] = null
   private var currentDataChunk: FetchedDataChunk = null
   private var currentTopicInfo: PartitionTopicInfo = null
   private var consumedOffset: Long = -1L

-  override def next(): Message = {
-    val message = super.next
+  override def next(): T = {
+    val decodedMessage = super.next()
     if(consumedOffset < 0)
       throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
     currentTopicInfo.resetConsumeOffset(consumedOffset)
     if(logger.isTraceEnabled)
       logger.trace("Setting consumed offset to %d".format(consumedOffset))
-    message
+    decodedMessage
   }

-  protected def makeNext(): Message = {
+  protected def makeNext(): T = {
     // if we don't have an iterator, get one
     if(current == null || !current.hasNext) {
       if (consumerTimeoutMs < 0)
@@ -62,7 +65,7 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
         if(logger.isDebugEnabled)
           logger.debug("Received the shutdown command")
         channel.offer(currentDataChunk)
-        return allDone
+        return allDone()
       } else {
         currentTopicInfo = currentDataChunk.topicInfo
         if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
@@ -73,9 +76,9 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
         current = currentDataChunk.messages.iterator
       }
     }
-    val item = current.next
+    val item = current.next()
     consumedOffset = item.offset
-    item.message
+    decoder.toEvent(item.message)
  }
}
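
In the new makeNext, decoding happens exactly once per message, at the point where the raw Message leaves the fetch chunk (decoder.toEvent(item.message)), so any Decoder[T] plugs in without touching the iterator. A hypothetical custom decoder, with an illustrative payload framing that is not part of this patch:

    import java.nio.ByteBuffer
    import kafka.message.Message
    import kafka.serializer.Decoder

    // Illustrative framing: an 8-byte big-endian timestamp, then a UTF-8 body.
    case class TimedEvent(timestampMs: Long, body: String)

    class TimedEventDecoder extends Decoder[TimedEvent] {
      def toEvent(message: Message): TimedEvent = {
        val buf: ByteBuffer = message.payload.duplicate() // leave the original buffer untouched
        val timestampMs = buf.getLong()
        val bodyBytes = new Array[Byte](buf.remaining())
        buf.get(bodyBytes)
        TimedEvent(timestampMs, new String(bodyBytes, "UTF-8"))
      }
    }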

core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (13 changes)

@@ -20,20 +20,23 @@ package kafka.consumer
 import java.util.concurrent.BlockingQueue
 import org.apache.log4j.Logger
 import kafka.message.Message
+import kafka.serializer.{DefaultDecoder, Decoder}

 /**
  * All calls to elements should produce the same thread-safe iterator? Should have a seperate thread
  * that feeds messages into a blocking queue for processing.
  */
-class KafkaMessageStream(private val queue: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int)
-  extends Iterable[Message] with java.lang.Iterable[Message]{
+class KafkaMessageStream[T](private val queue: BlockingQueue[FetchedDataChunk],
+                            consumerTimeoutMs: Int,
+                            private val decoder: Decoder[T])
+  extends Iterable[T] with java.lang.Iterable[T]{

   private val logger = Logger.getLogger(getClass())
-  private val iter: ConsumerIterator = new ConsumerIterator(queue, consumerTimeoutMs)
+  private val iter: ConsumerIterator[T] =
+    new ConsumerIterator[T](queue, consumerTimeoutMs, decoder)

   /**
    * Create an iterator over messages in the stream.
    */
-  def iterator(): ConsumerIterator = iter
+  def iterator(): ConsumerIterator[T] = iter
 }

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (17 changes)

@@ -29,6 +29,7 @@ import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import kafka.api.OffsetRequest
 import java.util.UUID
+import kafka.serializer.Decoder

 /**
  * This class handles the consumers interaction with zookeeper
@@ -103,8 +104,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def this(config: ConsumerConfig) = this(config, true)

-  def createMessageStreams(topicCountMap: Map[String,Int]) : Map[String,List[KafkaMessageStream]] = {
-    consume(topicCountMap)
+  def createMessageStreams[T](topicCountMap: Map[String,Int],
+                              decoder: Decoder[T])
+    : Map[String,List[KafkaMessageStream[T]]] = {
+    consume(topicCountMap, decoder)
   }

   private def createFetcher() {
@@ -143,13 +146,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }

-  def consume(topicCountMap: scala.collection.Map[String,Int]): Map[String,List[KafkaMessageStream]] = {
+  def consume[T](topicCountMap: scala.collection.Map[String,Int],
+                 decoder: Decoder[T])
+    : Map[String,List[KafkaMessageStream[T]]] = {
     logger.debug("entering consume ")
     if (topicCountMap == null)
       throw new RuntimeException("topicCountMap is null")

     val dirs = new ZKGroupDirs(config.groupId)
-    var ret = new mutable.HashMap[String,List[KafkaMessageStream]]
+    var ret = new mutable.HashMap[String,List[KafkaMessageStream[T]]]

     var consumerUuid : String = null
     config.consumerId match {
@@ -177,11 +182,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     // create a queue per topic per consumer thread
     val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic
     for ((topic, threadIdSet) <- consumerThreadIdsPerTopic) {
-      var streamList: List[KafkaMessageStream] = Nil
+      var streamList: List[KafkaMessageStream[T]] = Nil
       for (threadId <- threadIdSet) {
         val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         queues.put((topic, threadId), stream)
-        streamList ::= new KafkaMessageStream(stream, config.consumerTimeoutMs)
+        streamList ::= new KafkaMessageStream[T](stream, config.consumerTimeoutMs, decoder)
       }
       ret += (topic -> streamList)
       logger.debug("adding topic " + topic + " and stream to map..")

core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (10 changes)

@@ -18,19 +18,25 @@
 package kafka.javaapi.consumer;

 import kafka.consumer.KafkaMessageStream;
 import kafka.message.Message;
+import kafka.serializer.Decoder;

 import java.util.List;
 import java.util.Map;

 public interface ConsumerConnector {
   /**
-   * Create a list of MessageStreams for each topic.
+   * Create a list of MessageStreams of type T for each topic.
    *
    * @param topicCountMap a map of (topic, #streams) pair
+   * @param decoder a decoder that converts from Message to T
    * @return a map of (topic, list of KafkaMessageStream) pair. The number of items in the
    *         list is #streams. Each KafkaMessageStream supports an iterator of messages.
    */
-  public Map<String, List<KafkaMessageStream>> createMessageStreams(Map<String, Integer> topicCountMap);
+  public <T> Map<String, List<KafkaMessageStream<T>>> createMessageStreams(
+      Map<String, Integer> topicCountMap, Decoder<T> decoder);
+
+  public Map<String, List<KafkaMessageStream<Message>>> createMessageStreams(
+      Map<String, Integer> topicCountMap);

  /**
   * Commit the offsets of all broker partitions connected by this connector.
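
Java has no default parameter values, so the Java-facing interface grows an overload instead: legacy callers still resolve to the Message-typed method, and new callers pass a decoder. A minimal sketch of both call shapes (connection settings and topic name are placeholders, as before):

    import kafka.consumer.ConsumerConfig
    import kafka.serializer.StringDecoder

    val props = new java.util.Properties()
    props.put("zk.connect", "localhost:2181") // placeholder settings
    props.put("groupid", "example-group")

    val connector =
      kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props))

    val topicCountMap = new java.util.HashMap[String, java.lang.Integer]()
    topicCountMap.put("test-topic", new java.lang.Integer(1))

    // Pre-existing shape: still compiles, yields raw Message streams.
    val rawStreams = connector.createMessageStreams(topicCountMap)
    // New shape: yields decoded String streams.
    val stringStreams = connector.createMessageStreams(topicCountMap, new StringDecoder)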

core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (20 changes)

@@ -17,6 +17,8 @@
 package kafka.javaapi.consumer

 import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
 import kafka.message.Message
+import kafka.serializer.{DefaultDecoder, Decoder}

 /**
  * This class handles the consumers interaction with zookeeper
@@ -63,15 +65,17 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def this(config: ConsumerConfig) = this(config, true)

   // for java client
-  def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]):
-    java.util.Map[String,java.util.List[KafkaMessageStream]] = {
+  def createMessageStreams[T](
+      topicCountMap: java.util.Map[String,java.lang.Integer],
+      decoder: Decoder[T])
+    : java.util.Map[String,java.util.List[KafkaMessageStream[T]]] = {
     import scala.collection.JavaConversions._

     val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
-    val scalaReturn = underlying.consume(scalaTopicCountMap)
-    val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream]]
+    val scalaReturn = underlying.consume(scalaTopicCountMap, decoder)
+    val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream[T]]]
     for ((topic, streams) <- scalaReturn) {
-      var javaStreamList = new java.util.ArrayList[KafkaMessageStream]
+      var javaStreamList = new java.util.ArrayList[KafkaMessageStream[T]]
       for (stream <- streams)
         javaStreamList.add(stream)
       ret.put(topic, javaStreamList)
@@ -79,6 +83,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     ret
   }

+  def createMessageStreams(
+      topicCountMap: java.util.Map[String,java.lang.Integer])
+    : java.util.Map[String,java.util.List[KafkaMessageStream[Message]]] =
+    createMessageStreams(topicCountMap, new DefaultDecoder)
+
   def commitOffsets() {
     underlying.commitOffsets
   }

core/src/main/scala/kafka/tools/ConsumerShell.scala (7 changes)

@@ -22,6 +22,7 @@ import kafka.utils.Utils
 import java.util.concurrent.CountDownLatch
 import org.apache.log4j.Logger
 import kafka.consumer._
+import kafka.serializer.StringDecoder

 /**
  * Program to read using the rich consumer and dump the results to standard out
@@ -63,7 +64,7 @@ object ConsumerShell {
     val consumerConfig = new ConsumerConfig(Utils.loadProps(propsFile))
     val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
-    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> partitions))
+    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> partitions), new StringDecoder)
     var threadList = List[ZKConsumerThread]()
     for ((topic, streamList) <- topicMessageStreams)
       for (stream <- streamList)
@@ -83,7 +84,7 @@ object ConsumerShell {
   }
 }

-class ZKConsumerThread(stream: KafkaMessageStream) extends Thread {
+class ZKConsumerThread(stream: KafkaMessageStream[String]) extends Thread {
   val shutdownLatch = new CountDownLatch(1)
   val logger = Logger.getLogger(getClass)
@@ -92,7 +93,7 @@ class ZKConsumerThread(stream: KafkaMessageStream) extends Thread {
     var count: Int = 0
     try {
       for (message <- stream) {
-        println("consumed: " + Utils.toString(message.payload, "UTF-8"))
+        println("consumed: " + message)
         count += 1
       }
     }catch {

core/src/main/scala/kafka/tools/ReplayLogProducer.scala (2 changes)

@@ -139,7 +139,7 @@ object ReplayLogProducer {
   }
 }

-class ZKConsumerThread(config: Config, stream: KafkaMessageStream) extends Thread {
+class ZKConsumerThread(config: Config, stream: KafkaMessageStream[Message]) extends Thread {
   val shutdownLatch = new CountDownLatch(1)
   val logger = Logger.getLogger(getClass)
   val props = new Properties()

core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala (3 changes)

@@ -18,6 +18,7 @@
 package kafka

 import consumer._
+import message.Message
 import utils.Utils
 import java.util.concurrent.CountDownLatch
@@ -55,7 +56,7 @@ object TestZKConsumerOffsets {
   }
 }

-private class ConsumerThread(stream: KafkaMessageStream) extends Thread {
+private class ConsumerThread(stream: KafkaMessageStream[Message]) extends Thread {
   val shutdownLatch = new CountDownLatch(1)

   override def run() {

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (58 changes)

@@ -27,6 +27,7 @@ import kafka.utils.{TestZKUtils, TestUtils}
 import org.scalatest.junit.JUnit3Suite
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
+import kafka.serializer.StringDecoder

 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness {
   private val logger = Logger.getLogger(getClass())
@@ -124,26 +125,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
     requestHandlerLogger.setLevel(Level.FATAL)

-    var actualMessages: List[Message] = Nil
-
-    // test consumer timeout logic
-    val consumerConfig0 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
-      override val consumerTimeoutMs = 200
-    }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
-    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
-    try {
-      getMessages(nMessages*2, topicMessageStreams0)
-      fail("should get an exception")
-    }
-    catch {
-      case e: ConsumerTimeoutException => // this is ok
-        println("This is ok")
-      case e => throw e
-    }
-    zkConsumerConnector0.shutdown
-
     println("Sending messages for 1st consumer")
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1", DefaultCompressionCodec)
@@ -227,6 +208,41 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.ERROR)
   }

+  def testConsumerDecoder() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    requestHandlerLogger.setLevel(Level.FATAL)
+
+    val sentMessages = sendMessages(nMessages, "batch1", NoCompressionCodec).
+      map(m => Utils.toString(m.payload, "UTF-8")).
+      sortWith((s, t) => s.compare(t) == -1)
+    val consumerConfig = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+
+    val zkConsumerConnector =
+      new ZookeeperConsumerConnector(consumerConfig, true)
+    val topicMessageStreams =
+      zkConsumerConnector.createMessageStreams(
+        Predef.Map(topic -> numNodes*numParts/2), new StringDecoder)
+
+    var receivedMessages: List[String] = Nil
+    for ((topic, messageStreams) <- topicMessageStreams) {
+      for (messageStream <- messageStreams) {
+        val iterator = messageStream.iterator
+        for (i <- 0 until nMessages * 2) {
+          assertTrue(iterator.hasNext())
+          val message = iterator.next()
+          receivedMessages ::= message
+          logger.debug("received message: " + message)
+        }
+      }
+    }
+    receivedMessages = receivedMessages.sortWith((s, t) => s.compare(t) == -1)
+    assertEquals(sentMessages, receivedMessages)
+
+    zkConsumerConnector.shutdown()
+    requestHandlerLogger.setLevel(Level.ERROR)
+  }
+
   def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec): List[Message]= {
     var messages: List[Message] = Nil
     val producer = TestUtils.createProducer("localhost", conf.port)
@@ -250,7 +266,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }

-  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream]]): List[Message]= {
+  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= {
     var messages: List[Message] = Nil
     for ((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {

core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (56 changes)

@@ -31,6 +31,7 @@ import kafka.consumer.{Consumer, ConsumerConfig, KafkaMessageStream, ConsumerTim
 import javax.management.NotCompliantMBeanException
 import org.apache.log4j.{Level, Logger}
 import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, CompressionCodec, Message}
+import kafka.serializer.StringDecoder

 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness {
   private val logger = Logger.getLogger(getClass())
@@ -125,24 +126,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   def testCompression() {
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
     requestHandlerLogger.setLevel(Level.FATAL)

-    var actualMessages: List[Message] = Nil
-
-    // test consumer timeout logic
-    val consumerConfig0 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
-      override val consumerTimeoutMs = 200
-    }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
-    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
-    try {
-      getMessages(nMessages*2, topicMessageStreams0)
-      fail("should get an exception")
-    }
-    catch {
-      case e: ConsumerTimeoutException => // this is ok
-      case e => throw e
-    }
-    zkConsumerConnector0.shutdown
-
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1", DefaultCompressionCodec)
@@ -224,6 +207,41 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.ERROR)
   }

+  def testConsumerDecoder() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    requestHandlerLogger.setLevel(Level.FATAL)
+
+    val sentMessages = sendMessages(nMessages, "batch1", NoCompressionCodec).
+      map(m => Utils.toString(m.payload, "UTF-8")).
+      sortWith((s, t) => s.compare(t) == -1)
+    val consumerConfig = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+
+    val zkConsumerConnector =
+      new ZookeeperConsumerConnector(consumerConfig, true)
+    val topicMessageStreams = zkConsumerConnector.createMessageStreams(
+      Predef.Map(topic -> new java.lang.Integer(numNodes * numParts / 2)), new StringDecoder)
+
+    var receivedMessages: List[String] = Nil
+    for ((topic, messageStreams) <- topicMessageStreams) {
+      for (messageStream <- messageStreams) {
+        val iterator = messageStream.iterator
+        for (i <- 0 until nMessages * 2) {
+          assertTrue(iterator.hasNext())
+          val message = iterator.next()
+          receivedMessages ::= message
+          logger.debug("received message: " + message)
+        }
+      }
+    }
+    receivedMessages = receivedMessages.sortWith((s, t) => s.compare(t) == -1)
+    assertEquals(sentMessages, receivedMessages)
+
+    zkConsumerConnector.shutdown()
+    requestHandlerLogger.setLevel(Level.ERROR)
+  }
+
   def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message]= {
     var messages: List[Message] = Nil
     val producer = kafka.javaapi.Implicits.toJavaSyncProducer(TestUtils.createProducer("localhost", conf.port))
@@ -247,7 +265,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }

-  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaMessageStream]])
+  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaMessageStream[Message]]])
     : List[Message]= {
     var messages: List[Message] = Nil
     val topicMessageStreams = asMap(jTopicMessageStreams)

examples/src/main/java/kafka/examples/Consumer.java (20 changes)

@@ -16,15 +16,16 @@
  */
 package kafka.examples;

+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaMessageStream;
 import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.Message;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;

 public class Consumer extends Thread
 {
@@ -33,7 +34,8 @@ public class Consumer extends Thread
   public Consumer(String topic)
   {
-    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
+    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
+        createConsumerConfig());
     this.topic = topic;
   }

@@ -53,9 +55,9 @@ public class Consumer extends Thread
   public void run() {
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
     topicCountMap.put(topic, new Integer(1));
-    Map<String, List<KafkaMessageStream>> consumerMap = consumer.createMessageStreams(topicCountMap);
-    KafkaMessageStream stream = consumerMap.get(topic).get(0);
-    ConsumerIterator it = stream.iterator();
+    Map<String, List<KafkaMessageStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+    KafkaMessageStream<Message> stream = consumerMap.get(topic).get(0);
+    ConsumerIterator<Message> it = stream.iterator();
     while(it.hasNext())
       System.out.println(ExampleUtils.getMessage(it.next()));
   }

project/build/KafkaProject.scala (2 changes)

@@ -129,6 +129,8 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
     override def artifactID = "kafka-java-examples"
     override def filterScalaJars = false
+    override def javaCompileOptions = super.javaCompileOptions ++
+      List(JavaCompileOption("-Xlint:unchecked"))
   }

   class KafkaPerfProject(info: ProjectInfo) extends DefaultProject(info)
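
The extra javac flag matters here because KafkaMessageStream is now generic: any remaining raw uses of it in the Java examples would otherwise compile with only silent unchecked conversions, and -Xlint:unchecked makes javac report them.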
