
KAFKA-5157; Options for handling corrupt data during deserialization

This is the implementation of KIP-161: https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>

Closes #3423 from enothereska/KAFKA-5157-deserialization-exceptions
Authored by Eno Thereska, committed by Damian Guy · commit a1f97c8dc4
  1. streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java (18 lines changed)
  2. streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java (60 lines changed)
  3. streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java (52 lines changed)
  4. streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java (52 lines changed)
  5. streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java (45 lines changed)
  6. streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java (3 lines changed)
  7. streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java (3 lines changed)
  8. streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java (21 lines changed)
  9. streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java (34 lines changed)
  10. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (21 lines changed)
  11. streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java (77 lines changed)
  12. streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java (5 lines changed)
  13. streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java (60 lines changed)
  14. streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java (6 lines changed)
  15. streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java (3 lines changed)
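For reviewers who want to try the patch: a minimal usage sketch (not part of this change) showing how an application opts into the skip-and-continue behaviour via the new default.deserialization.exception.handler config. The application id and bootstrap servers are placeholder values.

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class DeserializationHandlerConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Placeholder values; substitute your own application id and brokers.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // New in this patch: log and skip corrupt records instead of failing.
        // The default remains LogAndFailExceptionHandler, preserving the old behaviour.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);

        final StreamsConfig config = new StreamsConfig(props);
        // Returns a configured LogAndContinueExceptionHandler instance.
        System.out.println(config.defaultDeserializationExceptionHandler());
    }
}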

streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java (18 lines changed)

@@ -29,6 +29,8 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
@@ -146,6 +148,13 @@ public class StreamsConfig extends AbstractConfig {
/** {@code connections.max.idle.ms} */
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
/**
* {@code default.deserialization.exception.handler}
*/
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
private static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>DeserializationExceptionHandler</code> interface.";
/** {@code default key.serde} */
public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the <code>Serde</code> interface.";
@@ -303,6 +312,11 @@ public class StreamsConfig extends AbstractConfig {
"",
Importance.MEDIUM,
CommonClientConfigs.CLIENT_ID_DOC)
.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
LogAndFailExceptionHandler.class.getName(),
Importance.MEDIUM,
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
.define(DEFAULT_KEY_SERDE_CLASS_CONFIG,
Type.CLASS,
Serdes.ByteArraySerde.class.getName(),
@@ -790,6 +804,10 @@ public class StreamsConfig extends AbstractConfig {
return timestampExtractor;
}
public DeserializationExceptionHandler defaultDeserializationExceptionHandler() {
return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
}
/**
* Override any client properties in the original configs with overrides
*

streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java (60 lines changed)

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.processor.ProcessorContext;
/**
* Interface that specifies how an exception from source node deserialization
* (e.g., reading from Kafka) should be handled.
*/
public interface DeserializationExceptionHandler extends Configurable {
/**
* Inspect a record and the exception received.
* @param context processor context
* @param record record that failed deserialization
* @param exception the actual exception
*/
DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception);
/**
* Enumeration that describes the response from the exception handler.
*/
enum DeserializationHandlerResponse {
/* continue with processing */
CONTINUE(0, "CONTINUE"),
/* fail the processing and stop */
FAIL(1, "FAIL");
/** An English description of the response; this is for debugging and can change */
public final String name;
/** The permanent and immutable id of the response; this can never change */
public final int id;
DeserializationHandlerResponse(final int id, final String name) {
this.id = id;
this.name = name;
}
}
}
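Beyond the two built-in handlers added below, users can plug in their own implementation through the same config. A hypothetical sketch (the class name and the example.max.deserialization.failures property are inventions for illustration, not part of this patch):

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical handler: tolerate a bounded number of corrupt records, then fail.
public class FailAfterThresholdExceptionHandler implements DeserializationExceptionHandler {
    private long maxFailures = 100;
    private long failures = 0;

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        failures++;
        return failures <= maxFailures
            ? DeserializationHandlerResponse.CONTINUE
            : DeserializationHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // "example.max.deserialization.failures" is an invented property for this sketch.
        final Object value = configs.get("example.max.deserialization.failures");
        if (value != null) {
            maxFailures = Long.parseLong(value.toString());
        }
    }
}

Such a handler is registered through default.deserialization.exception.handler exactly like the built-in ones.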

streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java (52 lines changed)

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* Deserialization handler that logs a deserialization exception and then
* signals the processing pipeline to continue processing more records.
*/
public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
@Override
public DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
log.warn("Exception caught during Deserialization, " +
"taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(), record.topic(), record.partition(), record.offset(),
exception);
return DeserializationHandlerResponse.CONTINUE;
}
@Override
public void configure(final Map<String, ?> configs) {
// ignore
}
}

streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java (52 lines changed)

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* Deserialization handler that logs a deserialization exception and then
* signals the processing pipeline to stop processing more records and fail.
*/
public class LogAndFailExceptionHandler implements DeserializationExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
@Override
public DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
log.warn("Exception caught during Deserialization, " +
"taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(), record.topic(), record.partition(), record.offset(),
exception);
return DeserializationHandlerResponse.FAIL;
}
@Override
public void configure(final Map<String, ?> configs) {
// ignore
}
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java (45 lines changed)

@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import java.io.IOException;
import java.util.HashMap;
@@ -29,31 +30,23 @@ import java.util.Set;
*/
public class GlobalStateUpdateTask implements GlobalStateMaintainer {
private static class SourceNodeAndDeserializer {
private final SourceNode sourceNode;
private final RecordDeserializer deserializer;
SourceNodeAndDeserializer(final SourceNode sourceNode,
final RecordDeserializer deserializer) {
this.sourceNode = sourceNode;
this.deserializer = deserializer;
}
}
private final ProcessorTopology topology;
private final InternalProcessorContext processorContext;
private final Map<TopicPartition, Long> offsets = new HashMap<>();
private final Map<String, SourceNodeAndDeserializer> deserializers = new HashMap<>();
private final Map<String, SourceNodeRecordDeserializer> deserializers = new HashMap<>();
private final GlobalStateManager stateMgr;
private final DeserializationExceptionHandler deserializationExceptionHandler;
public GlobalStateUpdateTask(final ProcessorTopology topology,
final InternalProcessorContext processorContext,
final GlobalStateManager stateMgr) {
final GlobalStateManager stateMgr,
final DeserializationExceptionHandler deserializationExceptionHandler) {
this.topology = topology;
this.stateMgr = stateMgr;
this.processorContext = processorContext;
this.deserializationExceptionHandler = deserializationExceptionHandler;
}
@SuppressWarnings("unchecked")
@@ -63,7 +56,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
for (final String storeName : storeNames) {
final String sourceTopic = storeNameToTopic.get(storeName);
final SourceNode source = topology.source(sourceTopic);
deserializers.put(sourceTopic, new SourceNodeAndDeserializer(source, new SourceNodeRecordDeserializer(source)));
deserializers.put(sourceTopic, new SourceNodeRecordDeserializer(source, deserializationExceptionHandler));
}
initTopology();
processorContext.initialized();
@@ -74,17 +67,21 @@
@SuppressWarnings("unchecked")
@Override
public void update(final ConsumerRecord<byte[], byte[]> record) {
final SourceNodeAndDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic());
final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.deserializer.deserialize(record);
final ProcessorRecordContext recordContext =
final SourceNodeRecordDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic());
final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.tryDeserialize(processorContext, record);
if (deserialized != null) {
final ProcessorRecordContext recordContext =
new ProcessorRecordContext(deserialized.timestamp(),
deserialized.offset(),
deserialized.partition(),
deserialized.topic());
processorContext.setRecordContext(recordContext);
processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode);
sourceNodeAndDeserializer.sourceNode.process(deserialized.key(), deserialized.value());
offsets.put(new TopicPartition(record.topic(), record.partition()), deserialized.offset() + 1);
deserialized.offset(),
deserialized.partition(),
deserialized.topic());
processorContext.setRecordContext(recordContext);
processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
sourceNodeAndDeserializer.sourceNode().process(deserialized.key(), deserialized.value());
}
offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
}
public void flushState() {

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java (3 lines changed)

@@ -277,7 +277,8 @@ public class GlobalStreamThread extends Thread {
stateMgr,
streamsMetrics,
cache),
stateMgr),
stateMgr,
config.defaultDeserializationExceptionHandler()),
time,
config.getLong(StreamsConfig.POLL_MS_CONFIG),
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java (3 lines changed)

@@ -168,6 +168,7 @@ public class ProcessorNode<K, V> {
final Sensor nodeProcessTimeSensor;
final Sensor nodePunctuateTimeSensor;
final Sensor sourceNodeForwardSensor;
final Sensor sourceNodeSkippedDueToDeserializationError;
final Sensor nodeCreationSensor;
final Sensor nodeDestructionSensor;
@@ -186,6 +187,7 @@ public class ProcessorNode<K, V> {
this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "forward", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "skippedDueToDeserializationError", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
}
public void removeAllSensors() {
@@ -194,6 +196,7 @@ public class ProcessorNode<K, V> {
metrics.removeSensor(sourceNodeForwardSensor);
metrics.removeSensor(nodeCreationSensor);
metrics.removeSensor(nodeDestructionSensor);
metrics.removeSensor(sourceNodeSkippedDueToDeserializationError);
}
}
}
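The new skippedDueToDeserializationError sensor above surfaces through the regular metrics registry. A hedged sketch of how an operator might locate it from KafkaStreams#metrics() follows; the helper class is hypothetical, and matching on the name fragment avoids hard-coding the full sensor-derived metric name, which depends on StreamsMetrics' internal naming.

import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

// Hypothetical helper, not part of this patch.
public final class SkippedRecordMetrics {
    private SkippedRecordMetrics() { }

    // Print every metric whose name contains the fragment registered above.
    public static void print(final KafkaStreams streams) {
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            if (entry.getKey().name().contains("skippedDueToDeserializationError")) {
                System.out.println(entry.getKey() + " = " + entry.getValue().value());
            }
        }
    }
}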

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java (21 lines changed)

@@ -18,6 +18,8 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,19 +41,25 @@ public class RecordQueue {
private final TopicPartition partition;
private final ArrayDeque<StampedRecord> fifoQueue;
private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker;
private final RecordDeserializer recordDeserializer;
private final SourceNodeRecordDeserializer recordDeserializer;
private final DeserializationExceptionHandler deserializationExceptionHandler;
private final ProcessorContext processorContext;
private long partitionTime = TimestampTracker.NOT_KNOWN;
RecordQueue(final TopicPartition partition,
final SourceNode source,
final TimestampExtractor timestampExtractor) {
final TimestampExtractor timestampExtractor,
final DeserializationExceptionHandler deserializationExceptionHandler,
final ProcessorContext processorContext) {
this.partition = partition;
this.source = source;
this.timestampExtractor = timestampExtractor;
this.fifoQueue = new ArrayDeque<>();
this.timeTracker = new MinTimestampTracker<>();
this.recordDeserializer = new SourceNodeRecordDeserializer(source);
this.recordDeserializer = new SourceNodeRecordDeserializer(source, deserializationExceptionHandler);
this.deserializationExceptionHandler = deserializationExceptionHandler;
this.processorContext = processorContext;
}
@@ -81,7 +89,12 @@
*/
public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(rawRecord);
ConsumerRecord<Object, Object> record = recordDeserializer.tryDeserialize(processorContext, rawRecord);
if (record == null) {
continue;
}
long timestamp = timestampExtractor.extract(record, timeTracker.get());
log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record);

streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java (34 lines changed)

@@ -18,15 +18,21 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import static java.lang.String.format;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
class SourceNodeRecordDeserializer implements RecordDeserializer {
private final SourceNode sourceNode;
private final DeserializationExceptionHandler deserializationExceptionHandler;
SourceNodeRecordDeserializer(final SourceNode sourceNode) {
SourceNodeRecordDeserializer(final SourceNode sourceNode,
final DeserializationExceptionHandler deserializationExceptionHandler) {
this.sourceNode = sourceNode;
this.deserializationExceptionHandler = deserializationExceptionHandler;
}
@Override
@@ -54,4 +60,30 @@ class SourceNodeRecordDeserializer implements RecordDeserializer {
rawRecord.serializedValueSize(), key, value);
}
public ConsumerRecord<Object, Object> tryDeserialize(final ProcessorContext processorContext,
ConsumerRecord<byte[], byte[]> rawRecord) {
// catch and process if we have a deserialization handler
try {
return deserialize(rawRecord);
} catch (Exception e) {
final DeserializationExceptionHandler.DeserializationHandlerResponse response =
deserializationExceptionHandler.handle(processorContext, rawRecord, e);
if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) {
throw new StreamsException("Deserialization exception handler is set to fail upon" +
" a deserialization error. If you would rather have the streaming pipeline" +
" continue after a deserialization error, please set the " +
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
e);
} else {
sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
}
}
return null;
}
public SourceNode sourceNode() {
return sourceNode;
}
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (21 lines changed)

@@ -28,6 +28,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -122,15 +123,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
// to corresponding source nodes in the processor topology
final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
for (final TopicPartition partition : partitions) {
final SourceNode source = topology.source(partition.topic());
final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor;
final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor);
partitionQueues.put(partition, queue);
}
partitionGroup = new PartitionGroup(partitionQueues);
// initialize the consumed offset cache
consumedOffsets = new HashMap<>();
@@ -140,6 +132,17 @@
// initialize the topology with its own context
processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler();
for (final TopicPartition partition : partitions) {
final SourceNode source = topology.source(partition.topic());
final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor;
final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor, defaultDeserializationExceptionHandler, processorContext);
partitionQueues.put(partition, queue);
}
partitionGroup = new PartitionGroup(partitionQueues);
this.time = time;
log.debug("{} Initializing", logPrefix);
initializeStateStores();

streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java (77 lines changed)

@@ -18,10 +18,15 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.test.GlobalStateManagerStub;
import org.apache.kafka.test.MockProcessorNode;
@@ -41,6 +46,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class GlobalStateTaskTest {
@@ -53,6 +59,7 @@ public class GlobalStateTaskTest {
private TopicPartition t2;
private MockSourceNode sourceOne;
private MockSourceNode sourceTwo;
private ProcessorTopology topology;
@Before
public void before() {
@@ -70,12 +77,12 @@
final Map<String, String> storeToTopic = new HashMap<>();
storeToTopic.put("t1-store", "t1");
storeToTopic.put("t2-store", "t2");
final ProcessorTopology topology = new ProcessorTopology(processorNodes,
sourceByTopics,
Collections.<String, SinkNode>emptyMap(),
Collections.<StateStore>emptyList(),
storeToTopic,
Collections.<StateStore>emptyList());
topology = new ProcessorTopology(processorNodes,
sourceByTopics,
Collections.<String, SinkNode>emptyMap(),
Collections.<StateStore>emptyList(),
storeToTopic,
Collections.<StateStore>emptyList());
context = new NoOpProcessorContext();
t1 = new TopicPartition("t1", 1);
@@ -84,7 +91,7 @@
offsets.put(t1, 50L);
offsets.put(t2, 100L);
stateMgr = new GlobalStateManagerStub(storeNames, offsets);
globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr);
globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr, new LogAndFailExceptionHandler());
}
@Test
@@ -129,6 +136,62 @@
assertEquals(0, sourceOne.numReceived);
}
private void maybeDeserialize(final GlobalStateUpdateTask globalStateTask,
final byte[] key,
final byte[] recordValue,
boolean failExpected) {
final ConsumerRecord record = new ConsumerRecord<>("t2", 1, 1,
0L, TimestampType.CREATE_TIME, 0L, 0, 0,
key, recordValue);
globalStateTask.initialize();
try {
globalStateTask.update(record);
if (failExpected) {
fail("Should have failed to deserialize.");
}
} catch (StreamsException e) {
if (!failExpected) {
fail("Shouldn't have failed to deserialize.");
}
}
}
@Test
public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() throws Exception {
final byte[] key = new LongSerializer().serialize("t2", 1L);
final byte[] recordValue = new IntegerSerializer().serialize("t2", 10);
maybeDeserialize(globalStateTask, key, recordValue, true);
}
@Test
public void shouldThrowStreamsExceptionWhenValueDeserializationFails() throws Exception {
final byte[] key = new IntegerSerializer().serialize("t2", 1);
final byte[] recordValue = new LongSerializer().serialize("t2", 10L);
maybeDeserialize(globalStateTask, key, recordValue, true);
}
@Test
public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() throws Exception {
final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(topology, context, stateMgr,
new LogAndContinueExceptionHandler());
final byte[] key = new LongSerializer().serialize("t2", 1L);
final byte[] recordValue = new IntegerSerializer().serialize("t2", 10);
maybeDeserialize(globalStateTask2, key, recordValue, false);
}
@Test
public void shouldNotThrowStreamsExceptionWhenValueDeserializationFails() throws Exception {
final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(topology, context, stateMgr,
new LogAndContinueExceptionHandler());
final byte[] key = new IntegerSerializer().serialize("t2", 1);
final byte[] recordValue = new LongSerializer().serialize("t2", 10L);
maybeDeserialize(globalStateTask2, key, recordValue, false);
}
@Test
public void shouldCloseStateManagerWithOffsets() throws Exception {

streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java (5 lines changed)

@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
@@ -40,8 +41,8 @@ public class PartitionGroupTest {
private final String[] topics = {"topic"};
private final TopicPartition partition1 = new TopicPartition(topics[0], 1);
private final TopicPartition partition2 = new TopicPartition(topics[0], 2);
private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor);
private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor);
private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), null);
private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), null);
private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);

streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java (60 lines changed)

@@ -27,12 +27,19 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
@@ -44,13 +51,30 @@ public class RecordQueueTest {
private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
private final String[] topics = {"topic"};
final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
new RecordCollectorImpl(null, null));
private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer);
private final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1),
new MockSourceNode<>(topics, intDeserializer, intDeserializer),
timestampExtractor);
mockSourceNodeWithMetrics,
timestampExtractor, new LogAndFailExceptionHandler(), context);
private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(new TopicPartition(topics[0], 1),
mockSourceNodeWithMetrics,
timestampExtractor, new LogAndContinueExceptionHandler(), context);
private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
@Before
public void before() {
mockSourceNodeWithMetrics.init(context);
}
@After
public void after() {
mockSourceNodeWithMetrics.close();
}
@Test
public void testTimeTracking() {
@@ -140,14 +164,38 @@
queue.addRawRecords(records);
}
@Test
public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() throws Exception {
final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, recordValue));
final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
queueThatSkipsDeserializeErrors.addRawRecords(records);
assertEquals(0, queueThatSkipsDeserializeErrors.size());
}
@Test
public void shouldNotThrowStreamsExceptionWhenValueDeserializationFailsWithSkipHandler() throws Exception {
final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, value));
queueThatSkipsDeserializeErrors.addRawRecords(records);
assertEquals(0, queueThatSkipsDeserializeErrors.size());
}
@Test(expected = StreamsException.class)
public void shouldThrowOnNegativeTimestamp() {
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1),
new MockSourceNode<>(topics, intDeserializer, intDeserializer),
new FailOnInvalidTimestamp());
new MockSourceNode<>(topics, intDeserializer, intDeserializer),
new FailOnInvalidTimestamp(),
new LogAndContinueExceptionHandler(),
null);
queue.addRawRecords(records);
}
@@ -158,7 +206,9 @@
final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1),
new MockSourceNode<>(topics, intDeserializer, intDeserializer),
new LogAndSkipOnInvalidTimestamp());
new LogAndSkipOnInvalidTimestamp(),
new LogAndContinueExceptionHandler(),
null);
queue.addRawRecords(records);
assertEquals(0, queue.size());

streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java (6 lines changed)

@@ -43,21 +43,21 @@ public class SourceNodeRecordDeserializerTest {
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionIfKeyFailsToDeserialize() throws Exception {
final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
new TheSourceNode(true, false));
new TheSourceNode(true, false), null);
recordDeserializer.deserialize(rawRecord);
}
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionIfKeyValueFailsToDeserialize() throws Exception {
final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
new TheSourceNode(false, true));
new TheSourceNode(false, true), null);
recordDeserializer.deserialize(rawRecord);
}
@Test
public void shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions() throws Exception {
final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
new TheSourceNode(false, false, "key", "value"));
new TheSourceNode(false, false, "key", "value"), null);
final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(rawRecord);
assertEquals(rawRecord.topic(), record.topic());
assertEquals(rawRecord.partition(), record.partition());

streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java (3 lines changed)

@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -205,7 +206,7 @@ public class ProcessorTopologyTestDriver {
final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory);
globalStateTask = new GlobalStateUpdateTask(globalTopology,
new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
stateManager
stateManager, new LogAndContinueExceptionHandler()
);
globalStateTask.initialize();
}
