diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 4edbc3d701f..c6bcda6d4ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -59,7 +59,9 @@ public class RecordCollectorImpl implements RecordCollector { "No more records will be sent and no more offsets will be recorded for this task. " + "Enable TRACE logging to view failed record key and value."; private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (timestamp %d) to topic %s due to %s"; - private final static String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error."; + private final static String PARAMETER_HINT = "\nYou can increase the producer configs `delivery.timeout.ms` and/or " + + "`retries` to avoid this error. Note that `retries` is set to infinite by default."; + private volatile KafkaException sendException; public RecordCollectorImpl(final String streamTaskId, @@ -125,6 +127,8 @@ public class RecordCollectorImpl implements RecordCollector { ) { String errorLogMessage = LOG_MESSAGE; String errorMessage = EXCEPTION_MESSAGE; + // There is no documented API for detecting retriable errors, so we rely on `RetriableException` + // even though it's an implementation detail (i.e. we do the best we can given what's available) if (exception instanceof RetriableException) { errorLogMessage += PARAMETER_HINT; errorMessage += PARAMETER_HINT;