|
|
|
@ -344,7 +344,7 @@ class WorkerSourceTask extends WorkerTask {
@@ -344,7 +344,7 @@ class WorkerSourceTask extends WorkerTask {
|
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
log.trace("{} Appending record with key {}, value {}", this, record.key(), record.value()); |
|
|
|
|
log.trace("{} Appending record to the topic {} with key {}, value {}", this, record.topic(), record.key(), record.value()); |
|
|
|
|
// We need this queued first since the callback could happen immediately (even synchronously in some cases).
|
|
|
|
|
// Because of this we need to be careful about handling retries -- we always save the previously attempted
|
|
|
|
|
// record as part of toSend and need to use a flag to track whether we should actually add it to the outstanding
|
|
|
|
@ -409,6 +409,9 @@ class WorkerSourceTask extends WorkerTask {
@@ -409,6 +409,9 @@ class WorkerSourceTask extends WorkerTask {
|
|
|
|
|
// RegexRouter) topic creation can not be batched for multiple topics
|
|
|
|
|
private void maybeCreateTopic(String topic) { |
|
|
|
|
if (!topicCreation.isTopicCreationRequired(topic)) { |
|
|
|
|
log.trace("Topic creation by the connector is disabled or the topic {} was previously created." + |
|
|
|
|
"If auto.create.topics.enable is enabled on the broker, " + |
|
|
|
|
"the topic will be created with default settings", topic); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
log.info("The task will send records to topic '{}' for the first time. Checking " |
|
|
|
@ -430,7 +433,7 @@ class WorkerSourceTask extends WorkerTask {
@@ -430,7 +433,7 @@ class WorkerSourceTask extends WorkerTask {
|
|
|
|
|
log.info("Created topic '{}' using creation group {}", newTopic, topicGroup); |
|
|
|
|
} else { |
|
|
|
|
log.warn("Request to create new topic '{}' failed", topic); |
|
|
|
|
throw new ConnectException("Task failed to create new topic " + topic + ". Ensure " |
|
|
|
|
throw new ConnectException("Task failed to create new topic " + newTopic + ". Ensure " |
|
|
|
|
+ "that the task is authorized to create topics or that the topic exists and " |
|
|
|
|
+ "restart the task"); |
|
|
|
|
} |
|
|
|
|