Browse Source

remove ZK dependency on producer; patched by Yang Ye; reviewed by Jun Rao; KAFKA-369

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1372724 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Jun Rao 12 years ago
parent
commit
ee855fe1aa
  1. 19
      contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
  2. 2
      core/src/main/scala/kafka/api/TopicMetadata.scala
  3. 70
      core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
  4. 10
      core/src/main/scala/kafka/producer/ConsoleProducer.scala
  5. 22
      core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
  6. 9
      core/src/main/scala/kafka/producer/Producer.scala
  7. 33
      core/src/main/scala/kafka/producer/ProducerConfig.scala
  8. 69
      core/src/main/scala/kafka/producer/ProducerPool.scala
  9. 15
      core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
  10. 14
      core/src/main/scala/kafka/server/KafkaApis.scala
  11. 4
      core/src/main/scala/kafka/server/KafkaConfig.scala
  12. 7
      core/src/main/scala/kafka/server/KafkaController.scala
  13. 4
      core/src/main/scala/kafka/server/KafkaServer.scala
  14. 2
      core/src/main/scala/kafka/server/KafkaZooKeeper.scala
  15. 14
      core/src/main/scala/kafka/tools/ReplayLogProducer.scala
  16. 15
      core/src/main/scala/kafka/utils/Utils.scala
  17. 13
      core/src/test/scala/other/kafka/TestEndToEndLatency.scala
  18. 16
      core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
  19. 3
      core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
  20. 2
      core/src/test/scala/unit/kafka/integration/FetcherTest.scala
  21. 4
      core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
  22. 7
      core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
  23. 2
      core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
  24. 14
      core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
  25. 185
      core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
  26. 107
      core/src/test/scala/unit/kafka/producer/ProducerTest.scala
  27. 8
      core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
  28. 2
      core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
  29. 4
      core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
  30. 20
      core/src/test/scala/unit/kafka/utils/TestUtils.scala
  31. 2
      examples/src/main/java/kafka/examples/Producer.java
  32. 16
      perf/src/main/scala/kafka/perf/ProducerPerformance.scala
  33. 6
      system_test/single_host_multi_brokers/bin/run-test.sh
  34. 13
      system_test/single_host_multi_brokers/config/producer.properties

19
contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java

