diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 0dbad5b81f2..31596cc0de1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -44,6 +44,7 @@ public class RecordCollectorImpl implements RecordCollector {
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
     private final String logPrefix;
+    private volatile Exception sendException;
 
     public RecordCollectorImpl(Producer<byte[], byte[]> producer,
                                String streamTaskId) {
@@ -60,6 +61,7 @@ public class RecordCollectorImpl implements RecordCollector {
     @Override
     public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
                             StreamPartitioner<K, V> partitioner) {
+        checkForException();
         byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
         byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
         Integer partition = record.partition();
@@ -79,10 +81,14 @@
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
                 if (exception == null) {
+                    if (sendException != null) {
+                        return;
+                    }
                     TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
                     offsets.put(tp, metadata.offset());
                 } else {
-                    log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
+                    sendException = exception;
+                    log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and the exception will eventually be thrown", logPrefix, topic, exception);
                 }
             }
         });
@@ -98,10 +104,17 @@
         }
     }
 
+    private void checkForException() {
+        if (sendException != null) {
+            throw new StreamsException(String.format("%s exception caught when producing", logPrefix), sendException);
+        }
+    }
+
     @Override
     public void flush() {
         log.debug("{} Flushing producer", logPrefix);
         this.producer.flush();
+        checkForException();
     }
 
     /**
@@ -110,6 +123,7 @@
     @Override
     public void close() {
         producer.close();
+        checkForException();
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 66397fbbc25..03256ebc84e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -161,4 +161,53 @@ public class RecordCollectorTest {
         collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
     }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() throws Exception {
+        final RecordCollector collector = new RecordCollectorImpl(
+                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    @Override
+                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                        callback.onCompletion(null, new Exception());
+                        return null;
+                    }
+                },
+                "test");
+        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionOnFlushIfASendFailed() throws Exception {
+        final RecordCollector collector = new RecordCollectorImpl(
+                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    @Override
+                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                        callback.onCompletion(null, new Exception());
+                        return null;
+                    }
+                },
+                "test");
+        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+        collector.flush();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception {
+        final RecordCollector collector = new RecordCollectorImpl(
+                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    @Override
+                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                        callback.onCompletion(null, new Exception());
+                        return null;
+                    }
+                },
+                "test");
+        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+        collector.close();
+    }
+
 
 }
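
Note (not part of the patch): the change applies a standard fail-fast hand-off pattern. KafkaProducer invokes the completion callback on its I/O thread; a failure reported there is stashed in a volatile field and rethrown on the stream thread's next send(), flush(), or close(). A minimal, self-contained sketch of that pattern follows; the class and method names (FailFastSender, deliver) are hypothetical and do not appear in Kafka:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical stand-in for RecordCollectorImpl; not Kafka code.
    public class FailFastSender {
        private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
        private volatile Exception sendException;

        public void send(final byte[] payload) {
            checkForException(); // surface an earlier async failure before queueing more work
            ioThread.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        deliver(payload); // stands in for the real network send
                    } catch (final Exception e) {
                        sendException = e; // volatile write: visible to the caller thread
                    }
                }
            });
        }

        public void close() {
            ioThread.shutdown();
            checkForException(); // as in the patch, failures also surface on close/flush
        }

        private void checkForException() {
            if (sendException != null) {
                throw new RuntimeException("exception caught when producing", sendException);
            }
        }

        private void deliver(final byte[] payload) throws Exception {
            // real I/O would happen here
        }
    }

A plain volatile field is enough here because only visibility is needed: the callback thread writes, the caller thread reads, and an occasional overwrite by a second failure still leaves some send failure to be thrown.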