diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 1d5bac4b6b1..264285c871e 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -140,14 +140,14 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { // check if the leader for this partition is alive or even exists controllerContext.allLeaders.get(topicAndPartition) match { case Some(leaderIsrAndControllerEpoch) => - controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match { - case true => // leader is alive - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), - topic, partition, leaderIsrAndControllerEpoch, - replicaAssignment.size) - replicaState.put((topic, partition, replicaId), OnlineReplica) - info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition)) - case false => // ignore partitions whose leader is not alive + val leader = leaderIsrAndControllerEpoch.leaderAndIsr.leader + if (controllerContext.liveOrShuttingDownBrokerIds.contains(leader)) { + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), + topic, partition, leaderIsrAndControllerEpoch, + replicaAssignment.size) + replicaState.put((topic, partition, replicaId), OnlineReplica) + info("Replica %d for partition [%s, %d] state changed to OnlineReplica" + .format(replicaId, topic, partition)) } case None => // ignore partitions who don't have a leader yet } diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 9a4e4bc0371..05e7c6c501c 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -228,7 +228,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig, */ private def send(brokerId: Int, messagesPerTopic: Map[TopicAndPartition, ByteBufferMessageSet]) = { if(brokerId < 0) { - warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic)) + warn("Failed to send data %s since partitions %s don't have a leader".format(messagesPerTopic.map(_._2), + messagesPerTopic.map(_._1.toString).mkString(","))) messagesPerTopic.keys.toSeq } else if(messagesPerTopic.size > 0) { val currentCorrelationId = correlationId.getAndIncrement diff --git a/perf/src/main/scala/kafka/perf/PerfConfig.scala b/perf/src/main/scala/kafka/perf/PerfConfig.scala index 5dabeb51edd..a8fc6b9ec81 100644 --- a/perf/src/main/scala/kafka/perf/PerfConfig.scala +++ b/perf/src/main/scala/kafka/perf/PerfConfig.scala @@ -53,7 +53,7 @@ class PerfConfig(args: Array[String]) { .defaultsTo(200) val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") .withRequiredArg - .describedAs("compression codec ") + .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) val helpOpt = parser.accepts("help", "Print usage.") diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index 4822a7ecd60..507743e6d24 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -101,11 +101,6 @@ object ProducerPerformance extends Logging { .describedAs("number of threads") .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed") - .withRequiredArg - .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " + "ID and sent by producer starting from this ID sequentially. Message content will be String type and " + "in the form of 'Message:000...1:xxx...'") @@ -117,15 +112,6 @@ object ProducerPerformance extends Logging { .describedAs("message send time gap") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) - val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(3000) - val produceRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " + - "to complete") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(-1) val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + "set, the csv metrics will be outputed here")