@ -125,22 +125,7 @@ public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<Nul @@ -125,22 +125,7 @@ public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<Nul
props.setProperty("max.message.size", Integer.toString(maxSize));
props.setProperty("compression.codec", Integer.toString(compressionCodec));
if (uri.getScheme().equals("kafka+zk")) {
// Software load balancer:
// URL: kafka+zk://<zk connect path>#<kafka topic>
// e.g. kafka+zk://kafka-zk:2181/kafka#foobar
String zkConnect = uri.getAuthority() + uri.getPath();
props.setProperty("zk.connect", zkConnect);
job.set("kafka.zk.connect", zkConnect);
topic = uri.getFragment();
if (topic == null)
throw new KafkaException("no topic specified in kafka uri fragment");
log.info(String.format("using kafka zk.connect %s (topic %s)", zkConnect, topic));
} else if (uri.getScheme().equals("kafka")) {
if (uri.getScheme().equals("kafka")) {
// using the legacy direct broker list
// URL: kafka://<kafka host>/<topic>
// e.g. kafka://kafka-server:9000,kafka-server2:9000/foobar
@ -167,7 +152,7 @@ public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<Nul @@ -167,7 +152,7 @@ public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<Nul
job.set("kafka.output.topic", topic);
log.info(String.format("using kafka broker %s (topic %s)", brokerList, topic));
} else
throw new KafkaException("missing scheme from kafka uri (must be kafka:// or kafka+zk://)");
throw new KafkaException("missing scheme from kafka uri (must be kafka://)");
Producer<Integer, Message> producer = new Producer<Integer, Message>(new ProducerConfig(props));
return new KafkaRecordWriter<W>(producer, topic, queueSize);

2
core/src/main/scala/kafka/api/TopicMetadata.scala

@ -154,7 +154,7 @@ object PartitionMetadata { @@ -154,7 +154,7 @@ object PartitionMetadata {
}
}
case class PartitionMetadata(partitionId: Int, leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty,
case class PartitionMetadata(partitionId: Int, val leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty,
errorCode: Short = ErrorMapping.NoError, logMetadata: Option[LogMetadata] = None) {
def sizeInBytes: Int = {
var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/

70
core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala

@ -19,13 +19,17 @@ package kafka.producer @@ -19,13 +19,17 @@ package kafka.producer
import collection.mutable.HashMap
import kafka.api.{TopicMetadataRequest, TopicMetadata}
import kafka.common.KafkaException
import kafka.utils.Logging
import kafka.utils.{Logging, Utils}
import kafka.common.ErrorMapping
import kafka.cluster.{Replica, Partition}
import kafka.common.{LeaderNotAvailableException, ErrorMapping, UnknownTopicException}
class BrokerPartitionInfo(producerPool: ProducerPool) extends Logging {
val topicPartitionInfo = new HashMap[String, TopicMetadata]()
val zkClient = producerPool.getZkClient
class BrokerPartitionInfo(producerConfig: ProducerConfig,
producerPool: ProducerPool,
topicPartitionInfo: HashMap[String, TopicMetadata])
extends Logging {
val brokerList = producerConfig.brokerList
val brokers = Utils.getAllBrokersFromBrokerList(brokerList)
/**
* Return a sequence of (brokerId, numPartitions).
@ -69,33 +73,43 @@ class BrokerPartitionInfo(producerPool: ProducerPool) extends Logging { @@ -69,33 +73,43 @@ class BrokerPartitionInfo(producerPool: ProducerPool) extends Logging {
* It updates the cache by issuing a get topic metadata request to a random broker.
* @param topic the topic for which the metadata is to be fetched
*/
def updateInfo(topics: Seq[String] = Seq.empty[String]) = {
val producer = producerPool.getAnyProducer
val topicList = if(topics.size > 0) topics else topicPartitionInfo.keySet.toList
topicList.foreach { topic =>
info("Fetching metadata for topic %s".format(topic))
val topicMetadataRequest = new TopicMetadataRequest(List(topic))
var topicMetaDataResponse: Seq[TopicMetadata] = Nil
def updateInfo(topics: Seq[String]) = {
var fetchMetaDataSucceeded: Boolean = false
var i: Int = 0
val topicMetadataRequest = new TopicMetadataRequest(topics)
var topicMetaDataResponse: Seq[TopicMetadata] = Nil
var t: Throwable = null
while(i < brokers.size && !fetchMetaDataSucceeded) {
val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
info("Fetching metadata for topic %s".format(brokers))
try {
topicMetaDataResponse = producer.send(topicMetadataRequest)
// throw topic specific exception
topicMetaDataResponse.foreach(metadata => ErrorMapping.maybeThrowException(metadata.errorCode))
fetchMetaDataSucceeded = true
// throw partition specific exception
topicMetaDataResponse.foreach(metadata =>
metadata.partitionsMetadata.foreach(partitionMetadata => ErrorMapping.maybeThrowException(partitionMetadata.errorCode)))
}catch {
case te: UnknownTopicException => throw te
case e: LeaderNotAvailableException => throw e
case oe => warn("Ignoring non leader related error while fetching metadata", oe) // swallow non leader related errors
}
val topicMetadata:Option[TopicMetadata] = if(topicMetaDataResponse.size > 0) Some(topicMetaDataResponse.head) else None
topicMetadata match {
case Some(metadata) =>
info("Fetched metadata for topics %s".format(topic))
topicMetadata.foreach(metadata => trace("Metadata for topic %s is %s".format(metadata.topic, metadata.toString)))
topicPartitionInfo += (topic -> metadata)
case None =>
topicMetaDataResponse.foreach(tmd =>{
trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
if(tmd.errorCode == ErrorMapping.NoError){
topicPartitionInfo.put(tmd.topic, tmd)
} else
warn("Metadata for topic [%s] is erronous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
tmd.partitionsMetadata.foreach(pmd =>{
if (pmd.errorCode != ErrorMapping.NoError){
debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode))
}
})
})
producerPool.updateProducer(topicMetaDataResponse)
} catch {
case e =>
warn("fetching broker partition metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e)
t = e
} finally {
i = i + 1
producer.close()
}
}
if(!fetchMetaDataSucceeded){
throw new KafkaException("fetching broker partition metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
}
}
}

10
core/src/main/scala/kafka/producer/ConsoleProducer.scala

@ -32,9 +32,9 @@ object ConsoleProducer { @@ -32,9 +32,9 @@ object ConsoleProducer {
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The zookeeper connection string for the kafka zookeeper instance in the form HOST:PORT[/CHROOT].")
val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
.withRequiredArg
.describedAs("connection_string")
.describedAs("broker-list")
.ofType(classOf[String])
val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
@ -68,7 +68,7 @@ object ConsoleProducer { @@ -68,7 +68,7 @@ object ConsoleProducer {
val options = parser.parse(args : _*)
for(arg <- List(topicOpt, zkConnectOpt)) {
for(arg <- List(topicOpt, brokerListOpt)) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
@ -77,7 +77,7 @@ object ConsoleProducer { @@ -77,7 +77,7 @@ object ConsoleProducer {
}
val topic = options.valueOf(topicOpt)
val zkConnect = options.valueOf(zkConnectOpt)
val brokerList = options.valueOf(brokerListOpt)
val sync = options.has(syncOpt)
val compress = options.has(compressOpt)
val batchSize = options.valueOf(batchSizeOpt)
@ -87,7 +87,7 @@ object ConsoleProducer { @@ -87,7 +87,7 @@ object ConsoleProducer {
val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
val props = new Properties()
props.put("zk.connect", zkConnect)
props.put("broker.list", brokerList)
props.put("compression.codec", DefaultCompressionCodec.codec.toString)
props.put("producer.type", if(sync) "sync" else "async")
if(options.has(batchSizeOpt))

22
core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala

@ -23,14 +23,10 @@ import org.apache.log4j.AppenderSkeleton @@ -23,14 +23,10 @@ import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.helpers.LogLog
import kafka.utils.Logging
import java.util.{Properties, Date}
import scala.collection._
class KafkaLog4jAppender extends AppenderSkeleton with Logging {
var port:Int = 0
var host:String = null
var topic:String = null
var serializerClass:String = null
var zkConnect:String = null
var brokerList:String = null
private var producer: Producer[String, String] = null
@ -38,9 +34,6 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { @@ -38,9 +34,6 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
def getTopic:String = topic
def setTopic(topic: String) { this.topic = topic }
def getZkConnect:String = zkConnect
def setZkConnect(zkConnect: String) { this.zkConnect = zkConnect }
def getBrokerList:String = brokerList
def setBrokerList(brokerList: String) { this.brokerList = brokerList }
@ -48,17 +41,12 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { @@ -48,17 +41,12 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass }
override def activateOptions() {
val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer();
// check for config parameter validity
val props = new Properties()
if( zkConnect == null) connectDiagnostic += "zkConnect"
else props.put("zk.connect", zkConnect);
if( brokerList == null) connectDiagnostic += "brokerList"
else if( props.isEmpty) props.put("broker.list", brokerList)
if(props.isEmpty )
throw new MissingConfigException(
connectDiagnostic mkString ("One of these connection properties must be specified: ", ", ", ".")
)
if(brokerList != null)
props.put("broker.list", brokerList)
if(props.isEmpty)
throw new MissingConfigException("The broker.list property should be specified")
if(topic == null)
throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
if(serializerClass == null) {
@ -68,7 +56,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { @@ -68,7 +56,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
props.put("serializer.class", serializerClass)
val config : ProducerConfig = new ProducerConfig(props)
producer = new Producer[String, String](config)
LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect))
LogLog.debug("Kafka producer connected to " + config.brokerList)
LogLog.debug("Logging for topic: " + topic)
}

9
core/src/main/scala/kafka/producer/Producer.scala

@ -21,15 +21,12 @@ import kafka.utils._ @@ -21,15 +21,12 @@ import kafka.utils._
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import kafka.serializer.Encoder
import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
import org.I0Itec.zkclient.ZkClient
import kafka.common.{QueueFullException, InvalidConfigException}
class Producer[K,V](config: ProducerConfig,
private val eventHandler: EventHandler[K,V]) // for testing only
extends Logging {
private val hasShutdown = new AtomicBoolean(false)
if(!Utils.propertyExists(config.zkConnect))
throw new InvalidConfigException("zk.connect property must be specified in the producer")
if (config.batchSize > config.queueSize)
throw new InvalidConfigException("Batch size can't be larger than queue size.")
@ -48,14 +45,12 @@ extends Logging { @@ -48,14 +45,12 @@ extends Logging {
case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
}
def this(config: ProducerConfig, zkClient: ZkClient = null) =
def this(config: ProducerConfig) =
this(config,
new DefaultEventHandler[K,V](config,
Utils.getObject[Partitioner[K]](config.partitionerClass),
Utils.getObject[Encoder[V]](config.serializerClass),
new ProducerPool(config, if(zkClient == null)
new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs, ZKStringSerializer) else zkClient)))
new ProducerPool(config)))
/**
* Sends the data, partitioned by key to the topic using either the

33
core/src/main/scala/kafka/producer/ProducerConfig.scala

@ -19,37 +19,24 @@ package kafka.producer @@ -19,37 +19,24 @@ package kafka.producer
import async.AsyncProducerConfig
import java.util.Properties
import kafka.utils.{ZKConfig, Utils}
import kafka.common.InvalidConfigException
import kafka.utils.Utils
class ProducerConfig(val props: Properties) extends ZKConfig(props)
with AsyncProducerConfig with SyncProducerConfigShared{
class ProducerConfig(val props: Properties) extends AsyncProducerConfig with SyncProducerConfigShared{
/** For bypassing zookeeper based auto partition discovery, use this config *
* to pass in static broker and per-broker partition information. Format- *
* brokerid1:host1:port1, brokerid2:host2:port2*/
val brokerList = Utils.getString(props, "broker.list", null)
if(brokerList != null)
throw new InvalidConfigException("broker.list is deprecated. Use zk.connect instead")
/** This is for bootstrapping and the producer will only use it for getting metadata
* (topics, partitions and replicas). The socket connections for sending the actual data
* will be established based on the broker information returned in the metadata. The
* format is host1:por1,host2:port2, and the list can be a subset of brokers or
* a VIP pointing to a subset of brokers.
*/
val brokerList = Utils.getString(props, "broker.list")
/**
* If DefaultEventHandler is used, this specifies the number of times to
* retry if an error is encountered during send. Currently, it is only
* appropriate when broker.list points to a VIP. If the zk.connect option
* is used instead, this will not have any effect because with the zk-based
* producer, brokers are not re-selected upon retry. So retries would go to
* the same (potentially still down) broker. (KAFKA-253 will help address
* this.)
* retry if an error is encountered during send.
*/
val numRetries = Utils.getInt(props, "num.retries", 0)
/** If both broker.list and zk.connect options are specified, throw an exception */
if(zkConnect == null)
throw new InvalidConfigException("zk.connect property is required")
if(!Utils.propertyExists(zkConnect) && !Utils.propertyExists(brokerList))
throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
/** the partitioner class for partitioning events amongst sub-topics */
val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")

69
core/src/main/scala/kafka/producer/ProducerPool.scala

@ -19,38 +19,49 @@ package kafka.producer @@ -19,38 +19,49 @@ package kafka.producer
import kafka.cluster.Broker
import java.util.Properties
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZkUtils, Logging}
import collection.mutable.HashMap
import java.lang.Object
import kafka.common.{UnavailableProducerException, NoBrokersForPartitionException}
import kafka.common.UnavailableProducerException
import kafka.utils.{Utils, Logging}
import kafka.api.TopicMetadata
class ProducerPool(val config: ProducerConfig, val zkClient: ZkClient) extends Logging {
private val syncProducers = new HashMap[Int, SyncProducer]
private val lock = new Object()
private def addProducer(broker: Broker) {
object ProducerPool{
def createSyncProducer(config: ProducerConfig): SyncProducer = {
val brokerList = config.brokerList
val brokers = Utils.getAllBrokersFromBrokerList(brokerList)
createSyncProducer(config, brokers.head)
}
def createSyncProducer(config: ProducerConfig, broker: Broker): SyncProducer = {
val props = new Properties()
props.put("host", broker.host)
props.put("port", broker.port.toString)
props.putAll(config.props)
val producer = new SyncProducer(new SyncProducerConfig(props))
info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
syncProducers.put(broker.id, producer)
new SyncProducer(new SyncProducerConfig(props))
}
}
/**
* For testing purpose
*/
def addProducer(brokerId: Int, syncProducer: SyncProducer) {
syncProducers.put(brokerId, syncProducer)
}
class ProducerPool(val config: ProducerConfig) extends Logging {
private val syncProducers = new HashMap[Int, SyncProducer]
private val lock = new Object()
def addProducers(config: ProducerConfig) {
lock.synchronized {
debug("Connecting to %s for creating sync producers for all brokers in the cluster".format(config.zkConnect))
val brokers = ZkUtils.getAllBrokersInCluster(zkClient)
brokers.foreach(broker => addProducer(broker))
def updateProducer(topicMetaDatas: Seq[TopicMetadata]) {
val newBrokers = new collection.mutable.HashSet[Broker]
topicMetaDatas.foreach(tmd => {
tmd.partitionsMetadata.foreach(pmd => {
if(pmd.leader.isDefined)
newBrokers+=(pmd.leader.get)
})
})
lock synchronized {
newBrokers.foreach(b => {
if(syncProducers.contains(b.id)){
syncProducers(b.id).close()
syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
} else
syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
})
}
}
@ -64,21 +75,6 @@ class ProducerPool(val config: ProducerConfig, val zkClient: ZkClient) extends L @@ -64,21 +75,6 @@ class ProducerPool(val config: ProducerConfig, val zkClient: ZkClient) extends L
}
}
def getAnyProducer: SyncProducer = {
lock.synchronized {
if(syncProducers.size == 0) {
// refresh the list of brokers from zookeeper
info("No sync producers available. Refreshing the available broker list from ZK and creating sync producers")
addProducers(config)
if(syncProducers.size == 0)
throw new NoBrokersForPartitionException("No brokers available")
}
syncProducers.head._2
}
}
def getZkClient: ZkClient = zkClient
/**
* Closes all the producers in the pool
*/
@ -88,7 +84,6 @@ class ProducerPool(val config: ProducerConfig, val zkClient: ZkClient) extends L @@ -88,7 +84,6 @@ class ProducerPool(val config: ProducerConfig, val zkClient: ZkClient) extends L
val iter = syncProducers.values.iterator
while(iter.hasNext)
iter.next.close
zkClient.close()
}
}
}

15
core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala

@ -17,7 +17,6 @@ @@ -17,7 +17,6 @@
package kafka.producer.async
import kafka.api.{ProducerRequest, TopicData, PartitionData}
import kafka.cluster.Partition
import kafka.common._
import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
@ -26,17 +25,17 @@ import kafka.serializer.Encoder @@ -26,17 +25,17 @@ import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging}
import scala.collection.Map
import scala.collection.mutable.{ListBuffer, HashMap}
import kafka.api.{TopicMetadata, ProducerRequest, TopicData, PartitionData}
class DefaultEventHandler[K,V](config: ProducerConfig, // this api is for testing
private val partitioner: Partitioner[K], // use the other constructor
class DefaultEventHandler[K,V](config: ProducerConfig,
private val partitioner: Partitioner[K],
private val encoder: Encoder[V],
private val producerPool: ProducerPool)
private val producerPool: ProducerPool,
private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
extends EventHandler[K,V] with Logging {
val brokerPartitionInfo = new BrokerPartitionInfo(producerPool)
// add producers to the producer pool
producerPool.addProducers(config)
val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
private val lock = new Object()

14
core/src/main/scala/kafka/server/KafkaApis.scala

@ -151,7 +151,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, @@ -151,7 +151,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
val sTime = SystemTime.milliseconds
if(requestLogger.isTraceEnabled)
requestLogger.trace("producer request %s".format(produceRequest.toString))
trace("Broker %s received produce request %s".format(logManager.config.brokerId, produceRequest.toString))
trace("Broker %s received produce request %s".format(brokerId, produceRequest.toString))
val response = produceToLocalLog(produceRequest)
debug("produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
@ -329,7 +329,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, @@ -329,7 +329,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
val info = new mutable.ArrayBuffer[PartitionData]()
val topic = offsetDetail.topic
val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ){
val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
case Left(err) =>
BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
@ -342,18 +342,18 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, @@ -342,18 +342,18 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
case Right(messages) =>
BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
val leaderReplicaOpt = replicaManager.getReplica(topic, partition, logManager.config.brokerId)
assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) + " must exist on leader broker %d".format(logManager.config.brokerId))
val leaderReplicaOpt = replicaManager.getReplica(topic, partition, brokerId)
assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) + " must exist on leader broker %d".format(brokerId))
val leaderReplica = leaderReplicaOpt.get
fetchRequest.replicaId match {
case FetchRequest.NonFollowerId => // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
case _ => // fetch request from a follower
val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, replicaManager.config.brokerId))
assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, brokerId))
val replica = replicaOpt.get
debug("leader [%d] for topic [%s] partition [%d] received fetch request from follower [%d]".format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
debug("Leader %d returning %d messages for topic %s partition %d to follower %d".format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
debug("leader [%d] for topic [%s] partition [%d] received fetch request from follower [%d]".format(brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
debug("Leader %d returning %d messages for topic %s partition %d to follower %d".format(brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
}
}

4
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -21,6 +21,8 @@ import java.util.Properties @@ -21,6 +21,8 @@ import java.util.Properties
import kafka.utils.{Utils, ZKConfig}
import kafka.message.Message
import kafka.consumer.ConsumerConfig
import java.net.InetAddress
/**
* Configuration settings for the kafka server
@ -30,7 +32,7 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) { @@ -30,7 +32,7 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) {
val port: Int = Utils.getInt(props, "port", 6667)
/* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
val hostName: String = Utils.getString(props, "hostname", null)
val hostName: String = Utils.getString(props, "hostname", InetAddress.getLocalHost.getHostAddress)
/* the broker id for this server */
val brokerId: Int = Utils.getInt(props, "brokerid")

7
core/src/main/scala/kafka/server/KafkaController.scala

@ -361,7 +361,7 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging @@ -361,7 +361,7 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging
allLeaders.put(topicPartition, leaderAndISR.leader)
}
else{
warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment, allBrokerIds))
warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment.mkString(","), allBrokerIds))
}
}
@ -378,7 +378,7 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging @@ -378,7 +378,7 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging
private def onBrokerChange(newBrokers: Set[Int] = null){
/** handle the new brokers, send request for them to initialize the local log **/
if(newBrokers != null)
if(newBrokers != null && newBrokers.size != 0)
deliverLeaderAndISRFromZookeeper(newBrokers, allTopics)
/** handle leader election for the partitions whose leader is no longer alive **/
@ -439,13 +439,12 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging @@ -439,13 +439,12 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging
}
}
})
trace("after acting on broker change, the broker to leaderAndISR request map is".format(brokerToLeaderAndISRInfosMap))
brokerToLeaderAndISRInfosMap.foreach(m => {
val broker = m._1
val leaderAndISRInfos = m._2
val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRInfos)
sendRequest(broker, leaderAndISRRequest)
info("on broker change, the LeaderAndISRRequest send to brokers [%d] is [%s]".format(leaderAndISRRequest, broker))
info("on broker change, the LeaderAndISRRequest send to brokers [%d] is [%s]".format(broker, leaderAndISRRequest))
})
}

4
core/src/main/scala/kafka/server/KafkaServer.scala

@ -102,10 +102,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg @@ -102,10 +102,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
Mx4jLoader.maybeLoad
/**
* Registers this broker in ZK. After this, consumers can connect to broker.
* So this should happen after socket server start.
*/
// start the replica manager
replicaManager.startup()
// start the controller

2
core/src/main/scala/kafka/server/KafkaZooKeeper.scala

@ -44,7 +44,7 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging { @@ -44,7 +44,7 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
private def registerBrokerInZk() {
info("Registering broker " + brokerIdPath)
val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName
val hostName = config.hostName
val creatorId = hostName + "-" + System.currentTimeMillis
ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
}

14
core/src/main/scala/kafka/tools/ReplayLogProducer.scala

@ -72,9 +72,9 @@ object ReplayLogProducer extends Logging { @@ -72,9 +72,9 @@ object ReplayLogProducer extends Logging {
.describedAs("zookeeper url")
.ofType(classOf[String])
.defaultsTo("127.0.0.1:2181")
val brokerInfoOpt = parser.accepts("brokerinfo", "REQUIRED: broker info (either from zookeeper or a list.")
val brokerListOpt = parser.accepts("broker-list", "REQUIRED: the broker list must be specified.")
.withRequiredArg
.describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
.describedAs("hostname:port")
.ofType(classOf[String])
val inputTopicOpt = parser.accepts("inputtopic", "REQUIRED: The topic to consume from.")
.withRequiredArg
@ -117,7 +117,7 @@ object ReplayLogProducer extends Logging { @@ -117,7 +117,7 @@ object ReplayLogProducer extends Logging {
.defaultsTo(0)
val options = parser.parse(args : _*)
for(arg <- List(brokerInfoOpt, inputTopicOpt)) {
for(arg <- List(brokerListOpt, inputTopicOpt)) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
@ -125,7 +125,7 @@ object ReplayLogProducer extends Logging { @@ -125,7 +125,7 @@ object ReplayLogProducer extends Logging {
}
}
val zkConnect = options.valueOf(zkConnectOpt)
val brokerInfo = options.valueOf(brokerInfoOpt)
val brokerList = options.valueOf(brokerListOpt)
val numMessages = options.valueOf(numMessagesOpt).intValue
val isAsync = options.has(asyncOpt)
val delayedMSBtwSend = options.valueOf(delayMSBtwBatchOpt).longValue
@ -152,11 +152,7 @@ object ReplayLogProducer extends Logging { @@ -152,11 +152,7 @@ object ReplayLogProducer extends Logging {
class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging {
val shutdownLatch = new CountDownLatch(1)
val props = new Properties()
val brokerInfoList = config.brokerInfo.split("=")
if (brokerInfoList(0) == "zk.connect")
props.put("zk.connect", brokerInfoList(1))
else
props.put("broker.list", brokerInfoList(1))
props.put("broker.list", config.brokerList)
props.put("reconnect.interval", Integer.MAX_VALUE.toString)
props.put("buffer.size", (64*1024).toString)
props.put("compression.codec", config.compressionCodec.codec.toString)

15
core/src/main/scala/kafka/utils/Utils.scala

@ -31,6 +31,7 @@ import org.I0Itec.zkclient.ZkClient @@ -31,6 +31,7 @@ import org.I0Itec.zkclient.ZkClient
import java.util.{Random, Properties}
import joptsimple.{OptionSpec, OptionSet, OptionParser}
import kafka.common.KafkaException
import kafka.cluster.Broker
/**
@ -791,6 +792,20 @@ object Utils extends Logging { @@ -791,6 +792,20 @@ object Utils extends Logging {
builder.toString
}
def getAllBrokersFromBrokerList(brokerListStr: String): Seq[Broker] = {
val brokersStr = Utils.getCSVList(brokerListStr)
brokersStr.zipWithIndex.map(b =>{
val brokerStr = b._1
val brokerId = b._2
val brokerInfos = brokerStr.split(":")
val hostName = brokerInfos(0)
val port = brokerInfos(1).toInt
val creatorId = hostName + "-" + System.currentTimeMillis()
new Broker(brokerId, creatorId, hostName, port)
})
}
def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
for(arg <- required) {
if(!options.has(arg)) {

13
core/src/test/scala/other/kafka/TestEndToEndLatency.scala

@ -24,13 +24,14 @@ import kafka.message._ @@ -24,13 +24,14 @@ import kafka.message._
object TestEndToEndLatency {
def main(args: Array[String]) {
if(args.length != 2) {
System.err.println("USAGE: java " + getClass().getName + " zookeeper_connect num_messages")
if(args.length != 3) {
System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect num_messages")
System.exit(1)
}
val zkConnect = args(0)
val numMessages = args(1).toInt
val brokerList = args(0)
val zkConnect = args(1)
val numMessages = args(2).toInt
val topic = "test"
val consumerProps = new Properties()
@ -46,7 +47,7 @@ object TestEndToEndLatency { @@ -46,7 +47,7 @@ object TestEndToEndLatency {
val iter = stream.iterator
val producerProps = new Properties()
producerProps.put("zk.connect", zkConnect)
producerProps.put("broker.list", brokerList)
producerProps.put("producer.type", "sync")
val producer = new Producer[Any, Any](new ProducerConfig(producerProps))

16
core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

@ -261,9 +261,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar @@ -261,9 +261,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)
// shutdown one server
servers.last.shutdown
// send some messages to each broker
val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec)
val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
@ -376,11 +373,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar @@ -376,11 +373,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
assertEquals(sentMessages1, receivedMessages1)
}
def sendMessagesToBrokerPartition(config: KafkaConfig, topic: String, partition: Int, numMessages: Int,
compression: CompressionCodec = NoCompressionCodec): List[Message] = {
def sendMessagesToBrokerPartition(config: KafkaConfig, topic: String, partition: Int, numMessages: Int, compression: CompressionCodec = NoCompressionCodec): List[Message] = {
val header = "test-%d-%d".format(config.brokerId, partition)
val props = new Properties()
props.put("zk.connect", zkConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
props.put("compression.codec", compression.codec.toString)
val producer: Producer[Int, Message] = new Producer[Int, Message](new ProducerConfig(props))
@ -392,20 +388,20 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar @@ -392,20 +388,20 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
ms.toList
}
def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec, numParts: Int): List[Message]= {
def sendMessages(config: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec, numParts: Int): List[Message]= {
var messages: List[Message] = Nil
val props = new Properties()
props.put("zk.connect", zkConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
val producer: Producer[Int, Message] = new Producer[Int, Message](new ProducerConfig(props))
for (partition <- 0 until numParts) {
val ms = 0.until(messagesPerNode).map(x =>
new Message((header + conf.brokerId + "-" + partition + "-" + x).getBytes)).toArray
new Message((header + config.brokerId + "-" + partition + "-" + x).getBytes)).toArray
for (message <- ms)
messages ::= message
producer.send(new ProducerData[Int, Message](topic, partition, ms))
debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, conf.brokerId, topic, partition))
debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, config.brokerId, topic, partition))
}
producer.close()
messages.reverse

3
core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala

@ -42,7 +42,6 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L @@ -42,7 +42,6 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
override def setUp() {
super.setUp()
// temporarily set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.FATAL)
}
@ -70,7 +69,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L @@ -70,7 +69,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
* Returns the count of messages received.
*/
def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
val producer: Producer[String, Message] = TestUtils.createProducer(zkConnect)
val producer: Producer[String, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs))
for(i <- 0 until numMessages)
producer.send(new ProducerData[String, Message](topic, topic, new Message("test".getBytes())))

2
core/src/test/scala/unit/kafka/integration/FetcherTest.scala

@ -83,7 +83,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { @@ -83,7 +83,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
def sendMessages(messagesPerNode: Int): Int = {
var count = 0
for(conf <- configs) {
val producer: Producer[String, Message] = TestUtils.createProducer(zkConnect)
val producer: Producer[String, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs))
val ms = 0.until(messagesPerNode).map(x => new Message((conf.brokerId * 5 + x).toString.getBytes)).toArray
messages += conf.brokerId -> ms
producer.send(new ProducerData[String, Message](topic, topic, ms))

4
core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala

@ -104,7 +104,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with @@ -104,7 +104,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val topic = "test-topic"
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val config = new ProducerConfig(props)
val stringProducer1 = new Producer[String, String](config)
@ -131,7 +131,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with @@ -131,7 +131,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val topic = "test-topic"
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("compression", "true")
val config = new ProducerConfig(props)

7
core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala

@ -20,22 +20,21 @@ package kafka.integration @@ -20,22 +20,21 @@ package kafka.integration
import kafka.consumer.SimpleConsumer
import org.scalatest.junit.JUnit3Suite
import java.util.Properties
import kafka.utils.TestZKUtils
import kafka.producer.{ProducerConfig, Producer}
import kafka.message.Message
import kafka.utils.TestUtils
trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
val port: Int
val host = "localhost"
var producer: Producer[String, Message] = null
var consumer: SimpleConsumer = null
override def setUp() {
override def setUp() {
super.setUp
val props = new Properties()
props.put("partitioner.class", "kafka.utils.StaticPartitioner")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("buffer.size", "65536")
props.put("connect.timeout.ms", "100000")
props.put("reconnect.interval", "10000")

2
core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala

@ -72,7 +72,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar @@ -72,7 +72,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message]= {
var messages: List[Message] = Nil
val producer: kafka.producer.Producer[Int, Message] = TestUtils.createProducer(zkConnect)
val producer: kafka.producer.Producer[Int, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs))
val javaProducer: Producer[Int, Message] = new kafka.javaapi.producer.Producer(producer)
for (partition <- 0 until numParts) {
val ms = 0.until(messagesPerNode).map(x =>

14
core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala

@ -21,7 +21,7 @@ import java.util.Properties @@ -21,7 +21,7 @@ import java.util.Properties
import java.io.File
import kafka.consumer.SimpleConsumer
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{TestUtils, TestZKUtils, Utils, Logging}
import kafka.utils.{TestUtils, Utils, Logging}
import junit.framework.Assert._
import kafka.api.FetchRequestBuilder
import kafka.message.Message
@ -36,6 +36,7 @@ import org.scalatest.junit.JUnit3Suite @@ -36,6 +36,7 @@ import org.scalatest.junit.JUnit3Suite
class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
var logDirZk: File = null
var config: KafkaConfig = null
var serverZk: KafkaServer = null
var simpleConsumerZk: SimpleConsumer = null
@ -54,7 +55,8 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with @@ -54,7 +55,8 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk)
val logDirZkPath = propsZk.getProperty("log.dir")
logDirZk = new File(logDirZkPath)
serverZk = TestUtils.createServer(new KafkaConfig(propsZk));
config = new KafkaConfig(propsZk)
serverZk = TestUtils.createServer(config);
simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024)
}
@ -108,7 +110,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with @@ -108,7 +110,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect)
props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
// topic missing
@ -124,7 +126,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with @@ -124,7 +126,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect)
props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put("log4j.appender.KAFKA.Topic", "test-topic")
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@ -137,7 +139,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with @@ -137,7 +139,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
}
@Test
def testZkConnectLog4jAppends() {
def testLog4jAppends() {
PropertyConfigurator.configure(getLog4jConfigWithZkConnect)
for(i <- 1 to 5)
@ -160,7 +162,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with @@ -160,7 +162,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect)
props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put("log4j.appender.KAFKA.Topic", "test-topic")
props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
props

185
core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala

@ -25,25 +25,22 @@ import org.junit.Test @@ -25,25 +25,22 @@ import org.junit.Test
import kafka.api._
import kafka.cluster.Broker
import kafka.common._
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import kafka.message.Message
import kafka.producer.async._
import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
import kafka.server.KafkaConfig
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import scala.collection.Map
import scala.collection.mutable.ListBuffer
import kafka.utils._
class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
class AsyncProducerTest extends JUnit3Suite {
val props = createBrokerConfigs(1)
val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
var brokers: Seq[Broker] = null
override def setUp() {
super.setUp()
brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
}
override def tearDown() {
@ -64,7 +61,7 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -64,7 +61,7 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("producer.type", "async")
props.put("queue.size", "10")
props.put("batch.size", "1")
@ -88,13 +85,13 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -88,13 +85,13 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
def testProduceAfterClosed() {
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("producer.type", "async")
props.put("batch.size", "1")
val config = new ProducerConfig(props)
val produceData = getProduceData(10)
val producer = new Producer[String, String](config, zkClient)
val producer = new Producer[String, String](config)
producer.close
try {
@ -167,35 +164,31 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -167,35 +164,31 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
producerDataList.append(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
val props = new Properties()
props.put("zk.connect", zkConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val broker1 = new Broker(0, "localhost", "localhost", 9092)
val broker2 = new Broker(1, "localhost", "localhost", 9093)
broker1
// form expected partitions metadata
val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2))
val partition2Metadata = new PartitionMetadata(1, Some(broker2), List(broker1, broker2))
val topic1Metadata = new TopicMetadata("topic1", List(partition1Metadata, partition2Metadata))
val topic2Metadata = new TopicMetadata("topic2", List(partition1Metadata, partition2Metadata))
val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
topicPartitionInfos.put("topic1", topic1Metadata)
topicPartitionInfos.put("topic2", topic2Metadata)
val intPartitioner = new Partitioner[Int] {
def partition(key: Int, numPartitions: Int): Int = key % numPartitions
}
val config = new ProducerConfig(props)
val syncProducer = getSyncProducer(List("topic1", "topic2"), List(topic1Metadata, topic2Metadata))
val producerPool = EasyMock.createMock(classOf[ProducerPool])
producerPool.getZkClient
EasyMock.expectLastCall().andReturn(zkClient)
producerPool.addProducers(config)
EasyMock.expectLastCall()
producerPool.getAnyProducer
EasyMock.expectLastCall().andReturn(syncProducer).times(2)
EasyMock.replay(producerPool)
val producerPool = new ProducerPool(config)
val handler = new DefaultEventHandler[Int,String](config,
partitioner = intPartitioner,
encoder = null.asInstanceOf[Encoder[String]],
producerPool = producerPool)
producerPool = producerPool,
topicPartitionInfos)
val topic1Broker1Data = new ListBuffer[ProducerData[Int,Message]]
topic1Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
@ -217,25 +210,27 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -217,25 +210,27 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val actualResult = handler.partitionAndCollate(producerDataList)
assertEquals(expectedResult, actualResult)
EasyMock.verify(syncProducer)
EasyMock.verify(producerPool)
}
@Test
def testSerializeEvents() {
val produceData = TestUtils.getMsgStrings(5).map(m => new ProducerData[String,String]("topic1",m))
val props = new Properties()
props.put("zk.connect", zkConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val config = new ProducerConfig(props)
// form expected partitions metadata
val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
topicPartitionInfos.put("topic1", topic1Metadata)
val producerPool = new ProducerPool(config)
val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
val producerPool = getMockProducerPool(config, syncProducer)
val handler = new DefaultEventHandler[String,String](config,
partitioner = null.asInstanceOf[Partitioner[String]],
encoder = new StringEncoder,
producerPool = producerPool)
producerPool = producerPool,
topicPartitionInfos
)
val serializedData = handler.serialize(produceData)
val decoder = new StringDecoder
@ -248,20 +243,22 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -248,20 +243,22 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val producerDataList = new ListBuffer[ProducerData[String,Message]]
producerDataList.append(new ProducerData[String,Message]("topic1", "key1", new Message("msg1".getBytes)))
val props = new Properties()
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val config = new ProducerConfig(props)
// form expected partitions metadata
val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
topicPartitionInfos.put("topic1", topic1Metadata)
val producerPool = getMockProducerPool(config, syncProducer)
val producerPool = new ProducerPool(config)
val handler = new DefaultEventHandler[String,String](config,
partitioner = new NegativePartitioner,
encoder = null.asInstanceOf[Encoder[String]],
producerPool = producerPool)
producerPool = producerPool,
topicPartitionInfos)
try {
handler.partitionAndCollate(producerDataList)
fail("Should fail with InvalidPartitionException")
@ -269,29 +266,29 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -269,29 +266,29 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
catch {
case e: InvalidPartitionException => // expected, do nothing
}
EasyMock.verify(syncProducer)
EasyMock.verify(producerPool)
}
@Test
def testNoBroker() {
val props = new Properties()
props.put("zk.connect", zkConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val config = new ProducerConfig(props)
// create topic metadata with 0 partitions
val topic1Metadata = new TopicMetadata("topic1", Seq.empty)
val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
topicPartitionInfos.put("topic1", topic1Metadata)
val producerPool = getMockProducerPool(config, syncProducer)
val producerPool = new ProducerPool(config)
val producerDataList = new ListBuffer[ProducerData[String,String]]
producerDataList.append(new ProducerData[String,String]("topic1", "msg1"))
val handler = new DefaultEventHandler[String,String](config,
partitioner = null.asInstanceOf[Partitioner[String]],
encoder = new StringEncoder,
producerPool = producerPool)
producerPool = producerPool,
topicPartitionInfos)
try {
handler.handle(producerDataList)
fail("Should fail with NoBrokersForPartitionException")
@ -299,14 +296,12 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -299,14 +296,12 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
catch {
case e: NoBrokersForPartitionException => // expected, do nothing
}
EasyMock.verify(syncProducer)
EasyMock.verify(producerPool)
}
@Test
def testIncompatibleEncoder() {
val props = new Properties()
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val config = new ProducerConfig(props)
val producer=new Producer[String, String](config)
@ -323,28 +318,23 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -323,28 +318,23 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
@Test
def testRandomPartitioner() {
val props = new Properties()
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val config = new ProducerConfig(props)
// create topic metadata with 0 partitions
val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
val topic2Metadata = getTopicMetadata("topic2", 0, 0, "localhost", 9092)
val syncProducer = getSyncProducer(List("topic1", "topic2"), List(topic1Metadata, topic2Metadata))
val producerPool = EasyMock.createMock(classOf[ProducerPool])
producerPool.getZkClient
EasyMock.expectLastCall().andReturn(zkClient)
producerPool.addProducers(config)
EasyMock.expectLastCall()
producerPool.getAnyProducer
EasyMock.expectLastCall().andReturn(syncProducer).times(2)
EasyMock.replay(producerPool)
val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
topicPartitionInfos.put("topic1", topic1Metadata)
topicPartitionInfos.put("topic2", topic2Metadata)
val producerPool = new ProducerPool(config)
val handler = new DefaultEventHandler[String,String](config,
partitioner = null.asInstanceOf[Partitioner[String]],
encoder = null.asInstanceOf[Encoder[String]],
producerPool = producerPool)
producerPool = producerPool,
topicPartitionInfos)
val producerDataList = new ListBuffer[ProducerData[String,Message]]
producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg1".getBytes)))
producerDataList.append(new ProducerData[String,Message]("topic2", new Message("msg2".getBytes)))
@ -360,7 +350,6 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -360,7 +350,6 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
case None =>
fail("Failed to collate requests by topic, partition")
}
EasyMock.verify(producerPool)
}
@Test
@ -369,40 +358,24 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -369,40 +358,24 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("producer.type", "async")
props.put("batch.size", "5")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val config = new ProducerConfig(props)
val topic = "topic1"
val topic1Metadata = getTopicMetadata(topic, 0, 0, "localhost", 9092)
val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
topicPartitionInfos.put("topic1", topic1Metadata)
val msgs = TestUtils.getMsgStrings(10)
val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
mockSyncProducer.send(new TopicMetadataRequest(List(topic)))
EasyMock.expectLastCall().andReturn(List(topic1Metadata))
mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.take(5))))
EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L)))
mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.takeRight(5))))
EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L)))
EasyMock.replay(mockSyncProducer)
val producerPool = new ProducerPool(config)
val producerPool = EasyMock.createMock(classOf[ProducerPool])
producerPool.getZkClient
EasyMock.expectLastCall().andReturn(zkClient)
producerPool.addProducers(config)
EasyMock.expectLastCall()
producerPool.getAnyProducer
EasyMock.expectLastCall().andReturn(mockSyncProducer)
producerPool.getProducer(0)
EasyMock.expectLastCall().andReturn(mockSyncProducer).times(2)
producerPool.close()
EasyMock.expectLastCall()
EasyMock.replay(producerPool)
val msgs = TestUtils.getMsgStrings(10)
val handler = new DefaultEventHandler[String,String]( config,
partitioner = null.asInstanceOf[Partitioner[String]],
encoder = new StringEncoder,
producerPool = producerPool )
producerPool = producerPool,
topicPartitionInfos)
val producer = new Producer[String, String](config, handler)
try {
@ -413,36 +386,31 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -413,36 +386,31 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
} catch {
case e: Exception => fail("Not expected", e)
}
EasyMock.verify(mockSyncProducer)
EasyMock.verify(producerPool)
}
@Test
def testFailedSendRetryLogic() {
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val config = new ProducerConfig(props)
val topic1 = "topic1"
val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092)
val msgs = TestUtils.getMsgStrings(2)
val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
topicPartitionInfos.put("topic1", topic1Metadata)
// producer used to return topic metadata
val metadataSyncProducer = EasyMock.createMock(classOf[SyncProducer])
metadataSyncProducer.send(new TopicMetadataRequest(List(topic1)))
EasyMock.expectLastCall().andReturn(List(topic1Metadata)).times(3)
EasyMock.replay(metadataSyncProducer)
val msgs = TestUtils.getMsgStrings(2)
// produce request for topic1 and partitions 0 and 1. Let the first request fail
// entirely. The second request will succeed for partition 1 but fail for partition 0.
// On the third try for partition 0, let it succeed.
val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0)
val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), TestUtils.messagesToSet(msgs), 0)
val response1 =
new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs))
val request2 = TestUtils.produceRequest(topic1, 0, TestUtils.messagesToSet(msgs))
val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L))
val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
@ -451,13 +419,8 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -451,13 +419,8 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
EasyMock.replay(mockSyncProducer)
val producerPool = EasyMock.createMock(classOf[ProducerPool])
EasyMock.expect(producerPool.getZkClient).andReturn(zkClient)
EasyMock.expect(producerPool.addProducers(config))
EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
EasyMock.expect(producerPool.close())
EasyMock.replay(producerPool)
@ -465,7 +428,8 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -465,7 +428,8 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val handler = new DefaultEventHandler[Int,String](config,
partitioner = new FixedValuePartitioner(),
encoder = new StringEncoder,
producerPool = producerPool)
producerPool = producerPool,
topicPartitionInfos)
try {
val data = List(
new ProducerData[Int,String](topic1, 0, msgs),
@ -477,7 +441,6 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -477,7 +441,6 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
case e: Exception => fail("Not expected", e)
}
EasyMock.verify(metadataSyncProducer)
EasyMock.verify(mockSyncProducer)
EasyMock.verify(producerPool)
}
@ -511,16 +474,13 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -511,16 +474,13 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
def testInvalidConfiguration() {
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("broker.list", TestZKUtils.zookeeperConnect)
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("producer.type", "async")
try {
new ProducerConfig(props)
fail("should complain about wrong config")
}
catch {
case e: InvalidConfigException => //expected
case e: KafkaException => //expected
}
}
@ -531,33 +491,6 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -531,33 +491,6 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
producerDataList
}
private def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
val encoder = new StringEncoder
new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)
}
private def getSyncProducer(topic: Seq[String], topicMetadata: Seq[TopicMetadata]): SyncProducer = {
val syncProducer = EasyMock.createMock(classOf[SyncProducer])
topic.zip(topicMetadata).foreach { topicAndMetadata =>
syncProducer.send(new TopicMetadataRequest(List(topicAndMetadata._1)))
EasyMock.expectLastCall().andReturn(List(topicAndMetadata._2))
}
EasyMock.replay(syncProducer)
syncProducer
}
private def getMockProducerPool(config: ProducerConfig, syncProducer: SyncProducer): ProducerPool = {
val producerPool = EasyMock.createMock(classOf[ProducerPool])
producerPool.getZkClient
EasyMock.expectLastCall().andReturn(zkClient)
producerPool.addProducers(config)
EasyMock.expectLastCall()
producerPool.getAnyProducer
EasyMock.expectLastCall().andReturn(syncProducer)
EasyMock.replay(producerPool)
producerPool
}
private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort)
}

