Browse Source

HOTFIX: improve error message on invalid input record timestamp

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang, Ismael Juma, Michael G. Noll, Eno Thereska

Closes #2076 from mjsax/hotfixTSExtractor
pull/2076/merge
Matthias J. Sax 8 years ago committed by Guozhang Wang
parent
commit
76b0702841
  1. 12
      streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
  2. 145
      streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java

12
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java

@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
@ -69,7 +70,16 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> { @@ -69,7 +70,16 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
@Override
public void process(final K key, final V value) {
RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
collector.send(new ProducerRecord<>(topic, null, context.timestamp(), key, value), keySerializer, valSerializer, partitioner);
final long timestamp = context.timestamp();
if (timestamp < 0) {
throw new StreamsException("A record consumed from an input topic has invalid (negative) timestamp, " +
"possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " +
"or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
"Use a different TimestampExtractor to process this data.");
}
collector.send(new ProducerRecord<K, V>(topic, null, timestamp, key, value), keySerializer, valSerializer, partitioner);
}
@Override

145
streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java

@ -0,0 +1,145 @@ @@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.junit.Test;
import java.io.File;
import java.util.Map;
public class SinkNodeTest {
@Test(expected = StreamsException.class)
@SuppressWarnings("unchecked")
public void invalidInputRecordTimestampTest() {
final Serializer anySerializer = Serdes.Bytes().serializer();
final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
sink.init(new MockProcessorContext());
sink.process(null, null);
}
private final class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
private final long invalidTimestamp = -1;
@Override
public String applicationId() {
return null;
}
@Override
public TaskId taskId() {
return null;
}
@Override
public Serde<?> keySerde() {
return null;
}
@Override
public Serde<?> valueSerde() {
return null;
}
@Override
public File stateDir() {
return null;
}
@Override
public StreamsMetrics metrics() {
return null;
}
@Override
public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
}
@Override
public StateStore getStateStore(String name) {
return null;
}
@Override
public void schedule(long interval) {
}
@Override
public <K, V> void forward(K key, V value) {
}
@Override
public <K, V> void forward(K key, V value, int childIndex) {
}
@Override
public <K, V> void forward(K key, V value, String childName) {
}
@Override
public void commit() {
}
@Override
public String topic() {
return null;
}
@Override
public int partition() {
return 0;
}
@Override
public long offset() {
return 0;
}
@Override
public long timestamp() {
return invalidTimestamp;
}
@Override
public Map<String, Object> appConfigs() {
return null;
}
@Override
public Map<String, Object> appConfigsWithPrefix(String prefix) {
return null;
}
@Override
public RecordCollector recordCollector() {
return null;
}
}
}
Loading…
Cancel
Save