Browse Source

KAFKA-8586: Fail source tasks when producers fail to send records (#6993)

Changed Connect's `WorkerSourceTask` to capture non-retriable exceptions from the `producer.send(...)` (e.g., authentication or authorization errors) and to fail the connector task when such an error is encountered. Modified the existing unit tests to verify this functionality.

Note that most producer errors are retriable, and Connect will (by default) set up each producer with 1 max in-flight message and infinite retries. This change only affects non-retriable errors.
pull/7253/head
Chris Egerton authored 5 years ago; committed by Randall Hauch
parent
commit
237e83dea0
  1. 4
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
  2. 25
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
  3. 51
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java

4
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java

@@ -545,8 +545,8 @@ public class Worker {
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
// These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
// worker, but this may compromise the delivery guarantees of Kafka Connect.
// These settings will execute infinite retries on retriable exceptions. They *may* be overridden via configs passed to the worker,
// but this may compromise the delivery guarantees of Kafka Connect.
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");

25
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java

@@ -57,6 +57,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
/**
* WorkerTask that uses a SourceTask to ingest data into Kafka.
@@ -78,6 +79,7 @@ class WorkerSourceTask extends WorkerTask {
private final OffsetStorageWriter offsetWriter;
private final Time time;
private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
private final AtomicReference<Exception> producerSendException;
private List<SourceRecord> toSend;
private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. never made it into the producer's RecordAccumulator
@@ -133,6 +135,7 @@ class WorkerSourceTask extends WorkerTask {
this.flushing = false;
this.stopRequestedLatch = new CountDownLatch(1);
this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
this.producerSendException = new AtomicReference<>();
}
@Override
@@ -215,6 +218,8 @@ class WorkerSourceTask extends WorkerTask {
continue;
}
maybeThrowProducerSendException();
if (toSend == null) {
log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
long start = time.milliseconds();
@@ -240,6 +245,15 @@ class WorkerSourceTask extends WorkerTask {
}
}
/**
 * Surfaces a non-retriable producer send failure (recorded by the producer
 * callback on an I/O thread) here on the task thread by throwing a
 * ConnectException, which fails the task. The stored exception is only ever
 * written once (via compareAndSet from null) and never cleared, so a single
 * read is sufficient.
 */
private void maybeThrowProducerSendException() {
    final Exception sendFailure = producerSendException.get();
    if (sendFailure != null) {
        throw new ConnectException("Unrecoverable exception from producer send callback", sendFailure);
    }
}
protected List<SourceRecord> poll() throws InterruptedException {
try {
return task.poll();
@@ -288,6 +302,7 @@ class WorkerSourceTask extends WorkerTask {
recordBatch(toSend.size());
final SourceRecordWriteCounter counter = new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup);
for (final SourceRecord preTransformRecord : toSend) {
maybeThrowProducerSendException();
retryWithToleranceOperator.sourceRecord(preTransformRecord);
final SourceRecord record = transformationChain.apply(preTransformRecord);
@@ -322,22 +337,18 @@ class WorkerSourceTask extends WorkerTask {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
// Given the default settings for zero data loss, this should basically never happen --
// between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
// timeouts, callbacks with exceptions should never be invoked in practice. If the
// user overrode these settings, the best we can do is notify them of the failure via
// logging.
log.error("{} failed to send record to {}:", WorkerSourceTask.this, topic, e);
log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
producerSendException.compareAndSet(null, e);
} else {
recordSent(producerRecord);
counter.completeRecord();
log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
WorkerSourceTask.this,
recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset());
commitTaskRecord(preTransformRecord);
}
recordSent(producerRecord);
counter.completeRecord();
}
});
lastSendFailed = false;

51
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java

@@ -21,9 +21,11 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
@@ -542,6 +544,21 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.verifyAll();
}
/**
 * When the producer callback reports a (non-retriable) send failure, a
 * subsequent sendRecords pass must throw ConnectException so the task fails.
 */
@Test(expected = ConnectException.class)
public void testSendRecordsProducerCallbackFail() throws Exception {
    createWorkerTask();

    // Queue two records so the failure check is exercised while iterating the batch.
    SourceRecord first = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
    SourceRecord second = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

    expectSendRecordProducerCallbackFail();

    PowerMock.replayAll();

    Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(first, second));
    Whitebox.invokeMethod(workerTask, "sendRecords");
}
@Test
public void testSendRecordsTaskCommitRecordFail() throws Exception {
createWorkerTask();
@@ -711,16 +728,24 @@ public class WorkerSourceTaskTest extends ThreadedTest {
return expectSendRecordTaskCommitRecordSucceed(false, isRetry);
}
/**
 * Expects a single producer send whose callback reports a failure; no record
 * commit is expected on the task since the send never succeeds.
 */
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordProducerCallbackFail() throws InterruptedException {
    final boolean anyTimes = false;
    final boolean isRetry = false;
    final boolean sendSuccess = false;
    final boolean commitSuccess = false;
    return expectSendRecord(anyTimes, isRetry, sendSuccess, commitSuccess);
}
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes, boolean isRetry) throws InterruptedException {
return expectSendRecord(anyTimes, isRetry, true);
return expectSendRecord(anyTimes, isRetry, true, true);
}
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes, boolean isRetry) throws InterruptedException {
return expectSendRecord(anyTimes, isRetry, false);
return expectSendRecord(anyTimes, isRetry, true, false);
}
@SuppressWarnings("unchecked")
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException {
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
boolean anyTimes,
boolean isRetry,
boolean sendSuccess,
boolean commitSuccess
) throws InterruptedException {
expectConvertKeyValue(anyTimes);
expectApplyTransformationChain(anyTimes);
@@ -737,15 +762,19 @@ public class WorkerSourceTaskTest extends ThreadedTest {
// 2. Converted data passed to the producer, which will need callbacks invoked for flush to work
IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
producer.send(EasyMock.capture(sent),
EasyMock.capture(producerCallbacks)));
producer.send(EasyMock.capture(sent),
EasyMock.capture(producerCallbacks)));
IAnswer<Future<RecordMetadata>> expectResponse = new IAnswer<Future<RecordMetadata>>() {
@Override
public Future<RecordMetadata> answer() throws Throwable {
synchronized (producerCallbacks) {
for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) {
cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0,
0L, 0L, 0, 0), null);
if (sendSuccess) {
cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0,
0L, 0L, 0, 0), null);
} else {
cb.onCompletion(null, new TopicAuthorizationException("foo"));
}
}
producerCallbacks.reset();
}
@@ -757,8 +786,10 @@ public class WorkerSourceTaskTest extends ThreadedTest {
else
expect.andAnswer(expectResponse);
// 3. As a result of a successful producer send callback, we'll notify the source task of the record commit
expectTaskCommitRecord(anyTimes, succeed);
if (sendSuccess) {
// 3. As a result of a successful producer send callback, we'll notify the source task of the record commit
expectTaskCommitRecord(anyTimes, commitSuccess);
}
return sent;
}

Loading…
Cancel
Save