diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 4eec2d55802..9566051d6ea 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -24,10 +24,10 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StreamPartitioner; import org.slf4j.Logger; @@ -37,15 +37,15 @@ import java.util.List; import java.util.Map; public class RecordCollectorImpl implements RecordCollector { - private static final int MAX_SEND_ATTEMPTS = 3; - private static final long SEND_RETRY_BACKOFF = 100L; - - private final Logger log; private final Producer producer; private final Map offsets; private final String logPrefix; + private final static String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " + + "No more records will be sent and no more offsets will be recorded for this task."; + private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (key %s value %s timestamp %d) to topic %s due to %s"; + private final static String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error."; private volatile KafkaException sendException; public RecordCollectorImpl(final Producer producer, final String streamTaskId, final LogContext logContext) { @@ -93,43 +93,70 @@ public class RecordCollectorImpl implements RecordCollector { final ProducerRecord serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes); - // counting from 1 to make check further down more natural - // -> `if (attempt == MAX_SEND_ATTEMPTS)` - for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; ++attempt) { - try { - producer.send(serializedRecord, new Callback() { - @Override - public void onCompletion(final RecordMetadata metadata, final Exception exception) { - if (exception == null) { - if (sendException != null) { - return; - } - final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition()); - offsets.put(tp, metadata.offset()); - } else { - if (sendException == null) { - log.error("Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " + - "No more records will be sent and no more offsets will be recorded for this task.", - key, value, timestamp, topic, exception); - if (exception instanceof ProducerFencedException) { - sendException = new ProducerFencedException(String.format("%sAbort sending since producer got fenced with a previous record (key %s value %s timestamp %d) to topic %s, error message: %s", - logPrefix, key, value, timestamp, topic, exception.getMessage())); - } else { - sendException = new StreamsException(String.format("%sAbort sending since an error caught with a previous record (key %s value %s timestamp %d) to topic %s due to %s.", - logPrefix, key, value, timestamp, topic, exception), exception); + try { + producer.send(serializedRecord, new Callback() { + @Override + public void onCompletion(final RecordMetadata metadata, + final Exception exception) { + if (exception == null) { + if (sendException != null) { + return; + } + final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition()); + offsets.put(tp, metadata.offset()); + } else { + if (sendException == null) { + if (exception instanceof ProducerFencedException) { + log.warn(LOG_MESSAGE, key, value, timestamp, topic, exception); + sendException = new ProducerFencedException( + String.format(EXCEPTION_MESSAGE, + logPrefix, + "producer got fenced", + key, + value, + timestamp, + topic, + exception.getMessage())); + } else { + String errorLogMessage = LOG_MESSAGE; + String errorMessage = EXCEPTION_MESSAGE; + if (exception instanceof RetriableException) { + errorLogMessage += PARAMETER_HINT; + errorMessage += PARAMETER_HINT; } + log.error(errorLogMessage, key, value, timestamp, topic, exception); + sendException = new StreamsException( + String.format(errorMessage, + logPrefix, + "an error caught", + key, + value, + timestamp, + topic, + exception.getMessage()), + exception); } } } - }); - return; - } catch (final TimeoutException e) { - if (attempt == MAX_SEND_ATTEMPTS) { - throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout after %d attempts", logPrefix, topic, attempt)); } - log.warn("Timeout exception caught when sending record to topic {}; retrying with {} attempt", topic, attempt); - Utils.sleep(SEND_RETRY_BACKOFF); - } + }); + } catch (final TimeoutException e) { + log.error("Timeout exception caught when sending record to topic {}. " + + "This might happen if the producer cannot send data to the Kafka cluster and thus, " + + "its internal buffer fills up. " + + "You can increase producer parameter `max.block.ms` to increase this timeout.", topic); + throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic)); + } catch (final Exception fatalException) { + throw new StreamsException( + String.format(EXCEPTION_MESSAGE, + logPrefix, + "an error caught", + key, + value, + timestamp, + topic, + fatalException.getMessage()), + fatalException); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 7b2a41e323e..16400d50a2f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -22,10 +22,10 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.LogContext; @@ -38,9 +38,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class RecordCollectorTest { @@ -124,47 +124,24 @@ public class RecordCollectorTest { assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2))); } - @SuppressWarnings("unchecked") - @Test - public void shouldRetryWhenTimeoutExceptionOccursOnSend() { - final AtomicInteger attempt = new AtomicInteger(0); - final RecordCollectorImpl collector = new RecordCollectorImpl( - new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { - @Override - public synchronized Future send(final ProducerRecord record, final Callback callback) { - if (attempt.getAndIncrement() == 0) { - throw new TimeoutException(); - } - return super.send(record, callback); - } - }, - "test", - logContext); - - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); - final Long offset = collector.offsets().get(new TopicPartition("topic1", 0)); - assertEquals(Long.valueOf(0L), offset); - } - @SuppressWarnings("unchecked") @Test(expected = StreamsException.class) - public void shouldThrowStreamsExceptionAfterMaxAttempts() { + public void shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException() { final RecordCollector collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future send(final ProducerRecord record, final Callback callback) { - throw new TimeoutException(); + throw new KafkaException(); } }, "test", logContext); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); - } @SuppressWarnings("unchecked") - @Test(expected = StreamsException.class) + @Test public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() { final RecordCollector collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @@ -177,11 +154,15 @@ public class RecordCollectorTest { "test", logContext); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + + try { + collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + fail("Should have thrown StreamsException"); + } catch (final StreamsException expected) { /* ok */ } } @SuppressWarnings("unchecked") - @Test(expected = StreamsException.class) + @Test public void shouldThrowStreamsExceptionOnFlushIfASendFailed() { final RecordCollector collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @@ -194,11 +175,15 @@ public class RecordCollectorTest { "test", logContext); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); - collector.flush(); + + try { + collector.flush(); + fail("Should have thrown StreamsException"); + } catch (final StreamsException expected) { /* ok */ } } @SuppressWarnings("unchecked") - @Test(expected = StreamsException.class) + @Test public void shouldThrowStreamsExceptionOnCloseIfASendFailed() { final RecordCollector collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @@ -211,7 +196,11 @@ public class RecordCollectorTest { "test", logContext); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); - collector.close(); + + try { + collector.close(); + fail("Should have thrown StreamsException"); + } catch (final StreamsException expected) { /* ok */ } } @SuppressWarnings("unchecked")