107
core/src/test/scala/unit/kafka/producer/ProducerTest.scala

@ -18,10 +18,6 @@ @@ -18,10 +18,6 @@
package kafka.producer
import org.scalatest.junit.JUnit3Suite
import java.util.Properties
import kafka.admin.CreateTopicCommand
import kafka.api.FetchRequestBuilder
import kafka.common.FailedToSendMessageException
import kafka.consumer.SimpleConsumer
import kafka.message.Message
import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
@ -31,9 +27,13 @@ import org.junit.Assert._ @@ -31,9 +27,13 @@ import org.junit.Assert._
import org.junit.Test
import kafka.utils._
import java.util
import kafka.admin.{AdminUtils, CreateTopicCommand}
import util.Properties
import kafka.api.FetchRequestBuilder
import kafka.common.{KafkaException, ErrorMapping, FailedToSendMessageException}
class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
private val brokerId1 = 0
private val brokerId2 = 1
private val ports = TestUtils.choosePorts(2)
@ -44,19 +44,21 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -44,19 +44,21 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
private var consumer2: SimpleConsumer = null
private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
private val config1 = new KafkaConfig(props1) {
override val hostName = "localhost"
override val numPartitions = 4
}
private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
private val config2 = new KafkaConfig(props2) {
override val hostName = "localhost"
override val numPartitions = 4
}
override def setUp() {
super.setUp()
// set up 2 brokers with 4 partitions each
val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
val config1 = new KafkaConfig(props1) {
override val numPartitions = 4
}
server1 = TestUtils.createServer(config1)
val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
val config2 = new KafkaConfig(props2) {
override val numPartitions = 4
}
server2 = TestUtils.createServer(config2)
val props = new Properties()
@ -82,12 +84,62 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -82,12 +84,62 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
super.tearDown()
}
def testUpdateBrokerPartitionInfo() {
CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
val props1 = new util.Properties()
props1.put("broker.list", "localhost:80,localhost:81")
props1.put("serializer.class", "kafka.serializer.StringEncoder")
val producerConfig1 = new ProducerConfig(props1)
val producer1 = new Producer[String, String](producerConfig1)
try{
producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
fail("Test should fail because the broker list provided are not valid")
} catch {
case e: KafkaException =>
case oe => fail("fails with exception", oe)
} finally {
producer1.close()
}
val props2 = new util.Properties()
props2.put("broker.list", "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq( config1)))
props2.put("serializer.class", "kafka.serializer.StringEncoder")
val producerConfig2= new ProducerConfig(props2)
val producer2 = new Producer[String, String](producerConfig2)
try{
producer2.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
} catch {
case e => fail("Should succeed sending the message", e)
} finally {
producer2.close()
}
val props3 = new util.Properties()
props3.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
props3.put("serializer.class", "kafka.serializer.StringEncoder")
val producerConfig3 = new ProducerConfig(props3)
val producer3 = new Producer[String, String](producerConfig3)
try{
producer3.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
} catch {
case e => fail("Should succeed sending the message", e)
} finally {
producer3.close()
}
}
@Test
def testZKSendToNewTopic() {
def testSendToNewTopic() {
val props1 = new util.Properties()
props1.put("serializer.class", "kafka.serializer.StringEncoder")
props1.put("partitioner.class", "kafka.utils.StaticPartitioner")
props1.put("zk.connect", TestZKUtils.zookeeperConnect)
props1.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
props1.put("producer.request.required.acks", "2")
props1.put("producer.request.timeout.ms", "1000")
@ -96,15 +148,18 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -96,15 +148,18 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
props2.put("producer.request.required.acks", "3")
props2.put("producer.request.timeout.ms", "1000")
val config1 = new ProducerConfig(props1)
val config2 = new ProducerConfig(props2)
val producerConfig1 = new ProducerConfig(props1)
val producerConfig2 = new ProducerConfig(props2)
// create topic with 1 partition and await leadership
CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
val producer1 = new Producer[String, String](config1)
val producer2 = new Producer[String, String](config2)
val producer1 = new Producer[String, String](producerConfig1)
val producer2 = new Producer[String, String](producerConfig2)
// Available partition ids should be 0.
producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
@ -143,16 +198,19 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -143,16 +198,19 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
@Test
def testZKSendWithDeadBroker() {
def testSendWithDeadBroker() {
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("partitioner.class", "kafka.utils.StaticPartitioner")
props.put("producer.request.timeout.ms", "2000")
// props.put("producer.request.required.acks", "-1")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
// create topic
CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 1, 500)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 2, 500)
@ -204,13 +262,16 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -204,13 +262,16 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("partitioner.class", "kafka.utils.StaticPartitioner")
props.put("producer.request.timeout.ms", String.valueOf(timeoutMs))
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
val config = new ProducerConfig(props)
val producer = new Producer[String, String](config)
// create topics in ZK
CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
// do a simple test to make sure plumbing is okay

8
core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala

@ -48,7 +48,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -48,7 +48,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
server2 = TestUtils.createServer(configProps2)
servers ++= List(server1, server2)
val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
producer = new Producer[Int, Message](new ProducerConfig(producerProps))
@ -80,7 +80,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -80,7 +80,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
server2 = TestUtils.createServer(configProps2)
servers ++= List(server1, server2)
val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
producer = new Producer[Int, Message](new ProducerConfig(producerProps))
@ -150,7 +150,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -150,7 +150,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
producer = new Producer[Int, Message](new ProducerConfig(producerProps))
@ -194,7 +194,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -194,7 +194,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
producer = new Producer[Int, Message](new ProducerConfig(producerProps))

2
core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala

@ -55,7 +55,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -55,7 +55,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
}
// send test messages to leader
val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder)
val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs), new StringEncoder)
producer.send(new ProducerData[String, String](topic1, testMessageList1),
new ProducerData[String, String](topic2, testMessageList2))
producer.close()

