diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java index e8fc78c7b5a..87cb0dec0e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java @@ -19,6 +19,8 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.streams.errors.StreamsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Retrieves embedded metadata timestamps from Kafka messages. @@ -45,6 +47,7 @@ import org.apache.kafka.streams.errors.StreamsException; */ @InterfaceStability.Evolving public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { + private static final Logger log = LoggerFactory.getLogger(FailOnInvalidTimestamp.class); /** * Raises an exception on every call. @@ -60,10 +63,14 @@ public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { final long recordTimestamp, final long previousTimestamp) throws StreamsException { - throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " + - "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " + - "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + - "Use a different TimestampExtractor to process this data."); + + final String message = "Input record " + record + " has invalid (negative) timestamp. " + + "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding " + + "a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + + "Use a different TimestampExtractor to process this data."; + + log.error(message); + throw new StreamsException(message); } }