From 5792f2fb3db69333bfd22b57b00b42336dc16aa9 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 4 Oct 2017 15:10:59 -0700 Subject: [PATCH] KAFKA-5980: FailOnInvalidTimestamp does not log error Author: Matthias J. Sax Reviewers: Damian Guy , Ted Yu , Denis Bolshakov Closes #3966 from mjsax/kafka-5980-FailOnInvalidTimestamp-does-not-log-error --- .../streams/processor/FailOnInvalidTimestamp.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java index e8fc78c7b5a..87cb0dec0e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java @@ -19,6 +19,8 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.streams.errors.StreamsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Retrieves embedded metadata timestamps from Kafka messages. @@ -45,6 +47,7 @@ import org.apache.kafka.streams.errors.StreamsException; */ @InterfaceStability.Evolving public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { + private static final Logger log = LoggerFactory.getLogger(FailOnInvalidTimestamp.class); /** * Raises an exception on every call. @@ -60,10 +63,14 @@ public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { final long recordTimestamp, final long previousTimestamp) throws StreamsException { - throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " + - "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " + - "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + - "Use a different TimestampExtractor to process this data."); + + final String message = "Input record " + record + " has invalid (negative) timestamp. " + + "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding " + + "a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + + "Use a different TimestampExtractor to process this data."; + + log.error(message); + throw new StreamsException(message); } }