4
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala

@ -49,7 +49,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -49,7 +49,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
// create topic
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(zkConnect, 64*1024, 100000, 10000)))
val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
// send some messages
producer.send(new ProducerData[Int, Message](topic, 0, sent1))
@ -63,7 +63,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -63,7 +63,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
{
val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(zkConnect, 64*1024, 100000, 10000)))
val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
val consumer = new SimpleConsumer(host,
port,
1000000,

20
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -33,10 +33,10 @@ import collection.mutable.ListBuffer @@ -33,10 +33,10 @@ import collection.mutable.ListBuffer
import kafka.consumer.ConsumerConfig
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import kafka.serializer.{DefaultEncoder, Encoder}
import kafka.common.ErrorMapping
import kafka.api._
import collection.mutable.{Map, Set}
import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
/**
@ -114,12 +114,17 @@ object TestUtils extends Logging { @@ -114,12 +114,17 @@ object TestUtils extends Logging {
yield createBrokerConfig(node, port)
}
def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
configs.map(c => c.hostName + ":" + c.port).mkString(",")
}
/**
* Create a test config for the given node id
*/
def createBrokerConfig(nodeId: Int, port: Int): Properties = {
val props = new Properties
props.put("brokerid", nodeId.toString)
props.put("hostname", "localhost")
props.put("port", port.toString)
props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
props.put("log.flush.interval", "1")
@ -283,9 +288,9 @@ object TestUtils extends Logging { @@ -283,9 +288,9 @@ object TestUtils extends Logging {
/**
* Create a producer for the given host and port
*/
def createProducer[K, V](zkConnect: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = {
def createProducer[K, V](brokerList: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = {
val props = new Properties()
props.put("zk.connect", zkConnect)
props.put("broker.list", brokerList)
props.put("buffer.size", "65536")
props.put("connect.timeout.ms", "100000")
props.put("reconnect.interval", "10000")
@ -293,11 +298,11 @@ object TestUtils extends Logging { @@ -293,11 +298,11 @@ object TestUtils extends Logging {
new Producer[K, V](new ProducerConfig(props))
}
def getProducerConfig(zkConnect: String, bufferSize: Int, connectTimeout: Int,
def getProducerConfig(brokerList: String, bufferSize: Int, connectTimeout: Int,
reconnectInterval: Int): Properties = {
val props = new Properties()
props.put("producer.type", "sync")
props.put("zk.connect", zkConnect)
props.put("broker.list", brokerList)
props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
props.put("buffer.size", bufferSize.toString)
props.put("connect.timeout.ms", connectTimeout.toString)
@ -348,6 +353,11 @@ object TestUtils extends Logging { @@ -348,6 +353,11 @@ object TestUtils extends Logging {
produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message)
}
def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
val encoder = new StringEncoder
new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)
}
def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
produceRequestWithAcks(List(topic), List(partition), message, SyncProducerConfig.DefaultRequiredAcks)
}

2
examples/src/main/java/kafka/examples/Producer.java

@ -30,7 +30,7 @@ public class Producer extends Thread @@ -30,7 +30,7 @@ public class Producer extends Thread
public Producer(String topic)
{
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("zk.connect", "localhost:2181");
props.put("broker.list", "localhost:9092");
// Use random partitioner. Don't need the key type. Just set it to Integer.
// The message is of type String.
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));

16
perf/src/main/scala/kafka/perf/ProducerPerformance.scala

@ -71,9 +71,9 @@ object ProducerPerformance extends Logging { @@ -71,9 +71,9 @@ object ProducerPerformance extends Logging {
}
class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
val brokerInfoOpt = parser.accepts("brokerinfo", "REQUIRED: broker info (either from zookeeper or a list.")
val brokerListOpt = parser.accepts("broker-list", "REQUIRED: the broker list must be specified.")
.withRequiredArg
.describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
.describedAs("hostname:port")
.ofType(classOf[String])
val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
.withRequiredArg()
@ -115,7 +115,7 @@ object ProducerPerformance extends Logging { @@ -115,7 +115,7 @@ object ProducerPerformance extends Logging {
.defaultsTo(0)
val options = parser.parse(args : _*)
for(arg <- List(topicOpt, brokerInfoOpt, numMessagesOpt)) {
for(arg <- List(topicOpt, brokerListOpt, numMessagesOpt)) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
@ -128,7 +128,7 @@ object ProducerPerformance extends Logging { @@ -128,7 +128,7 @@ object ProducerPerformance extends Logging {
val showDetailedStats = options.has(showDetailedStatsOpt)
val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
val hideHeader = options.has(hideHeaderOpt)
val brokerInfo = options.valueOf(brokerInfoOpt)
val brokerList = options.valueOf(brokerListOpt)
val messageSize = options.valueOf(messageSizeOpt).intValue
var isFixSize = !options.has(varyMessageSizeOpt)
var isAsync = options.has(asyncOpt)
@ -170,13 +170,7 @@ object ProducerPerformance extends Logging { @@ -170,13 +170,7 @@ object ProducerPerformance extends Logging {
val allDone: CountDownLatch,
val rand: Random) extends Runnable {
val props = new Properties()
val brokerInfoList = config.brokerInfo.split("=")
if (brokerInfoList(0) == "zk.connect") {
props.put("zk.connect", brokerInfoList(1))
props.put("zk.sessiontimeout.ms", "300000")
}
else
props.put("broker.list", brokerInfoList(1))
props.put("broker.list", config.brokerList)
props.put("compression.codec", config.compressionCodec.codec.toString)
props.put("reconnect.interval", Integer.MAX_VALUE.toString)
props.put("buffer.size", (64*1024).toString)

6
system_test/single_host_multi_brokers/bin/run-test.sh

@ -231,14 +231,14 @@ start_servers_cluster() { @@ -231,14 +231,14 @@ start_servers_cluster() {
start_producer_perf() {
this_topic=$1
zk_conn_str=$2
broker_list_str=$2
no_msg_to_produce=$3
init_msg_id=$4
info "starting producer performance"
${base_dir}/bin/kafka-run-class.sh kafka.perf.ProducerPerformance \
--brokerinfo "zk.connect=${zk_conn_str}" \
--broker-list ${broker_list_str} \
--topic ${this_topic} \
--messages $no_msg_to_produce \
--message-size 100 \
@ -501,7 +501,7 @@ start_test() { @@ -501,7 +501,7 @@ start_test() {
fi
init_id=$(( ($i - 1) * $producer_msg_batch_size ))
start_producer_perf $test_topic localhost:$zk_port $producer_msg_batch_size $init_id
start_producer_perf $test_topic localhost:9091,localhost:9092,localhost:9093 $producer_msg_batch_size $init_id
info "sleeping for 15s"
sleep 15
echo

13
system_test/single_host_multi_brokers/config/producer.properties

@ -16,20 +16,9 @@ @@ -16,20 +16,9 @@
############################# Producer Basics #############################
# need to set either broker.list or zk.connect
# configure brokers statically
# format: brokerid1:host1:port1,brokerid2:host2:port2 ...
#broker.list=0:localhost:9092
# discover brokers from ZK
zk.connect=localhost:2181
# zookeeper session timeout; default is 6000
#zk.sessiontimeout.ms=
# the max time that the client waits to establish a connection to zookeeper; default is 6000
#zk.connectiontimeout.ms
broker.list=localhost:9091,localhost:9092,localhost:9093
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

Loading…
Cancel
Save