|
|
|
@ -228,7 +228,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -228,7 +228,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
|
|
|
|
|
*/ |
|
|
|
|
private def send(brokerId: Int, messagesPerTopic: Map[TopicAndPartition, ByteBufferMessageSet]) = { |
|
|
|
|
if(brokerId < 0) { |
|
|
|
|
warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic)) |
|
|
|
|
warn("Failed to send data %s since partitions %s don't have a leader".format(messagesPerTopic.map(_._2), |
|
|
|
|
messagesPerTopic.map(_._1.toString).mkString(","))) |
|
|
|
|
messagesPerTopic.keys.toSeq |
|
|
|
|
} else if(messagesPerTopic.size > 0) { |
|
|
|
|
val currentCorrelationId = correlationId.getAndIncrement |
|
|
|
|