Browse Source

KAFKA-1416; Unify sendMessages in TestUtils; reviewed by Guozhang Wang

pull/56/head
Flutra Osmani 10 years ago committed by Guozhang Wang
parent
commit
9e5d481c7c
  1. 58
      core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
  2. 19
      core/src/test/scala/unit/kafka/integration/FetcherTest.scala
  3. 45
      core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
  4. 54
      core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
  5. 4
      core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
  6. 122
      core/src/test/scala/unit/kafka/utils/TestUtils.scala

58
core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

@@ -79,7 +79,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
// also the iterator should support re-entrant, so loop it twice
for (i <- 0 until 2) {
try {
getMessages(nMessages*2, topicMessageStreams0)
getMessages(topicMessageStreams0, nMessages * 2)
fail("should get an exception")
} catch {
case e: ConsumerTimeoutException => // this is ok
@@ -90,8 +90,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
zkConsumerConnector0.shutdown
// send some messages to each broker
val sentMessages1 = sendMessagesToPartition(servers, topic, 0, nMessages) ++
sendMessagesToPartition(servers, topic, 1, nMessages)
val sentMessages1 = sendMessages(servers, topic, nMessages, 0) ++
sendMessages(servers, topic, nMessages, 1)
// wait to make sure the topic and partition have a leader for the successful case
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
@@ -105,7 +105,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
val receivedMessages1 = getMessages(topicMessageStreams1, nMessages * 2)
assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
// also check partition ownership
@@ -124,13 +124,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// send some messages to each broker
val sentMessages2 = sendMessagesToPartition(servers, topic, 0, nMessages) ++
sendMessagesToPartition(servers, topic, 1, nMessages)
val sentMessages2 = sendMessages(servers, topic, nMessages, 0) ++
sendMessages(servers, topic, nMessages, 1)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2)
val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages)
assertEquals(sentMessages2.sorted, receivedMessages2.sorted)
// also check partition ownership
@@ -145,13 +145,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
// send some messages to each broker
val sentMessages3 = sendMessagesToPartition(servers, topic, 0, nMessages) ++
sendMessagesToPartition(servers, topic, 1, nMessages)
val sentMessages3 = sendMessages(servers, topic, nMessages, 0) ++
sendMessages(servers, topic, nMessages, 1)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2)
val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages)
assertEquals(sentMessages3.sorted, receivedMessages3.sorted)
// also check partition ownership
@@ -179,8 +179,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
requestHandlerLogger.setLevel(Level.FATAL)
// send some messages to each broker
val sentMessages1 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages1 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -193,7 +193,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
val receivedMessages1 = getMessages(topicMessageStreams1, nMessages * 2)
assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
// also check partition ownership
@@ -212,13 +212,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// send some messages to each broker
val sentMessages2 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages2 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2)
val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages)
assertEquals(sentMessages2.sorted, receivedMessages2.sorted)
// also check partition ownership
@@ -233,13 +233,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
// send some messages to each broker
val sentMessages3 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages3 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2)
val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages)
assertEquals(sentMessages3.sorted, receivedMessages3.sorted)
// also check partition ownership
@@ -255,8 +255,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
def testCompressionSetConsumption() {
// send some messages to each broker
val sentMessages = sendMessagesToPartition(servers, topic, 0, 200, DefaultCompressionCodec) ++
sendMessagesToPartition(servers, topic, 1, 200, DefaultCompressionCodec)
val sentMessages = sendMessages(servers, topic, 200, 0, DefaultCompressionCodec) ++
sendMessages(servers, topic, 200, 1, DefaultCompressionCodec)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -264,7 +264,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
val receivedMessages = getMessages(400, topicMessageStreams1)
val receivedMessages = getMessages(topicMessageStreams1, 400)
assertEquals(sentMessages.sorted, receivedMessages.sorted)
// also check partition ownership
@@ -281,8 +281,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
requestHandlerLogger.setLevel(Level.FATAL)
// send some messages to each broker
val sentMessages = sendMessagesToPartition(servers, topic, 0, nMessages, NoCompressionCodec) ++
sendMessagesToPartition(servers, topic, 1, nMessages, NoCompressionCodec)
val sentMessages = sendMessages(servers, topic, nMessages, 0, NoCompressionCodec) ++
sendMessages(servers, topic, nMessages, 1, NoCompressionCodec)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -322,7 +322,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
// send some messages to each broker
val sentMessages1 = sendMessages(servers, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1)
val sentMessages1 = sendMessages(servers, topic, nMessages)
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -340,7 +340,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val expected_1 = List( ("0", "group1_consumer1-0"))
assertEquals(expected_1, actual_1)
val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages1 = getMessages(topicMessageStreams1, nMessages)
assertEquals(sentMessages1, receivedMessages1)
zkConsumerConnector1.shutdown()
zkClient.close()
@@ -348,8 +348,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
def testConsumerRebalanceListener() {
// Send messages to create topic
sendMessagesToPartition(servers, topic, 0, nMessages)
sendMessagesToPartition(servers, topic, 1, nMessages)
sendMessages(servers, topic, nMessages, 0)
sendMessages(servers, topic, nMessages, 1)
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
@@ -385,7 +385,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// Consume messages from consumer 1 to make sure it has finished rebalance
getMessages(nMessages, topicMessageStreams1)
getMessages(topicMessageStreams1, nMessages)
val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_2 = List(("0", "group1_consumer1-0"),

19
core/src/test/scala/unit/kafka/integration/FetcherTest.scala

@@ -66,32 +66,17 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
def testFetcher() {
val perNode = 2
var count = sendMessages(perNode)
var count = TestUtils.sendMessages(servers, topic, perNode).size
fetch(count)
assertQueueEmpty()
count = sendMessages(perNode)
count = TestUtils.sendMessages(servers, topic, perNode).size
fetch(count)
assertQueueEmpty()
}
def assertQueueEmpty(): Unit = assertEquals(0, queue.size)
/**
 * Send `messagesPerNode` messages per broker (one producer per broker config),
 * recording the payloads per broker id in `messages`.
 * @return the total number of messages sent across all brokers.
 */
def sendMessages(messagesPerNode: Int): Int = {
  configs.map { conf =>
    val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
      TestUtils.getBrokerListStrFromServers(servers),
      keyEncoder = classOf[StringEncoder].getName)
    // Payloads are derived from the broker id so they are unique per node.
    val ms = (0 until messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
    messages += conf.brokerId -> ms
    producer.send(ms.map(m => new KeyedMessage[String, Array[Byte]](topic, topic, m)): _*)
    producer.close()
    ms.size
  }.sum
}
def fetch(expected: Int) {
var count = 0
while(true) {

45
core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala

@@ -29,7 +29,7 @@ import kafka.admin.AdminUtils
import kafka.common.FailedToSendMessageException
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}
import kafka.producer.{KeyedMessage, Producer}
import kafka.serializer.StringEncoder
import kafka.serializer.StringDecoder
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.CoreUtils
import kafka.utils.TestUtils._
@@ -175,14 +175,14 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
debug("Follower for " + topic + " is: %s".format(followerId))
produceMessage(topic, "first")
sendMessage(servers, topic, "first")
waitUntilMetadataIsPropagated(servers, topic, partitionId)
assertEquals(List("first"), consumeAllMessages(topic))
// shutdown follower server
servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
produceMessage(topic, "second")
sendMessage(servers, topic, "second")
assertEquals(List("first", "second"), consumeAllMessages(topic))
// shutdown leader and then restart follower
@@ -192,7 +192,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
// wait until new leader is (uncleanly) elected
waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
produceMessage(topic, "third")
sendMessage(servers, topic, "third")
// second message was lost due to unclean election
assertEquals(List("first", "third"), consumeAllMessages(topic))
@@ -210,14 +210,14 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
debug("Follower for " + topic + " is: %s".format(followerId))
produceMessage(topic, "first")
sendMessage(servers, topic, "first")
waitUntilMetadataIsPropagated(servers, topic, partitionId)
assertEquals(List("first"), consumeAllMessages(topic))
// shutdown follower server
servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
produceMessage(topic, "second")
sendMessage(servers, topic, "second")
assertEquals(List("first", "second"), consumeAllMessages(topic))
// shutdown leader and then restart follower
@@ -229,7 +229,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
// message production and consumption should both fail while leader is down
intercept[FailedToSendMessageException] {
produceMessage(topic, "third")
sendMessage(servers, topic, "third")
}
assertEquals(List.empty[String], consumeAllMessages(topic))
@@ -237,7 +237,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup())
waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(leaderId))
produceMessage(topic, "third")
sendMessage(servers, topic, "third")
waitUntilMetadataIsPropagated(servers, topic, partitionId)
servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
@@ -253,33 +253,16 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
server.awaitShutdown()
}
/**
 * Send a single message to `topic`, keyed by the topic name.
 * The producer is closed even when the send fails, so tests that deliberately
 * provoke a FailedToSendMessageException (leader down) do not leak producers.
 */
private def produceMessage(topic: String, message: String) = {
  val producer: Producer[String, Array[Byte]] = createProducer(
    getBrokerListStrFromServers(servers),
    keyEncoder = classOf[StringEncoder].getName)
  try {
    producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes))
  } finally {
    // Original code skipped close() on a failed send, leaking sockets between tests.
    producer.close()
  }
}
private def consumeAllMessages(topic: String) : List[String] = {
// use a fresh consumer group every time so that we don't need to mess with disabling auto-commit or
// resetting the ZK offset
val consumerProps = createConsumerProperties(zkConnect, "group" + random.nextLong, "id", 1000)
val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head
val messages = new MutableList[String]
val iter = messageStream.iterator
try {
while(iter.hasNext()) {
messages += new String(iter.next.message) // will throw a timeout exception if the message isn't there
}
} catch {
case e: ConsumerTimeoutException =>
debug("consumer timed out after receiving " + messages.length + " message(s).")
} finally {
consumerConnector.shutdown
}
messages.toList
val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
val messages = getMessages(messageStream)
consumerConnector.shutdown
messages
}
}

