Browse Source

Merge branch '0.8' of https://git-wip-us.apache.org/repos/asf/kafka into 0.8

0.8.0-beta1-candidate1
Joe Stein 12 years ago
parent
commit
d5c980a912
  1. 16
      core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
  2. 3
      core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
  3. 2
      perf/src/main/scala/kafka/perf/PerfConfig.scala
  4. 14
      perf/src/main/scala/kafka/perf/ProducerPerformance.scala

16
core/src/main/scala/kafka/controller/ReplicaStateMachine.scala

@ -140,14 +140,14 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
// check if the leader for this partition is alive or even exists // check if the leader for this partition is alive or even exists
controllerContext.allLeaders.get(topicAndPartition) match { controllerContext.allLeaders.get(topicAndPartition) match {
case Some(leaderIsrAndControllerEpoch) => case Some(leaderIsrAndControllerEpoch) =>
controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match { val leader = leaderIsrAndControllerEpoch.leaderAndIsr.leader
case true => // leader is alive if (controllerContext.liveOrShuttingDownBrokerIds.contains(leader)) {
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
topic, partition, leaderIsrAndControllerEpoch, topic, partition, leaderIsrAndControllerEpoch,
replicaAssignment.size) replicaAssignment.size)
replicaState.put((topic, partition, replicaId), OnlineReplica) replicaState.put((topic, partition, replicaId), OnlineReplica)
info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition)) info("Replica %d for partition [%s, %d] state changed to OnlineReplica"
case false => // ignore partitions whose leader is not alive .format(replicaId, topic, partition))
} }
case None => // ignore partitions who don't have a leader yet case None => // ignore partitions who don't have a leader yet
} }

3
core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala

@ -228,7 +228,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
*/ */
private def send(brokerId: Int, messagesPerTopic: Map[TopicAndPartition, ByteBufferMessageSet]) = { private def send(brokerId: Int, messagesPerTopic: Map[TopicAndPartition, ByteBufferMessageSet]) = {
if(brokerId < 0) { if(brokerId < 0) {
warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic)) warn("Failed to send data %s since partitions %s don't have a leader".format(messagesPerTopic.map(_._2),
messagesPerTopic.map(_._1.toString).mkString(",")))
messagesPerTopic.keys.toSeq messagesPerTopic.keys.toSeq
} else if(messagesPerTopic.size > 0) { } else if(messagesPerTopic.size > 0) {
val currentCorrelationId = correlationId.getAndIncrement val currentCorrelationId = correlationId.getAndIncrement

2
perf/src/main/scala/kafka/perf/PerfConfig.scala

@ -53,7 +53,7 @@ class PerfConfig(args: Array[String]) {
.defaultsTo(200) .defaultsTo(200)
val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
.withRequiredArg .withRequiredArg
.describedAs("compression codec ") .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2")
.ofType(classOf[java.lang.Integer]) .ofType(classOf[java.lang.Integer])
.defaultsTo(0) .defaultsTo(0)
val helpOpt = parser.accepts("help", "Print usage.") val helpOpt = parser.accepts("help", "Print usage.")

14
perf/src/main/scala/kafka/perf/ProducerPerformance.scala

@ -101,11 +101,6 @@ object ProducerPerformance extends Logging {
.describedAs("number of threads") .describedAs("number of threads")
.ofType(classOf[java.lang.Integer]) .ofType(classOf[java.lang.Integer])
.defaultsTo(1) .defaultsTo(1)
val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed")
.withRequiredArg
.describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " + val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " +
"ID and sent by producer starting from this ID sequentially. Message content will be String type and " + "ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
"in the form of 'Message:000...1:xxx...'") "in the form of 'Message:000...1:xxx...'")
@ -117,15 +112,6 @@ object ProducerPerformance extends Logging {
.describedAs("message send time gap") .describedAs("message send time gap")
.ofType(classOf[java.lang.Integer]) .ofType(classOf[java.lang.Integer])
.defaultsTo(0) .defaultsTo(0)
val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
.withRequiredArg()
.ofType(classOf[java.lang.Integer])
.defaultsTo(3000)
val produceRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
"to complete")
.withRequiredArg()
.ofType(classOf[java.lang.Integer])
.defaultsTo(-1)
val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
"set, the csv metrics will be outputed here") "set, the csv metrics will be outputed here")

Loading…
Cancel
Save