diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 05e7c6c501c..374cd6b0c85 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -239,10 +239,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig, try { val syncProducer = producerPool.getProducer(brokerId) debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d" - .format(currentCorrelationId, messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port)) + .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port)) val response = syncProducer.send(producerRequest) debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d" - .format(currentCorrelationId, messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port)) + .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port)) if (response.status.size != producerRequest.data.size) throw new KafkaException("Incomplete response (%s) for producer request (%s)" .format(response, producerRequest))