54
core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala

@@ -60,7 +60,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
TestUtils.createTopic(zkClient, topic, numParts, 1, servers)
// send some messages to each broker
val sentMessages1 = sendMessages(nMessages, "batch1")
val sentMessages1 = sendMessages(servers, nMessages, "batch1")
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -82,32 +82,24 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
requestHandlerLogger.setLevel(Level.ERROR)
}
def sendMessages(conf: KafkaConfig,
def sendMessages(servers: Seq[KafkaServer],
messagesPerNode: Int,
header: String,
compressed: CompressionCodec): List[String] = {
header: String): List[String] = {
var messages: List[String] = Nil
val producer: kafka.producer.Producer[Int, String] =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName)
val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)
for (partition <- 0 until numParts) {
val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)
messages ++= ms
import JavaConversions._
javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]])
for(server <- servers) {
val producer: kafka.producer.Producer[Int, String] =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName)
val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)
for (partition <- 0 until numParts) {
val ms = 0.until(messagesPerNode).map(x => header + server.config.brokerId + "-" + partition + "-" + x)
messages ++= ms
import JavaConversions._
javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]])
}
javaProducer.close
}
javaProducer.close
messages
}
def sendMessages(messagesPerNode: Int,
header: String,
compressed: CompressionCodec = NoCompressionCodec): List[String] = {
var messages: List[String] = Nil
for(conf <- configs)
messages ++= sendMessages(conf, messagesPerNode, header, compressed)
messages
}
@@ -115,18 +115,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = {
var messages: List[String] = Nil
import scala.collection.JavaConversions._
val topicMessageStreams: collection.mutable.Map[String, java.util.List[KafkaStream[String, String]]] = jTopicMessageStreams
for ((topic, messageStreams) <- topicMessageStreams) {
for (messageStream <- messageStreams) {
val iterator = messageStream.iterator
for (i <- 0 until nMessagesPerThread) {
assertTrue(iterator.hasNext)
val message = iterator.next.message
messages ::= message
debug("received message: " + message)
}
}
}
val topicMessageStreams = jTopicMessageStreams.mapValues(_.toList)
messages = TestUtils.getMessages(topicMessageStreams, nMessagesPerThread)
messages
}

