|
|
@ -19,6 +19,8 @@ package org.apache.kafka.streams.processor; |
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord; |
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord; |
|
|
|
import org.apache.kafka.common.annotation.InterfaceStability; |
|
|
|
import org.apache.kafka.common.annotation.InterfaceStability; |
|
|
|
import org.apache.kafka.streams.errors.StreamsException; |
|
|
|
import org.apache.kafka.streams.errors.StreamsException; |
|
|
|
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Retrieves embedded metadata timestamps from Kafka messages. |
|
|
|
* Retrieves embedded metadata timestamps from Kafka messages. |
|
|
@ -45,6 +47,7 @@ import org.apache.kafka.streams.errors.StreamsException; |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
@InterfaceStability.Evolving |
|
|
|
@InterfaceStability.Evolving |
|
|
|
public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { |
|
|
|
public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { |
|
|
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(FailOnInvalidTimestamp.class); |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Raises an exception on every call. |
|
|
|
* Raises an exception on every call. |
|
|
@ -60,10 +63,14 @@ public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { |
|
|
|
final long recordTimestamp, |
|
|
|
final long recordTimestamp, |
|
|
|
final long previousTimestamp) |
|
|
|
final long previousTimestamp) |
|
|
|
throws StreamsException { |
|
|
|
throws StreamsException { |
|
|
|
throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " + |
|
|
|
|
|
|
|
"Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " + |
|
|
|
final String message = "Input record " + record + " has invalid (negative) timestamp. " + |
|
|
|
"or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + |
|
|
|
"Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding " + |
|
|
|
"Use a different TimestampExtractor to process this data."); |
|
|
|
"a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + |
|
|
|
|
|
|
|
"Use a different TimestampExtractor to process this data."; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
log.error(message); |
|
|
|
|
|
|
|
throw new StreamsException(message); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|