From 76b070284142635d2be78e8f86e779dfdbb0a2e0 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sun, 30 Oct 2016 11:37:38 -0700 Subject: [PATCH] HOTFIX: improve error message on invalid input record timestamp Author: Matthias J. Sax Reviewers: Guozhang Wang, Ismael Juma, Michael G. Noll, Eno Thereska Closes #2076 from mjsax/hotfixTSExtractor --- .../streams/processor/internals/SinkNode.java | 12 +- .../processor/internals/SinkNodeTest.java | 145 ++++++++++++++++++ 2 files changed, 156 insertions(+), 1 deletion(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 4e56f611e80..8ac373ba291 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.internals.ChangedSerializer; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StreamPartitioner; @@ -69,7 +70,16 @@ public class SinkNode extends ProcessorNode { @Override public void process(final K key, final V value) { RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector(); - collector.send(new ProducerRecord<>(topic, null, context.timestamp(), key, value), keySerializer, valSerializer, partitioner); + + final long timestamp = context.timestamp(); + if (timestamp < 0) { + throw new StreamsException("A record consumed from an input topic has invalid (negative) timestamp, " + + "possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " + + "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + + "Use a different TimestampExtractor to process this data."); + } + + collector.send(new ProducerRecord(topic, null, timestamp, key, value), keySerializer, valSerializer, partitioner); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java new file mode 100644 index 00000000000..3b415174a93 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.Test; + +import java.io.File; +import java.util.Map; + +public class SinkNodeTest { + + @Test(expected = StreamsException.class) + @SuppressWarnings("unchecked") + public void invalidInputRecordTimestampTest() { + final Serializer anySerializer = Serdes.Bytes().serializer(); + + final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null); + sink.init(new MockProcessorContext()); + + sink.process(null, null); + } + + private final class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier { + private final long invalidTimestamp = -1; + + @Override + public String applicationId() { + return null; + } + + @Override + public TaskId taskId() { + return null; + } + + @Override + public Serde keySerde() { + return null; + } + + @Override + public Serde valueSerde() { + return null; + } + + @Override + public File stateDir() { + return null; + } + + @Override + public StreamsMetrics metrics() { + return null; + } + + @Override + public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) { + } + + @Override + public StateStore getStateStore(String name) { + return null; + } + + @Override + public void schedule(long interval) { + } + + @Override + public void forward(K key, V value) { + } + + @Override + public void forward(K key, V value, int childIndex) { + } + + @Override + public void forward(K key, V value, String childName) { + } + + @Override + public void commit() { + } + + @Override + public String topic() { + return null; + } + + @Override + public int partition() { + return 0; + } + + @Override + public long offset() { + return 0; + } + + @Override + public long timestamp() { + return invalidTimestamp; + } + + @Override + public Map appConfigs() { + return null; + } + + @Override + public Map appConfigsWithPrefix(String prefix) { + return null; + } + + @Override + public RecordCollector recordCollector() { + return null; + } + } + +} \ No newline at end of file