
KAFKA-4473: RecordCollector should handle retriable exceptions more strictly

The `RecordCollectorImpl` currently drops messages on the floor if the exception passed to the producer callback is non-null. This results in message loss and violates at-least-once processing.
Rather than just logging an error in the callback, save the exception in a field. On subsequent calls to `send`, `flush`, or `close`, first check whether an exception has been recorded and throw a `StreamsException` if it is non-null. Also, once an exception has occurred, the callback should no longer update the `offsets` map.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2249 from dguy/kafka-4473
Commit 0321bf5aa6, authored by Damian Guy and committed by Guozhang Wang.
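
To make the fix concrete before the diff, here is a minimal, self-contained sketch of the pattern the patch introduces: an asynchronous send failure is remembered in a volatile field and rethrown on the next synchronous call, so it cannot be silently dropped. The class and method names below are illustrative stand-ins, not the actual Streams API.

// AsyncFailureSketch.java -- illustrative only, not part of the patch.
// A stripped-down "collector" that remembers the first failure reported by an
// asynchronous callback and rethrows it on the next synchronous call.
public class AsyncFailureSketch {

    private volatile Exception sendException;

    public void send(final boolean failAsynchronously) {
        checkForException();
        // Stand-in for the producer invoking the callback on its I/O thread.
        new Thread(() -> {
            if (failAsynchronously) {
                sendException = new Exception("simulated send failure");
            }
        }).start();
    }

    public void flush() {
        checkForException();
    }

    private void checkForException() {
        if (sendException != null) {
            throw new RuntimeException("exception caught when producing", sendException);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncFailureSketch collector = new AsyncFailureSketch();
        collector.send(true);   // the callback records the failure in the background
        Thread.sleep(200);      // crude wait for the background callback (illustration only)
        try {
            collector.flush();  // the remembered failure surfaces here instead of being lost
        } catch (RuntimeException e) {
            System.out.println("flush rethrew the async failure: " + e.getCause());
        }
    }
}

In `RecordCollectorImpl` the same check runs at the start of `send` and after `flush` and `close`. Checking after `producer.flush()` matters because flush blocks until outstanding sends complete, so any callback failure is visible by the time of the check, and the field is `volatile` because the producer invokes callbacks from its own I/O thread.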
Changed files:
  1. streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java (16 lines changed)
  2. streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java (49 lines changed)

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

@@ -44,6 +44,7 @@ public class RecordCollectorImpl implements RecordCollector {
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
     private final String logPrefix;
+    private volatile Exception sendException;
 
     public RecordCollectorImpl(Producer<byte[], byte[]> producer, String streamTaskId) {
@@ -60,6 +61,7 @@ public class RecordCollectorImpl implements RecordCollector {
     @Override
     public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
                             StreamPartitioner<K, V> partitioner) {
+        checkForException();
         byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
         byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
         Integer partition = record.partition();
@@ -79,10 +81,14 @@ public class RecordCollectorImpl implements RecordCollector {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
                 if (exception == null) {
+                    if (sendException != null) {
+                        return;
+                    }
                     TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
                     offsets.put(tp, metadata.offset());
                 } else {
-                    log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
+                    sendException = exception;
+                    log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and the exception will eventually be thrown", logPrefix, topic, exception);
                 }
             }
         });
@@ -98,10 +104,17 @@ public class RecordCollectorImpl implements RecordCollector {
         }
     }
 
+    private void checkForException() {
+        if (sendException != null) {
+            throw new StreamsException(String.format("%s exception caught when producing", logPrefix), sendException);
+        }
+    }
+
     @Override
     public void flush() {
         log.debug("{} Flushing producer", logPrefix);
         this.producer.flush();
+        checkForException();
     }
 
     /**
@@ -110,6 +123,7 @@ public class RecordCollectorImpl implements RecordCollector {
     @Override
     public void close() {
         producer.close();
+        checkForException();
     }
 
     /**

streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java

@@ -161,4 +161,53 @@ public class RecordCollectorTest {
         collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() throws Exception {
+        final RecordCollector collector = new RecordCollectorImpl(
+                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    @Override
+                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                        callback.onCompletion(null, new Exception());
+                        return null;
+                    }
+                },
+                "test");
+        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionOnFlushIfASendFailed() throws Exception {
+        final RecordCollector collector = new RecordCollectorImpl(
+                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    @Override
+                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                        callback.onCompletion(null, new Exception());
+                        return null;
+                    }
+                },
+                "test");
+        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+        collector.flush();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception {
+        final RecordCollector collector = new RecordCollectorImpl(
+                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    @Override
+                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                        callback.onCompletion(null, new Exception());
+                        return null;
+                    }
+                },
+                "test");
+        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+        collector.close();
+    }
 }
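
As a usage note, the same failure path could in principle be exercised through MockProducer's error-injection hooks instead of overriding send(). The following is a hedged sketch, not part of this commit, assuming the test class's existing fixtures (cluster, byteArraySerializer, stringSerializer, streamPartitioner) and assuming errorNext(RuntimeException) is available on this MockProducer version:

    @SuppressWarnings("unchecked")
    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionOnFlushIfErrorInjected() throws Exception {
        // autoComplete = false keeps the send pending until we fail it explicitly.
        final MockProducer producer =
                new MockProducer(cluster, false, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer);
        final RecordCollector collector = new RecordCollectorImpl(producer, "test");
        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
        producer.errorNext(new RuntimeException("simulated broker error")); // fires the callback with an error
        collector.flush(); // checkForException() rethrows the recorded failure as a StreamsException
    }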
