|
|
|
@ -239,10 +239,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -239,10 +239,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
|
|
|
|
|
try { |
|
|
|
|
val syncProducer = producerPool.getProducer(brokerId) |
|
|
|
|
debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d" |
|
|
|
|
.format(currentCorrelationId, messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port)) |
|
|
|
|
.format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port)) |
|
|
|
|
val response = syncProducer.send(producerRequest) |
|
|
|
|
debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d" |
|
|
|
|
.format(currentCorrelationId, messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port)) |
|
|
|
|
.format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port)) |
|
|
|
|
if (response.status.size != producerRequest.data.size) |
|
|
|
|
throw new KafkaException("Incomplete response (%s) for producer request (%s)" |
|
|
|
|
.format(response, producerRequest)) |
|
|
|
|