From a1f97c8dc4edd49c7a8a2ebb72734728029a85aa Mon Sep 17 00:00:00 2001
From: Eno Thereska
Date: Mon, 10 Jul 2017 11:58:51 -0700
Subject: [PATCH] KAFKA-5157; Options for handling corrupt data during
 deserialization

This is the implementation of KIP-161:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers

Author: Eno Thereska

Reviewers: Damian Guy, Matthias J. Sax

Closes #3423 from enothereska/KAFKA-5157-deserialization-exceptions
---
 .../apache/kafka/streams/StreamsConfig.java   | 18 +++++
 .../DeserializationExceptionHandler.java      | 60 +++++++++++++++
 .../LogAndContinueExceptionHandler.java       | 52 +++++++++++++
 .../errors/LogAndFailExceptionHandler.java    | 52 +++++++++++++
 .../internals/GlobalStateUpdateTask.java      | 45 +++++------
 .../internals/GlobalStreamThread.java         |  3 +-
 .../processor/internals/ProcessorNode.java    |  3 +
 .../processor/internals/RecordQueue.java      | 21 ++++-
 .../SourceNodeRecordDeserializer.java         | 34 +++++++-
 .../processor/internals/StreamTask.java       | 21 ++---
 .../internals/GlobalStateTaskTest.java        | 77 +++++++++++++++++--
 .../internals/PartitionGroupTest.java         |  5 +-
 .../processor/internals/RecordQueueTest.java  | 60 +++++++++++++--
 .../SourceNodeRecordDeserializerTest.java     |  6 +-
 .../test/ProcessorTopologyTestDriver.java     |  3 +-
 15 files changed, 403 insertions(+), 57 deletions(-)
 create mode 100644 streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
 create mode 100644 streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
 create mode 100644 streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index e7a8c447afe..606e314c3ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
@@ -146,6 +148,13 @@ public class StreamsConfig extends AbstractConfig {
     /** {@code connections.max.idle.ms} */
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
 
+    /**
+     * {@code default.deserialization.exception.handler}
+     */
+    public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
+    private static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the DeserializationExceptionHandler interface.";
+
+
     /** {@code default key.serde} */
     public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
     private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the Serde interface.";
@@ -303,6 +312,11 @@ public class StreamsConfig extends AbstractConfig {
                     "",
                     Importance.MEDIUM,
CommonClientConfigs.CLIENT_ID_DOC) + .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, + Type.CLASS, + LogAndFailExceptionHandler.class.getName(), + Importance.MEDIUM, + DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) .define(DEFAULT_KEY_SERDE_CLASS_CONFIG, Type.CLASS, Serdes.ByteArraySerde.class.getName(), @@ -790,6 +804,10 @@ public class StreamsConfig extends AbstractConfig { return timestampExtractor; } + public DeserializationExceptionHandler defaultDeserializationExceptionHandler() { + return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); + } + /** * Override any client properties in the original configs with overrides * diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java new file mode 100644 index 00000000000..c1abb4df617 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.streams.processor.ProcessorContext; + +/** + * Interface that specifies how an exception from source node deserialization + * (e.g., reading from Kafka) should be handled. + */ +public interface DeserializationExceptionHandler extends Configurable { + /** + * Inspect a record and the exception received. + * @param context processor context + * @param record record that failed deserialization + * @param exception the actual exception + */ + DeserializationHandlerResponse handle(final ProcessorContext context, + final ConsumerRecord record, + final Exception exception); + + /** + * Enumeration that describes the response from the exception handler. 
+ */ + enum DeserializationHandlerResponse { + /* continue with processing */ + CONTINUE(0, "CONTINUE"), + /* fail the processing and stop */ + FAIL(1, "FAIL"); + + /** an english description of the api--this is for debugging and can change */ + public final String name; + + /** the permanent and immutable id of an API--this can't change ever */ + public final int id; + + DeserializationHandlerResponse(final int id, final String name) { + this.id = id; + this.name = name; + } + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java new file mode 100644 index 00000000000..dde4b52b1dc --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.StreamThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Deserialization handler that logs a deserialization exception and then + * signals the processing pipeline to continue processing more records. + */ +public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler { + private static final Logger log = LoggerFactory.getLogger(StreamThread.class); + + @Override + public DeserializationHandlerResponse handle(final ProcessorContext context, + final ConsumerRecord record, + final Exception exception) { + + log.warn("Exception caught during Deserialization, " + + "taskId: {}, topic: {}, partition: {}, offset: {}", + context.taskId(), record.topic(), record.partition(), record.offset(), + exception); + + return DeserializationHandlerResponse.CONTINUE; + } + + @Override + public void configure(final Map configs) { + // ignore + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java new file mode 100644 index 00000000000..23557a35337 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.StreamThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + + +/** + * Deserialization handler that logs a deserialization exception and then + * signals the processing pipeline to stop processing more records and fail. + */ +public class LogAndFailExceptionHandler implements DeserializationExceptionHandler { + private static final Logger log = LoggerFactory.getLogger(StreamThread.class); + + @Override + public DeserializationHandlerResponse handle(final ProcessorContext context, + final ConsumerRecord record, + final Exception exception) { + + log.warn("Exception caught during Deserialization, " + + "taskId: {}, topic: {}, partition: {}, offset: {}", + context.taskId(), record.topic(), record.partition(), record.offset(), + exception); + + return DeserializationHandlerResponse.FAIL; + } + + @Override + public void configure(final Map configs) { + // ignore + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 2540af0d1d6..38beb63664a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import java.io.IOException; import java.util.HashMap; @@ -29,31 +30,23 @@ import java.util.Set; */ public class GlobalStateUpdateTask implements GlobalStateMaintainer { - private static class SourceNodeAndDeserializer { - private final SourceNode sourceNode; - private final RecordDeserializer deserializer; - - SourceNodeAndDeserializer(final SourceNode sourceNode, - final RecordDeserializer deserializer) { - this.sourceNode = sourceNode; - this.deserializer = deserializer; - } - } - private final ProcessorTopology topology; private final InternalProcessorContext processorContext; private final Map offsets = new HashMap<>(); - private final Map deserializers = new HashMap<>(); + private final Map deserializers = new HashMap<>(); private final GlobalStateManager stateMgr; + private final DeserializationExceptionHandler deserializationExceptionHandler; public GlobalStateUpdateTask(final ProcessorTopology topology, final InternalProcessorContext processorContext, - final GlobalStateManager stateMgr) { + final GlobalStateManager stateMgr, + final DeserializationExceptionHandler 
deserializationExceptionHandler) { this.topology = topology; this.stateMgr = stateMgr; this.processorContext = processorContext; + this.deserializationExceptionHandler = deserializationExceptionHandler; } @SuppressWarnings("unchecked") @@ -63,7 +56,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { for (final String storeName : storeNames) { final String sourceTopic = storeNameToTopic.get(storeName); final SourceNode source = topology.source(sourceTopic); - deserializers.put(sourceTopic, new SourceNodeAndDeserializer(source, new SourceNodeRecordDeserializer(source))); + deserializers.put(sourceTopic, new SourceNodeRecordDeserializer(source, deserializationExceptionHandler)); } initTopology(); processorContext.initialized(); @@ -74,17 +67,21 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { @SuppressWarnings("unchecked") @Override public void update(final ConsumerRecord record) { - final SourceNodeAndDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic()); - final ConsumerRecord deserialized = sourceNodeAndDeserializer.deserializer.deserialize(record); - final ProcessorRecordContext recordContext = + final SourceNodeRecordDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic()); + final ConsumerRecord deserialized = sourceNodeAndDeserializer.tryDeserialize(processorContext, record); + + if (deserialized != null) { + final ProcessorRecordContext recordContext = new ProcessorRecordContext(deserialized.timestamp(), - deserialized.offset(), - deserialized.partition(), - deserialized.topic()); - processorContext.setRecordContext(recordContext); - processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode); - sourceNodeAndDeserializer.sourceNode.process(deserialized.key(), deserialized.value()); - offsets.put(new TopicPartition(record.topic(), record.partition()), deserialized.offset() + 1); + deserialized.offset(), + deserialized.partition(), + deserialized.topic()); + processorContext.setRecordContext(recordContext); + processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode()); + sourceNodeAndDeserializer.sourceNode().process(deserialized.key(), deserialized.value()); + } + + offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1); } public void flushState() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index 11b89dff176..2d0dcfb3026 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -277,7 +277,8 @@ public class GlobalStreamThread extends Thread { stateMgr, streamsMetrics, cache), - stateMgr), + stateMgr, + config.defaultDeserializationExceptionHandler()), time, config.getLong(StreamsConfig.POLL_MS_CONFIG), config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 47f63116bf5..0cc746e71c0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -168,6 +168,7 @@ public class ProcessorNode { final Sensor nodeProcessTimeSensor; final 
Sensor nodePunctuateTimeSensor; final Sensor sourceNodeForwardSensor; + final Sensor sourceNodeSkippedDueToDeserializationError; final Sensor nodeCreationSensor; final Sensor nodeDestructionSensor; @@ -186,6 +187,7 @@ public class ProcessorNode { this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "forward", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); + this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "skippedDueToDeserializationError", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); } public void removeAllSensors() { @@ -194,6 +196,7 @@ public class ProcessorNode { metrics.removeSensor(sourceNodeForwardSensor); metrics.removeSensor(nodeCreationSensor); metrics.removeSensor(nodeDestructionSensor); + metrics.removeSensor(sourceNodeSkippedDueToDeserializationError); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 0902614ced2..d26511cc92e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -18,6 +18,8 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TimestampExtractor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,19 +41,25 @@ public class RecordQueue { private final TopicPartition partition; private final ArrayDeque fifoQueue; private final TimestampTracker> timeTracker; - private final RecordDeserializer recordDeserializer; + private final SourceNodeRecordDeserializer recordDeserializer; + private final DeserializationExceptionHandler deserializationExceptionHandler; + private final ProcessorContext processorContext; private long partitionTime = TimestampTracker.NOT_KNOWN; RecordQueue(final TopicPartition partition, final SourceNode source, - final TimestampExtractor timestampExtractor) { + final TimestampExtractor timestampExtractor, + final DeserializationExceptionHandler deserializationExceptionHandler, + final ProcessorContext processorContext) { this.partition = partition; this.source = source; this.timestampExtractor = timestampExtractor; this.fifoQueue = new ArrayDeque<>(); this.timeTracker = new MinTimestampTracker<>(); - this.recordDeserializer = new SourceNodeRecordDeserializer(source); + this.recordDeserializer = new SourceNodeRecordDeserializer(source, deserializationExceptionHandler); + this.deserializationExceptionHandler = deserializationExceptionHandler; + this.processorContext = processorContext; } @@ -81,7 +89,12 @@ public class RecordQueue { */ public int addRawRecords(Iterable> rawRecords) { for (ConsumerRecord rawRecord : rawRecords) { - ConsumerRecord record = recordDeserializer.deserialize(rawRecord); + + ConsumerRecord record = 
recordDeserializer.tryDeserialize(processorContext, rawRecord); + if (record == null) { + continue; + } + long timestamp = timestampExtractor.extract(record, timeTracker.get()); log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java index f66c0d9801a..e26d11086fa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java @@ -18,15 +18,21 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.ProcessorContext; import static java.lang.String.format; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; class SourceNodeRecordDeserializer implements RecordDeserializer { private final SourceNode sourceNode; + private final DeserializationExceptionHandler deserializationExceptionHandler; - SourceNodeRecordDeserializer(final SourceNode sourceNode) { + SourceNodeRecordDeserializer(final SourceNode sourceNode, + final DeserializationExceptionHandler deserializationExceptionHandler) { this.sourceNode = sourceNode; + this.deserializationExceptionHandler = deserializationExceptionHandler; } @Override @@ -54,4 +60,30 @@ class SourceNodeRecordDeserializer implements RecordDeserializer { rawRecord.serializedValueSize(), key, value); } + + public ConsumerRecord tryDeserialize(final ProcessorContext processorContext, + ConsumerRecord rawRecord) { + + // catch and process if we have a deserialization handler + try { + return deserialize(rawRecord); + } catch (Exception e) { + final DeserializationExceptionHandler.DeserializationHandlerResponse response = + deserializationExceptionHandler.handle(processorContext, rawRecord, e); + if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) { + throw new StreamsException("Deserialization exception handler is set to fail upon" + + " a deserialization error. 
If you would rather have the streaming pipeline" + + " continue after a deserialization error, please set the " + + DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", + e); + } else { + sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record(); + } + } + return null; + } + + public SourceNode sourceNode() { + return sourceNode; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 3fd4596da48..6ff1818edf6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.ProcessorContext; @@ -122,15 +123,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator // to corresponding source nodes in the processor topology final Map partitionQueues = new HashMap<>(); - final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor(); - for (final TopicPartition partition : partitions) { - final SourceNode source = topology.source(partition.topic()); - final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor; - final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor); - partitionQueues.put(partition, queue); - } - - partitionGroup = new PartitionGroup(partitionQueues); // initialize the consumed offset cache consumedOffsets = new HashMap<>(); @@ -140,6 +132,17 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator // initialize the topology with its own context processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache); + + final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor(); + final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler(); + for (final TopicPartition partition : partitions) { + final SourceNode source = topology.source(partition.topic()); + final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? 
source.getTimestampExtractor() : defaultTimestampExtractor; + final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor, defaultDeserializationExceptionHandler, processorContext); + partitionQueues.put(partition, queue); + } + + partitionGroup = new PartitionGroup(partitionQueues); this.time = time; log.debug("{} Initializing", logPrefix); initializeStateStores(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index 4022ba970ff..7859a061638 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -18,10 +18,15 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; +import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.test.GlobalStateManagerStub; import org.apache.kafka.test.MockProcessorNode; @@ -41,6 +46,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class GlobalStateTaskTest { @@ -53,6 +59,7 @@ public class GlobalStateTaskTest { private TopicPartition t2; private MockSourceNode sourceOne; private MockSourceNode sourceTwo; + private ProcessorTopology topology; @Before public void before() { @@ -70,12 +77,12 @@ public class GlobalStateTaskTest { final Map storeToTopic = new HashMap<>(); storeToTopic.put("t1-store", "t1"); storeToTopic.put("t2-store", "t2"); - final ProcessorTopology topology = new ProcessorTopology(processorNodes, - sourceByTopics, - Collections.emptyMap(), - Collections.emptyList(), - storeToTopic, - Collections.emptyList()); + topology = new ProcessorTopology(processorNodes, + sourceByTopics, + Collections.emptyMap(), + Collections.emptyList(), + storeToTopic, + Collections.emptyList()); context = new NoOpProcessorContext(); t1 = new TopicPartition("t1", 1); @@ -84,7 +91,7 @@ public class GlobalStateTaskTest { offsets.put(t1, 50L); offsets.put(t2, 100L); stateMgr = new GlobalStateManagerStub(storeNames, offsets); - globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr); + globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr, new LogAndFailExceptionHandler()); } @Test @@ -129,6 +136,62 @@ public class GlobalStateTaskTest { assertEquals(0, sourceOne.numReceived); } + private void maybeDeserialize(final GlobalStateUpdateTask globalStateTask, + final byte[] key, + final byte[] recordValue, + boolean failExpected) { + final ConsumerRecord record = new ConsumerRecord<>("t2", 1, 1, + 0L, TimestampType.CREATE_TIME, 0L, 0, 0, + key, recordValue); + 
globalStateTask.initialize(); + try { + globalStateTask.update(record); + if (failExpected) { + fail("Should have failed to deserialize."); + } + } catch (StreamsException e) { + if (!failExpected) { + fail("Shouldn't have failed to deserialize."); + } + } + } + + + @Test + public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() throws Exception { + final byte[] key = new LongSerializer().serialize("t2", 1L); + final byte[] recordValue = new IntegerSerializer().serialize("t2", 10); + maybeDeserialize(globalStateTask, key, recordValue, true); + } + + + @Test + public void shouldThrowStreamsExceptionWhenValueDeserializationFails() throws Exception { + final byte[] key = new IntegerSerializer().serialize("t2", 1); + final byte[] recordValue = new LongSerializer().serialize("t2", 10L); + maybeDeserialize(globalStateTask, key, recordValue, true); + } + + @Test + public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() throws Exception { + final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(topology, context, stateMgr, + new LogAndContinueExceptionHandler()); + final byte[] key = new LongSerializer().serialize("t2", 1L); + final byte[] recordValue = new IntegerSerializer().serialize("t2", 10); + + maybeDeserialize(globalStateTask2, key, recordValue, false); + } + + @Test + public void shouldNotThrowStreamsExceptionWhenValueDeserializationFails() throws Exception { + final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(topology, context, stateMgr, + new LogAndContinueExceptionHandler()); + final byte[] key = new IntegerSerializer().serialize("t2", 1); + final byte[] recordValue = new LongSerializer().serialize("t2", 10L); + + maybeDeserialize(globalStateTask2, key, recordValue, false); + } + @Test public void shouldCloseStateManagerWithOffsets() throws Exception { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 476b009a1fc..d9f38ebc19f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockTimestampExtractor; @@ -40,8 +41,8 @@ public class PartitionGroupTest { private final String[] topics = {"topic"}; private final TopicPartition partition1 = new TopicPartition(topics[0], 1); private final TopicPartition partition2 = new TopicPartition(topics[0], 2); - private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor); - private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor); + private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), null); + private final 
RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), null); private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index f80b7ebb542..6c45fd89ede 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -27,12 +27,19 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; +import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockTimestampExtractor; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.util.Arrays; @@ -44,13 +51,30 @@ public class RecordQueueTest { private final Deserializer intDeserializer = new IntegerDeserializer(); private final TimestampExtractor timestampExtractor = new MockTimestampExtractor(); private final String[] topics = {"topic"}; + + final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class), + new RecordCollectorImpl(null, null)); + private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer); private final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1), - new MockSourceNode<>(topics, intDeserializer, intDeserializer), - timestampExtractor); + mockSourceNodeWithMetrics, + timestampExtractor, new LogAndFailExceptionHandler(), context); + private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(new TopicPartition(topics[0], 1), + mockSourceNodeWithMetrics, + timestampExtractor, new LogAndContinueExceptionHandler(), context); private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1); + @Before + public void before() { + mockSourceNodeWithMetrics.init(context); + } + + @After + public void after() { + mockSourceNodeWithMetrics.close(); + } + @Test public void testTimeTracking() { @@ -140,14 +164,38 @@ public class RecordQueueTest { queue.addRawRecords(records); } + @Test + public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() throws Exception { + final byte[] key = Serdes.Long().serializer().serialize("foo", 1L); + final List> records = Collections.singletonList( + new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, recordValue)); + final StateSerdes anyStateSerde = 
StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class); + + queueThatSkipsDeserializeErrors.addRawRecords(records); + assertEquals(0, queueThatSkipsDeserializeErrors.size()); + } + + @Test + public void shouldNotThrowStreamsExceptionWhenValueDeserializationFailsWithSkipHandler() throws Exception { + final byte[] value = Serdes.Long().serializer().serialize("foo", 1L); + final List> records = Collections.singletonList( + new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, value)); + + queueThatSkipsDeserializeErrors.addRawRecords(records); + assertEquals(0, queueThatSkipsDeserializeErrors.size()); + } + + @Test(expected = StreamsException.class) public void shouldThrowOnNegativeTimestamp() { final List> records = Collections.singletonList( new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1), - new MockSourceNode<>(topics, intDeserializer, intDeserializer), - new FailOnInvalidTimestamp()); + new MockSourceNode<>(topics, intDeserializer, intDeserializer), + new FailOnInvalidTimestamp(), + new LogAndContinueExceptionHandler(), + null); queue.addRawRecords(records); } @@ -158,7 +206,9 @@ public class RecordQueueTest { final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1), new MockSourceNode<>(topics, intDeserializer, intDeserializer), - new LogAndSkipOnInvalidTimestamp()); + new LogAndSkipOnInvalidTimestamp(), + new LogAndContinueExceptionHandler(), + null); queue.addRawRecords(records); assertEquals(0, queue.size()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java index a9f41e7627d..9ba8308ef81 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java @@ -43,21 +43,21 @@ public class SourceNodeRecordDeserializerTest { @Test(expected = StreamsException.class) public void shouldThrowStreamsExceptionIfKeyFailsToDeserialize() throws Exception { final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer( - new TheSourceNode(true, false)); + new TheSourceNode(true, false), null); recordDeserializer.deserialize(rawRecord); } @Test(expected = StreamsException.class) public void shouldThrowStreamsExceptionIfKeyValueFailsToDeserialize() throws Exception { final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer( - new TheSourceNode(false, true)); + new TheSourceNode(false, true), null); recordDeserializer.deserialize(rawRecord); } @Test public void shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions() throws Exception { final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer( - new TheSourceNode(false, false, "key", "value")); + new TheSourceNode(false, false, "key", "value"), null); final ConsumerRecord record = recordDeserializer.deserialize(rawRecord); assertEquals(rawRecord.topic(), record.topic()); assertEquals(rawRecord.partition(), record.partition()); diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index a9f020b208c..c59113eadf7 100644 
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -205,7 +206,7 @@ public class ProcessorTopologyTestDriver {
             final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory);
             globalStateTask = new GlobalStateUpdateTask(globalTopology,
                                                         new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
-                                                        stateManager
+                                                        stateManager, new LogAndContinueExceptionHandler()
             );
             globalStateTask.initialize();
         }
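
Usage note (illustrative sketch, not part of the patch): the new default.deserialization.exception.handler
config defaults to LogAndFailExceptionHandler, so out of the box a corrupt record still fails the task
exactly as it did before this change. The sketch below shows how an application would opt into the
log-and-continue behaviour instead; the application id and bootstrap servers are placeholders.

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

    public class DeserializationHandlerConfigExample {

        public static Properties streamsProperties() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
            // Skip corrupt records instead of failing the task
            // (the default is LogAndFailExceptionHandler).
            props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                      LogAndContinueExceptionHandler.class);
            return props;
        }
    }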
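
Where neither built-in handler fits, an application can implement the DeserializationExceptionHandler
interface added by this patch. The following sketch (not from the patch) skips corrupt records up to a
limit and then fails the task; the "max.deserialization.failures" property name and the counter logic
are invented for illustration. The handler runs before deserialization has succeeded, so the record
still carries the raw serialized key and value, hence the byte[] generics.

    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ThresholdDeserializationExceptionHandler implements DeserializationExceptionHandler {
        private static final Logger log = LoggerFactory.getLogger(ThresholdDeserializationExceptionHandler.class);

        private final AtomicLong skipped = new AtomicLong();
        private long maxSkipped = 100;  // illustrative default

        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
            log.warn("Skipping record that failed deserialization, taskId: {}, topic: {}, partition: {}, offset: {}",
                     context.taskId(), record.topic(), record.partition(), record.offset(), exception);
            // Continue until the (hypothetical) threshold is exceeded, then fail the task.
            return skipped.incrementAndGet() <= maxSkipped
                    ? DeserializationHandlerResponse.CONTINUE
                    : DeserializationHandlerResponse.FAIL;
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            // "max.deserialization.failures" is a made-up application property, not a Streams config.
            final Object max = configs.get("max.deserialization.failures");
            if (max != null) {
                maxSkipped = Long.parseLong(max.toString());
            }
        }
    }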
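
The patch also registers a per-source-node throughput sensor, skippedDueToDeserializationError, next to
the existing node sensors in ProcessorNode, so skipped records are observable. A rough sketch of listing
the matching metric names from a running application via KafkaStreams#metrics(); the exact metric names
follow the StreamsMetrics naming conventions, which this diff does not spell out, so the substring match
is an assumption.

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    public class SkippedRecordMetrics {
        // Print the metric names of the sensors that count records skipped because
        // deserialization failed; the substring is the sensor name added in ProcessorNode.
        public static void printSkippedRecordMetrics(final KafkaStreams streams) {
            for (final MetricName name : streams.metrics().keySet()) {
                if (name.name().contains("skippedDueToDeserializationError")) {
                    System.out.println(name);
                }
            }
        }
    }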