From 298a9bc397878ddc755b91c44aef0c928be6eaf9 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 2 Jul 2019 21:14:06 -0700 Subject: [PATCH] MINOR: Adjust Streams parameter hint on TimeoutException (#6280) KIP-91 was included in Kafka 2.1.0, so we should mention `delivery.timeout.ms` in the hint as it's the config that users would want to change in most cases. Reviewers: Matthias J. Sax , John Roesler , Bill Bejeck , Guozhang Wang --- .../streams/processor/internals/RecordCollectorImpl.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 4edbc3d701f..c6bcda6d4ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -59,7 +59,9 @@ public class RecordCollectorImpl implements RecordCollector { "No more records will be sent and no more offsets will be recorded for this task. " + "Enable TRACE logging to view failed record key and value."; private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (timestamp %d) to topic %s due to %s"; - private final static String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error."; + private final static String PARAMETER_HINT = "\nYou can increase the producer configs `delivery.timeout.ms` and/or " + + "`retries` to avoid this error. Note that `retries` is set to infinite by default."; + private volatile KafkaException sendException; public RecordCollectorImpl(final String streamTaskId, @@ -125,6 +127,8 @@ public class RecordCollectorImpl implements RecordCollector { ) { String errorLogMessage = LOG_MESSAGE; String errorMessage = EXCEPTION_MESSAGE; + // There is no documented API for detecting retriable errors, so we rely on `RetriableException` + // even though it's an implementation detail (i.e. we do the best we can given what's available) if (exception instanceof RetriableException) { errorLogMessage += PARAMETER_HINT; errorMessage += PARAMETER_HINT;