diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index e7a8c447afe..606e314c3ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
@@ -146,6 +148,13 @@ public class StreamsConfig extends AbstractConfig {
/** {@code connections.max.idle.ms} */
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
+ /**
+ * {@code default.deserialization.exception.handler}
+ */
+ public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
+ private static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the DeserializationExceptionHandler interface.";
+
+
/** {@code default key.serde} */
public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the Serde interface.";
@@ -303,6 +312,11 @@ public class StreamsConfig extends AbstractConfig {
"",
Importance.MEDIUM,
CommonClientConfigs.CLIENT_ID_DOC)
+ .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+ Type.CLASS,
+ LogAndFailExceptionHandler.class.getName(),
+ Importance.MEDIUM,
+ DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
.define(DEFAULT_KEY_SERDE_CLASS_CONFIG,
Type.CLASS,
Serdes.ByteArraySerde.class.getName(),
@@ -790,6 +804,10 @@ public class StreamsConfig extends AbstractConfig {
return timestampExtractor;
}
+ public DeserializationExceptionHandler defaultDeserializationExceptionHandler() {
+ return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
+ }
+
/**
* Override any client properties in the original configs with overrides
*
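
Editor's note: as a usage sketch only (not part of this patch), the snippet below shows how an application could select the new handler through the added config key and read it back via the new accessor. The application id and bootstrap address are placeholders.

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class DeserializationHandlerConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "handler-config-example"); // placeholder app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder broker
        // Override the default (LogAndFailExceptionHandler) to skip corrupted records instead of failing.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class.getName());

        final StreamsConfig config = new StreamsConfig(props);
        // The accessor added in this patch returns a configured instance of the chosen handler.
        System.out.println(config.defaultDeserializationExceptionHandler().getClass().getName());
    }
}
```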
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
new file mode 100644
index 00000000000..c1abb4df617
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+/**
+ * Interface that specifies how an exception from source node deserialization
+ * (e.g., reading from Kafka) should be handled.
+ */
+public interface DeserializationExceptionHandler extends Configurable {
+ /**
+ * Inspect a record and the exception received.
+ * @param context processor context
+ * @param record record that failed deserialization
+ * @param exception the actual exception
+ */
+ DeserializationHandlerResponse handle(final ProcessorContext context,
+ final ConsumerRecord<byte[], byte[]> record,
+ final Exception exception);
+
+ /**
+ * Enumeration that describes the response from the exception handler.
+ */
+ enum DeserializationHandlerResponse {
+ /* continue with processing */
+ CONTINUE(0, "CONTINUE"),
+ /* fail the processing and stop */
+ FAIL(1, "FAIL");
+
+ /** an English description of the response--this is for debugging and can change */
+ public final String name;
+
+ /** the permanent and immutable id of the response--this can never change */
+ public final int id;
+
+ DeserializationHandlerResponse(final int id, final String name) {
+ this.id = id;
+ this.name = name;
+ }
+ }
+
+}
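
Editor's note: to illustrate the contract, here is a sketch of a custom handler an application might provide. The class name and the skip counter are hypothetical and not part of this patch.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical custom handler: skip corrupted records but keep a count of how many were dropped.
public class LogAndSkipWithCountExceptionHandler implements DeserializationExceptionHandler {
    private final AtomicLong skippedRecords = new AtomicLong();

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // Count and skip the corrupted record; a production handler might also
        // forward the raw bytes to a dead-letter topic before continuing.
        skippedRecords.incrementAndGet();
        return DeserializationHandlerResponse.CONTINUE;
    }

    public long skippedRecords() {
        return skippedRecords.get();
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no additional configuration needed for this sketch
    }
}
```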
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
new file mode 100644
index 00000000000..dde4b52b1dc
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Deserialization handler that logs a deserialization exception and then
+ * signals the processing pipeline to continue processing more records.
+ */
+public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler {
+ private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);
+
+ @Override
+ public DeserializationHandlerResponse handle(final ProcessorContext context,
+ final ConsumerRecord<byte[], byte[]> record,
+ final Exception exception) {
+
+ log.warn("Exception caught during Deserialization, " +
+ "taskId: {}, topic: {}, partition: {}, offset: {}",
+ context.taskId(), record.topic(), record.partition(), record.offset(),
+ exception);
+
+ return DeserializationHandlerResponse.CONTINUE;
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs) {
+ // ignore
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
new file mode 100644
index 00000000000..23557a35337
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+
+/**
+ * Deserialization handler that logs a deserialization exception and then
+ * signals the processing pipeline to stop processing more records and fail.
+ */
+public class LogAndFailExceptionHandler implements DeserializationExceptionHandler {
+ private static final Logger log = LoggerFactory.getLogger(LogAndFailExceptionHandler.class);
+
+ @Override
+ public DeserializationHandlerResponse handle(final ProcessorContext context,
+ final ConsumerRecord<byte[], byte[]> record,
+ final Exception exception) {
+
+ log.error("Exception caught during Deserialization, " +
+ "taskId: {}, topic: {}, partition: {}, offset: {}",
+ context.taskId(), record.topic(), record.partition(), record.offset(),
+ exception);
+
+ return DeserializationHandlerResponse.FAIL;
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs) {
+ // ignore
+ }
+}
\ No newline at end of file
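
Editor's note: the hunks below wire a handler into the record deserialization path, but this excerpt does not show the call site that reacts to the returned enum (the SourceNodeRecordDeserializer changes are not included). The sketch below is an assumption about how a caller could dispatch on the response; the class name, method, and exception message are illustrative only.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical helper showing how a deserializer could consult the configured handler.
public class HandlerDispatchSketch {

    static boolean shouldSkip(final DeserializationExceptionHandler handler,
                              final ProcessorContext context,
                              final ConsumerRecord<byte[], byte[]> rawRecord,
                              final Exception deserializationError) {
        final DeserializationHandlerResponse response =
            handler.handle(context, rawRecord, deserializationError);
        if (response == DeserializationHandlerResponse.FAIL) {
            // Fail fast: surface the problem to the application.
            throw new StreamsException("Deserialization error on topic " + rawRecord.topic(),
                                       deserializationError);
        }
        // CONTINUE: tell the caller to drop the corrupted record and keep processing.
        return true;
    }
}
```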
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 2540af0d1d6..38beb63664a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import java.io.IOException;
import java.util.HashMap;
@@ -29,31 +30,23 @@ import java.util.Set;
*/
public class GlobalStateUpdateTask implements GlobalStateMaintainer {
- private static class SourceNodeAndDeserializer {
- private final SourceNode sourceNode;
- private final RecordDeserializer deserializer;
-
- SourceNodeAndDeserializer(final SourceNode sourceNode,
- final RecordDeserializer deserializer) {
- this.sourceNode = sourceNode;
- this.deserializer = deserializer;
- }
- }
-
private final ProcessorTopology topology;
private final InternalProcessorContext processorContext;
private final Map<TopicPartition, Long> offsets = new HashMap<>();
- private final Map<String, SourceNodeAndDeserializer> deserializers = new HashMap<>();
+ private final Map<String, RecordDeserializer> deserializers = new HashMap<>();
private final GlobalStateManager stateMgr;
+ private final DeserializationExceptionHandler deserializationExceptionHandler;
public GlobalStateUpdateTask(final ProcessorTopology topology,
final InternalProcessorContext processorContext,
- final GlobalStateManager stateMgr) {
+ final GlobalStateManager stateMgr,
+ final DeserializationExceptionHandler deserializationExceptionHandler) {
this.topology = topology;
this.stateMgr = stateMgr;
this.processorContext = processorContext;
+ this.deserializationExceptionHandler = deserializationExceptionHandler;
}
@SuppressWarnings("unchecked")
@@ -63,7 +56,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
for (final String storeName : storeNames) {
final String sourceTopic = storeNameToTopic.get(storeName);
final SourceNode source = topology.source(sourceTopic);
- deserializers.put(sourceTopic, new SourceNodeAndDeserializer(source, new SourceNodeRecordDeserializer(source)));
+ deserializers.put(sourceTopic, new SourceNodeRecordDeserializer(source, deserializationExceptionHandler));
}
initTopology();
processorContext.initialized();
@@ -74,17 +67,21 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
@SuppressWarnings("unchecked")
@Override
public void update(final ConsumerRecord<byte[], byte[]> record) {
- final SourceNodeAndDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic());
- final ConsumerRecord