diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index fd90dd149ba..52b6f41cc77 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -545,8 +545,8 @@ public class Worker { producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the - // worker, but this may compromise the delivery guarantees of Kafka Connect. + // These settings will execute infinite retries on retriable exceptions. They *may* be overridden via configs passed to the worker, + // but this may compromise the delivery guarantees of Kafka Connect. 
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE)); producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 6e1152be911..9ca79753a47 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -57,6 +57,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; /** * WorkerTask that uses a SourceTask to ingest data into Kafka. @@ -78,6 +79,7 @@ class WorkerSourceTask extends WorkerTask { private final OffsetStorageWriter offsetWriter; private final Time time; private final SourceTaskMetricsGroup sourceTaskMetricsGroup; + private final AtomicReference<Exception> producerSendException; private List<SourceRecord> toSend; private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. never made it into the producer's RecordAccumulator @@ -133,6 +135,7 @@ class WorkerSourceTask extends WorkerTask { this.flushing = false; this.stopRequestedLatch = new CountDownLatch(1); this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics); + this.producerSendException = new AtomicReference<>(); } @Override @@ -215,6 +218,8 @@ class WorkerSourceTask extends WorkerTask { continue; } + maybeThrowProducerSendException(); + if (toSend == null) { log.trace("{} Nothing to send to Kafka. 
Polling source for additional records", this); long start = time.milliseconds(); @@ -240,6 +245,15 @@ class WorkerSourceTask extends WorkerTask { } } + private void maybeThrowProducerSendException() { + if (producerSendException.get() != null) { + throw new ConnectException( + "Unrecoverable exception from producer send callback", + producerSendException.get() + ); + } + } + protected List<SourceRecord> poll() throws InterruptedException { try { return task.poll(); @@ -288,6 +302,7 @@ class WorkerSourceTask extends WorkerTask { recordBatch(toSend.size()); final SourceRecordWriteCounter counter = new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup); for (final SourceRecord preTransformRecord : toSend) { + maybeThrowProducerSendException(); retryWithToleranceOperator.sourceRecord(preTransformRecord); final SourceRecord record = transformationChain.apply(preTransformRecord); @@ -322,22 +337,18 @@ class WorkerSourceTask extends WorkerTask { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { - // Given the default settings for zero data loss, this should basically never happen -- - // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request - // timeouts, callbacks with exceptions should never be invoked in practice. If the - // user overrode these settings, the best we can do is notify them of the failure via - // logging. 
log.error("{} failed to send record to {}:", WorkerSourceTask.this, topic, e); log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); + producerSendException.compareAndSet(null, e); } else { + recordSent(producerRecord); + counter.completeRecord(); log.trace("{} Wrote record successfully: topic {} partition {} offset {}", WorkerSourceTask.this, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); commitTaskRecord(preTransformRecord); } - recordSent(producerRecord); - counter.completeRecord(); } }); lastSendFailed = false; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 24a13c2be26..1f5cc438ef7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -21,9 +21,11 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; @@ -542,6 +544,21 @@ public class WorkerSourceTaskTest extends ThreadedTest { PowerMock.verifyAll(); } + @Test(expected = ConnectException.class) + public void testSendRecordsProducerCallbackFail() throws Exception { + createWorkerTask(); + + SourceRecord record1 = new 
SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + expectSendRecordProducerCallbackFail(); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + } + @Test public void testSendRecordsTaskCommitRecordFail() throws Exception { createWorkerTask(); @@ -711,16 +728,24 @@ public class WorkerSourceTaskTest extends ThreadedTest { return expectSendRecordTaskCommitRecordSucceed(false, isRetry); } + private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordProducerCallbackFail() throws InterruptedException { + return expectSendRecord(false, false, false, false); + } + private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes, boolean isRetry) throws InterruptedException { - return expectSendRecord(anyTimes, isRetry, true); + return expectSendRecord(anyTimes, isRetry, true, true); } private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes, boolean isRetry) throws InterruptedException { - return expectSendRecord(anyTimes, isRetry, false); + return expectSendRecord(anyTimes, isRetry, true, false); } - @SuppressWarnings("unchecked") - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException { + private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord( + boolean anyTimes, + boolean isRetry, + boolean sendSuccess, + boolean commitSuccess + ) throws InterruptedException { expectConvertKeyValue(anyTimes); expectApplyTransformationChain(anyTimes); @@ -737,15 +762,19 @@ public class WorkerSourceTaskTest extends ThreadedTest { // 2. 
Converted data passed to the producer, which will need callbacks invoked for flush to work IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect( - producer.send(EasyMock.capture(sent), - EasyMock.capture(producerCallbacks))); + producer.send(EasyMock.capture(sent), + EasyMock.capture(producerCallbacks))); IAnswer<Future<RecordMetadata>> expectResponse = new IAnswer<Future<RecordMetadata>>() { @Override public Future<RecordMetadata> answer() throws Throwable { synchronized (producerCallbacks) { for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) { - cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, - 0L, 0L, 0, 0), null); + if (sendSuccess) { + cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, + 0L, 0L, 0, 0), null); + } else { + cb.onCompletion(null, new TopicAuthorizationException("foo")); + } } producerCallbacks.reset(); } @@ -757,8 +786,10 @@ public class WorkerSourceTaskTest extends ThreadedTest { else expect.andAnswer(expectResponse); - // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit - expectTaskCommitRecord(anyTimes, succeed); + if (sendSuccess) { + // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit + expectTaskCommitRecord(anyTimes, commitSuccess); + } return sent; }