Browse Source

MINOR: Adjust Streams parameter hint on TimeoutException (#6280)

KIP-91 was included in Kafka 2.1.0, so we should mention
`delivery.timeout.ms` in the hint as it's the config that
users would want to change in most cases.

Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
pull/7036/head
Ismael Juma 5 years ago committed by GitHub
parent
commit
298a9bc397
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

6
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

@ -59,7 +59,9 @@ public class RecordCollectorImpl implements RecordCollector { @@ -59,7 +59,9 @@ public class RecordCollectorImpl implements RecordCollector {
"No more records will be sent and no more offsets will be recorded for this task. " +
"Enable TRACE logging to view failed record key and value.";
private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (timestamp %d) to topic %s due to %s";
private final static String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.";
private final static String PARAMETER_HINT = "\nYou can increase the producer configs `delivery.timeout.ms` and/or " +
"`retries` to avoid this error. Note that `retries` is set to infinite by default.";
private volatile KafkaException sendException;
public RecordCollectorImpl(final String streamTaskId,
@ -125,6 +127,8 @@ public class RecordCollectorImpl implements RecordCollector { @@ -125,6 +127,8 @@ public class RecordCollectorImpl implements RecordCollector {
) {
String errorLogMessage = LOG_MESSAGE;
String errorMessage = EXCEPTION_MESSAGE;
// There is no documented API for detecting retriable errors, so we rely on `RetriableException`
// even though it's an implementation detail (i.e. we do the best we can given what's available)
if (exception instanceof RetriableException) {
errorLogMessage += PARAMETER_HINT;
errorMessage += PARAMETER_HINT;

Loading…
Cancel
Save