diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 1febd7fa760..180a6bba3a7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -344,7 +344,7 @@ class WorkerSourceTask extends WorkerTask { continue; } - log.trace("{} Appending record with key {}, value {}", this, record.key(), record.value()); + log.trace("{} Appending record to the topic {} with key {}, value {}", this, record.topic(), record.key(), record.value()); // We need this queued first since the callback could happen immediately (even synchronously in some cases). // Because of this we need to be careful about handling retries -- we always save the previously attempted // record as part of toSend and need to use a flag to track whether we should actually add it to the outstanding @@ -409,6 +409,9 @@ class WorkerSourceTask extends WorkerTask { // RegexRouter) topic creation can not be batched for multiple topics private void maybeCreateTopic(String topic) { if (!topicCreation.isTopicCreationRequired(topic)) { + log.trace("Topic creation by the connector is disabled or the topic {} was previously created." + + "If auto.create.topics.enable is enabled on the broker, " + + "the topic will be created with default settings", topic); return; } log.info("The task will send records to topic '{}' for the first time. Checking " @@ -430,7 +433,7 @@ class WorkerSourceTask extends WorkerTask { log.info("Created topic '{}' using creation group {}", newTopic, topicGroup); } else { log.warn("Request to create new topic '{}' failed", topic); - throw new ConnectException("Task failed to create new topic " + topic + ". Ensure " + throw new ConnectException("Task failed to create new topic " + newTopic + ". Ensure " + "that the task is authorized to create topics or that the topic exists and " + "restart the task"); }