@@ -56,13 +56,18 @@ public class RecordCollectorImpl implements RecordCollector {
     private Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
     private final ProductionExceptionHandler productionExceptionHandler;
     private final static String LOG_MESSAGE = "Error sending record to topic {} due to {}; " +
         "No more records will be sent and no more offsets will be recorded for this task. " +
         "Enable TRACE logging to view failed record key and value.";
     private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (timestamp %d) to topic %s due to %s";
     private final static String PARAMETER_HINT = "\nYou can increase the producer configs `delivery.timeout.ms` and/or " +
         "`retries` to avoid this error. Note that `retries` is set to infinite by default.";
+    private final static String TIMEOUT_HINT_TEMPLATE = "%nTimeout exception caught when sending record to topic %s. " +
+        "This might happen if the producer cannot send data to the Kafka cluster and thus, " +
+        "its internal buffer fills up. " +
+        "This can also happen if the broker is slow to respond, if the network connection to " +
+        "the broker was interrupted, or if similar circumstances arise. " +
+        "You can increase producer parameter `max.block.ms` to increase this timeout.";
     private volatile KafkaException sendException;
@@ -129,9 +134,14 @@ public class RecordCollectorImpl implements RecordCollector {
     ) {
         String errorLogMessage = LOG_MESSAGE;
         String errorMessage = EXCEPTION_MESSAGE;
-        // There is no documented API for detecting retriable errors, so we rely on `RetriableException`
-        // even though it's an implementation detail (i.e. we do the best we can given what's available)
-        if (exception instanceof RetriableException) {
+        if (exception instanceof TimeoutException) {
+            final String topicTimeoutHint = String.format(TIMEOUT_HINT_TEMPLATE, topic);
+            errorLogMessage += topicTimeoutHint;
+            errorMessage += topicTimeoutHint;
+        } else if (exception instanceof RetriableException) {
+            // There is no documented API for detecting retriable errors, so we rely on `RetriableException`
+            // even though it's an implementation detail (i.e. we do the best we can given what's available)
             errorLogMessage += PARAMETER_HINT;
             errorMessage += PARAMETER_HINT;
         }
@@ -220,21 +230,6 @@ public class RecordCollectorImpl implements RecordCollector {
                     }
                 }
             });
-        } catch (final TimeoutException e) {
-            log.error(
-                "Timeout exception caught when sending record to topic {}. " +
-                    "This might happen if the producer cannot send data to the Kafka cluster and thus, " +
-                    "its internal buffer fills up. " +
-                    "This can also happen if the broker is slow to respond, if the network connection to " +
-                    "the broker was interrupted, or if similar circumstances arise. " +
-                    "You can increase producer parameter `max.block.ms` to increase this timeout.",
-                topic,
-                e
-            );
-            throw new StreamsException(
-                String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic),
-                e
-            );
         } catch (final RuntimeException uncaughtException) {
             if (uncaughtException instanceof KafkaException &&
                 uncaughtException.getCause() instanceof ProducerFencedException) {