diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index dd3a94acbe4..a5d749a4ff0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -855,7 +855,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *                               when send is invoked after producer has been closed.
      * @throws InterruptException If the thread is interrupted while blocked
      * @throws SerializationException If the key or value are not valid objects given the configured serializers
-     * @throws TimeoutException If the time taken for fetching metadata or allocating memory for the record has surpassed <code>max.block.ms</code>.
      * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.
      */
     @Override
@@ -996,6 +995,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @param nowMs The current time in ms
      * @param maxWaitMs The maximum time in ms for waiting on the metadata
      * @return The cluster containing topic metadata and the amount of time we waited in ms
+     * @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
      * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
      */
     private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 61afb3a61d4..b444433fbc1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -56,13 +56,18 @@ public class RecordCollectorImpl implements RecordCollector {
     private Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
     private final ProductionExceptionHandler productionExceptionHandler;
-
     private final static String LOG_MESSAGE = "Error sending record to topic {} due to {}; " +
         "No more records will be sent and no more offsets will be recorded for this task. " +
         "Enable TRACE logging to view failed record key and value.";
     private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (timestamp %d) to topic %s due to %s";
     private final static String PARAMETER_HINT = "\nYou can increase the producer configs `delivery.timeout.ms` and/or " +
         "`retries` to avoid this error. Note that `retries` is set to infinite by default.";
+    private final static String TIMEOUT_HINT_TEMPLATE = "%nTimeout exception caught when sending record to topic %s. " +
+        "This might happen if the producer cannot send data to the Kafka cluster and thus, " +
+        "its internal buffer fills up. " +
+        "This can also happen if the broker is slow to respond, if the network connection to " +
+        "the broker was interrupted, or if similar circumstances arise. " +
+        "You can increase producer parameter `max.block.ms` to increase this timeout.";
 
     private volatile KafkaException sendException;
 
@@ -129,9 +134,14 @@ public class RecordCollectorImpl implements RecordCollector {
     ) {
         String errorLogMessage = LOG_MESSAGE;
         String errorMessage = EXCEPTION_MESSAGE;
-        // There is no documented API for detecting retriable errors, so we rely on `RetriableException`
-        // even though it's an implementation detail (i.e. we do the best we can given what's available)
-        if (exception instanceof RetriableException) {
+
+        if (exception instanceof TimeoutException) {
+            final String topicTimeoutHint = String.format(TIMEOUT_HINT_TEMPLATE, topic);
+            errorLogMessage += topicTimeoutHint;
+            errorMessage += topicTimeoutHint;
+        } else if (exception instanceof RetriableException) {
+            // There is no documented API for detecting retriable errors, so we rely on `RetriableException`
+            // even though it's an implementation detail (i.e. we do the best we can given what's available)
             errorLogMessage += PARAMETER_HINT;
             errorMessage += PARAMETER_HINT;
         }
@@ -220,21 +230,6 @@ public class RecordCollectorImpl implements RecordCollector {
                     }
                 }
             });
-        } catch (final TimeoutException e) {
-            log.error(
-                "Timeout exception caught when sending record to topic {}. " +
-                    "This might happen if the producer cannot send data to the Kafka cluster and thus, " +
-                    "its internal buffer fills up. " +
-                    "This can also happen if the broker is slow to respond, if the network connection to " +
-                    "the broker was interrupted, or if similar circumstances arise. " +
-                    "You can increase producer parameter `max.block.ms` to increase this timeout.",
-                topic,
-                e
-            );
-            throw new StreamsException(
-                String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic),
-                e
-            );
         } catch (final RuntimeException uncaughtException) {
             if (uncaughtException instanceof KafkaException &&
                 uncaughtException.getCause() instanceof ProducerFencedException) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 27d259fddce..e24ed004676 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
@@ -79,6 +80,11 @@ public class RecordCollectorTest {
         return Integer.parseInt(key) % numPartitions;
     };
 
+    private final String topic1TimeoutHint = "Timeout exception caught when sending record to topic topic1." +
+        " This might happen if the producer cannot send data to the Kafka cluster and thus, its internal buffer fills up." +
+        " This can also happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise." +
+        " You can increase producer parameter `max.block.ms` to increase this timeout.";
+
     @Test
     public void testSpecificPartition() {
@@ -312,6 +318,28 @@
         } catch (final StreamsException expected) { /* ok */ }
     }
 
+    @Test
+    public void shouldThrowStreamsExceptionWithTimeoutHintOnProducerTimeoutWithDefaultExceptionHandler() {
+        final RecordCollector collector = new RecordCollectorImpl(
+            "test",
+            logContext,
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records"));
+        collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+            @Override
+            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
+                callback.onCompletion(null, new TimeoutException());
+                return null;
+            }
+        });
+
+        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+
+        final StreamsException expected = assertThrows(StreamsException.class, () -> collector.flush());
+        assertTrue(expected.getCause() instanceof TimeoutException);
+        assertTrue(expected.getMessage().endsWith(topic1TimeoutHint));
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void shouldNotThrowStreamsExceptionOnFlushIfASendFailedWithContinueExceptionHandler() {