4
core/src/test/scala/unit/kafka/metrics/MetricsTest.scala

@@ -77,12 +77,12 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
}
def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
val sentMessages1 = sendMessages(servers, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1)
val sentMessages1 = sendMessages(servers, topic, nMessages)
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages1 = getMessages(topicMessageStreams1, nMessages)
zkConsumerConnector1.shutdown()
}

122
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@@ -36,7 +36,7 @@ import kafka.producer._
import kafka.message._
import kafka.api._
import kafka.cluster.Broker
import kafka.consumer.{KafkaStream, ConsumerConfig}
import kafka.consumer.{ConsumerTimeoutException, KafkaStream, ConsumerConfig}
import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
import kafka.common.TopicAndPartition
import kafka.admin.AdminUtils
@@ -745,71 +745,97 @@ object TestUtils extends Logging {
time = time,
brokerState = new BrokerState())
}
def sendMessagesToPartition(servers: Seq[KafkaServer],
topic: String,
partition: Int,
numMessages: Int,
compression: CompressionCodec = NoCompressionCodec): List[String] = {
def sendMessages(servers: Seq[KafkaServer],
topic: String,
numMessages: Int,
partition: Int = -1,
compression: CompressionCodec = NoCompressionCodec): List[String] = {
val header = "test-%d".format(partition)
val props = new Properties()
props.put("compression.codec", compression.codec.toString)
val producer: Producer[Int, String] =
createProducer(TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName,
partitioner = classOf[FixedValuePartitioner].getName,
producerProps = props)
val ms = 0.until(numMessages).map(x => header + "-" + x)
producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
producer.close()
ms.toList
}
def sendMessages(servers: Seq[KafkaServer],
topic: String,
producerId: String,
messagesPerNode: Int,
header: String,
compression: CompressionCodec,
numParts: Int): List[String]= {
var messages: List[String] = Nil
val props = new Properties()
props.put("compression.codec", compression.codec.toString)
props.put("client.id", producerId)
val producer: Producer[Int, String] =
createProducer(brokerList = TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName,
partitioner = classOf[FixedValuePartitioner].getName,
producerProps = props)
// Specific Partition
if (partition >= 0) {
val producer: Producer[Int, String] =
createProducer(TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName,
partitioner = classOf[FixedValuePartitioner].getName,
producerProps = props)
for (partition <- 0 until numParts) {
val ms = 0.until(messagesPerNode).map(x => header + "-" + partition + "-" + x)
producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
messages ++= ms
debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
producer.close()
ms.toList
} else {
// Use topic as the key to determine partition
val producer: Producer[String, String] = createProducer(
TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName,
partitioner = classOf[DefaultPartitioner].getName,
producerProps = props)
producer.send(ms.map(m => new KeyedMessage[String, String](topic, topic, m)):_*)
producer.close()
debug("Sent %d messages for topic [%s]".format(ms.size, topic))
ms.toList
}
}
def sendMessage(servers: Seq[KafkaServer],
topic: String,
message: String) = {
val producer: Producer[String, String] =
createProducer(TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName(),
keyEncoder = classOf[StringEncoder].getName())
producer.send(new KeyedMessage[String, String](topic, topic, message))
producer.close()
messages
}
def getMessages(nMessagesPerThread: Int,
topicMessageStreams: Map[String, List[KafkaStream[String, String]]]): List[String] = {
/**
* Consume all messages (or a specific number of messages)
* @param topicMessageStreams the Topic Message Streams
* @param nMessagesPerThread an optional field to specify the exact number of messages to be returned.
* ConsumerTimeoutException will be thrown if there are no messages to be consumed.
* If not specified, then all available messages will be consumed, and no exception is thrown.
*
*
* @return the list of messages consumed.
*/
def getMessages(topicMessageStreams: Map[String, List[KafkaStream[String, String]]],
nMessagesPerThread: Int = -1): List[String] = {
var messages: List[String] = Nil
val shouldGetAllMessages = nMessagesPerThread < 0
for ((topic, messageStreams) <- topicMessageStreams) {
for (messageStream <- messageStreams) {
val iterator = messageStream.iterator
for (i <- 0 until nMessagesPerThread) {
assertTrue(iterator.hasNext)
val message = iterator.next.message
messages ::= message
debug("received message: " + message)
val iterator = messageStream.iterator()
try {
var i = 0
while ((shouldGetAllMessages && iterator.hasNext()) || (i < nMessagesPerThread)) {
assertTrue(iterator.hasNext)
val message = iterator.next.message // will throw a timeout exception if the message isn't there
messages ::= message
debug("received message: " + message)
i += 1
}
} catch {
case e: ConsumerTimeoutException =>
if (shouldGetAllMessages) {
// swallow the exception
debug("consumer timed out after receiving " + messages.length + " message(s).")
} else {
throw e
}
}
}
}
messages.reverse
}

Loading…
Cancel
Save