Browse Source

KAFKA-5006: Improve thrown exception error logs

1. Only log an ERROR on the first encountered exception from the callback.

2. Wrap the exception message with the first thrown message information, and throw the exception whenever `checkException` is called.

Therefore, for the `store.put` call, it will throw a `KafkaException` with the error message a bit more intuitive.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Xavier Léauté <xavier@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #3534 from guozhangwang/K5006-exception-record-collector
pull/3534/merge
Guozhang Wang 7 years ago
parent
commit
630e9c5679
  1. 27
      streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

27
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.Callback; @@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
@ -44,8 +45,8 @@ public class RecordCollectorImpl implements RecordCollector { @@ -44,8 +45,8 @@ public class RecordCollectorImpl implements RecordCollector {
private final Producer<byte[], byte[]> producer;
private final Map<TopicPartition, Long> offsets;
private final String logPrefix;
private volatile Exception sendException;
private volatile KafkaException sendException;
public RecordCollectorImpl(final Producer<byte[], byte[]> producer, final String streamTaskId) {
this.producer = producer;
@ -106,11 +107,15 @@ public class RecordCollectorImpl implements RecordCollector { @@ -106,11 +107,15 @@ public class RecordCollectorImpl implements RecordCollector {
offsets.put(tp, metadata.offset());
} else {
if (sendException == null) {
sendException = exception;
if (sendException instanceof ProducerFencedException) {
log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and it will be closed as it is a zombie.", logPrefix, topic, exception);
log.error("{} Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
"No more records will be sent and no more offsets will be recorded for this task.",
logPrefix, key, value, timestamp, topic, exception);
if (exception instanceof ProducerFencedException) {
sendException = new ProducerFencedException(String.format("%s Abort sending since producer got fenced with a previous record (key %s value %s timestamp %d) to topic %s, error message: %s",
logPrefix, key, value, timestamp, topic, exception.getMessage()));
} else {
log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and the exception will eventually be thrown", logPrefix, topic, exception);
sendException = new StreamsException(String.format("%s Abort sending since an error caught with a previous record (key %s value %s timestamp %d) to topic %s due to %s.",
logPrefix, key, value, timestamp, topic, exception), exception);
}
}
}
@ -119,21 +124,17 @@ public class RecordCollectorImpl implements RecordCollector { @@ -119,21 +124,17 @@ public class RecordCollectorImpl implements RecordCollector {
return;
} catch (final TimeoutException e) {
if (attempt == MAX_SEND_ATTEMPTS) {
throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", logPrefix, topic, attempt));
throw new StreamsException(String.format("%s Failed to send record to topic %s due to timeout after %d attempts", logPrefix, topic, attempt));
}
log.warn("{} Timeout exception caught when sending record to topic {} attempt {}", logPrefix, topic, attempt);
log.warn("{} Timeout exception caught when sending record to topic {}; retrying with {} attempt", logPrefix, topic, attempt);
Utils.sleep(SEND_RETRY_BACKOFF);
}
}
}
private void checkForException() {
private void checkForException() {
if (sendException != null) {
if (sendException instanceof ProducerFencedException) {
throw (ProducerFencedException) sendException;
}
throw new StreamsException(String.format("%s exception caught when producing", logPrefix), sendException);
throw sendException;
}
}

Loading…
Cancel
Save