diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 8ac373ba291..2f20cdb7480 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -79,7 +79,19 @@ public class SinkNode extends ProcessorNode { "Use a different TimestampExtractor to process this data."); } - collector.send(new ProducerRecord(topic, null, timestamp, key, value), keySerializer, valSerializer, partitioner); + try { + collector.send(new ProducerRecord(topic, null, timestamp, key, value), keySerializer, valSerializer, partitioner); + } catch (ClassCastException e) { + throw new StreamsException( + String.format("A serializer (key: %s / value: %s) is not compatible to the actual key or value type " + + "(key type: %s / value type: %s). Change the default Serdes in StreamConfig or " + + "provide correct Serdes via method parameters.", + keySerializer.getClass().getName(), + valSerializer.getClass().getName(), + key.getClass().getName(), + value.getClass().getName()), + e); + } } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java index 3b415174a93..51dc7d2c640 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -17,19 +17,17 @@ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateRestoreCallback; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.test.MockProcessorContext; import org.junit.Test; -import java.io.File; -import java.util.Map; +import java.util.Properties; public class SinkNodeTest { @@ -37,108 +35,38 @@ public class SinkNodeTest { @SuppressWarnings("unchecked") public void invalidInputRecordTimestampTest() { final Serializer anySerializer = Serdes.Bytes().serializer(); + final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class); + + final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollector(null, null)); + context.setTime(-1); final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null); - sink.init(new MockProcessorContext()); + sink.init(context); sink.process(null, null); } - private final class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier { - private final long invalidTimestamp = -1; - - @Override - public String applicationId() { - return null; - } - - @Override - public TaskId taskId() { - return null; - } - - @Override - public Serde keySerde() { - return null; - } - - @Override - public Serde valueSerde() { - return null; - } - - @Override - public File stateDir() { - return null; - } - - @Override - public StreamsMetrics metrics() { - return null; - } - - @Override - public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) { - } - - @Override - public StateStore getStateStore(String name) { - return null; - } - - @Override - public void schedule(long interval) { - } - - @Override - public void forward(K key, V value) { - } - - @Override - public void forward(K key, V value, int childIndex) { - } - - @Override - public void forward(K key, V value, String childName) { - } - - @Override - public void commit() { - } - - @Override - public String topic() { - return null; - } - - @Override - public int partition() { - return 0; - } - - @Override - public long offset() { - return 0; - } - - @Override - public long timestamp() { - return invalidTimestamp; - } - - @Override - public Map appConfigs() { - return null; - } + @Test(expected = StreamsException.class) + @SuppressWarnings("unchecked") + public void shouldThrowStreamsExceptionOnKeyValyeTypeSerializerMissmatch() { + final Serializer anySerializer = Serdes.Bytes().serializer(); + final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class); - @Override - public Map appConfigsWithPrefix(String prefix) { - return null; - } + Properties config = new Properties(); + config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollector(new MockProducer(true, anySerializer, anySerializer), null)); + context.setTime(0); - @Override - public RecordCollector recordCollector() { - return null; + final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null); + sink.init(context); + + try { + sink.process("", ""); + } catch (final StreamsException e) { + if (e.getCause() instanceof ClassCastException) { + throw e; + } + throw new RuntimeException(e); } }