
MINOR: improve exception message for incompatible Serdes to actual key/value data types

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Michael G. Noll, Guozhang Wang

Closes #2118 from mjsax/hotfixImproveSerdeTypeMissmatchError
Committer: Guozhang Wang

Commit: d8fa4006cb
Changed files:
  1. streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java (12 changes)
  2. streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java (128 changes)
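For context, the situation the reworded exception message targets looks roughly like the sketch below: a topology reads typed records but writes them through the default Serdes, which are still the built-in byte-array Serdes. This is an illustrative sketch against the 0.10.x-era DSL, not code from this commit; the application id, topic names, and the String/Long types are made up.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;

public class SerdeMismatchTopology {

    public static void main(final String[] args) {
        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "serde-mismatch-example");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // No default Serdes are set, so the built-in byte-array Serdes are used at the sink.

        final KStreamBuilder builder = new KStreamBuilder();

        // The source declares its Serdes explicitly, so consuming works ...
        final KStream<String, Long> stream =
                builder.stream(Serdes.String(), Serdes.Long(), "input-topic");

        // ... but the sink falls back to the default byte-array serializers. They
        // receive a String key and a Long value and throw a ClassCastException,
        // which SinkNode now rethrows as a StreamsException naming both the
        // serializer classes and the actual key/value types.
        stream.to("output-topic");

        new KafkaStreams(builder, config).start();
    }
}

With this change the failure surfaces as the descriptive StreamsException built in the SinkNode.java hunk below, instead of a bare ClassCastException.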

streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java (12 changes)

@@ -79,7 +79,19 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
                 "Use a different TimestampExtractor to process this data.");
         }

-        collector.send(new ProducerRecord<K, V>(topic, null, timestamp, key, value), keySerializer, valSerializer, partitioner);
+        try {
+            collector.send(new ProducerRecord<K, V>(topic, null, timestamp, key, value), keySerializer, valSerializer, partitioner);
+        } catch (ClassCastException e) {
+            throw new StreamsException(
+                    String.format("A serializer (key: %s / value: %s) is not compatible to the actual key or value type " +
+                                    "(key type: %s / value type: %s). Change the default Serdes in StreamConfig or " +
+                                    "provide correct Serdes via method parameters.",
+                            keySerializer.getClass().getName(),
+                            valSerializer.getClass().getName(),
+                            key.getClass().getName(),
+                            value.getClass().getName()),
+                    e);
+        }
     }

     @Override
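The ClassCastException caught above originates in the serializer itself: SinkNode only knows its serializers by their erased generic types, so a serializer built for one class fails on the implicit cast when it is handed a key or value of another class. A minimal standalone sketch of that mechanism; the class name, topic name, and the Long value are made up for illustration and are not part of this commit.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class SerializerMismatchSketch {

    @SuppressWarnings({"unchecked", "rawtypes"})
    public static void main(final String[] args) {
        // A serializer for String, referenced through the raw type, much like
        // SinkNode handling its key/value serializers generically.
        final Serializer valueSerializer = Serdes.String().serializer();

        final Object value = 42L;  // the record value is actually a Long

        try {
            // The erased serialize(String, Object) bridge casts to String and fails.
            valueSerializer.serialize("output-topic", value);
        } catch (final ClassCastException e) {
            // This is the exception the new catch block wraps, adding the
            // serializer class names and the runtime key/value types.
            System.err.println("serializer/type mismatch: " + e);
        }
    }
}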

streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java (128 changes)

@@ -17,19 +17,17 @@
 package org.apache.kafka.streams.processor.internals;

-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;

-import java.io.File;
-import java.util.Map;
+import java.util.Properties;

 public class SinkNodeTest {
@@ -37,108 +35,38 @@ public class SinkNodeTest {
     @SuppressWarnings("unchecked")
     public void invalidInputRecordTimestampTest() {
         final Serializer anySerializer = Serdes.Bytes().serializer();
+        final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollector(null, null));
+        context.setTime(-1);
         final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
-        sink.init(new MockProcessorContext());
+        sink.init(context);
         sink.process(null, null);
     }

-    private final class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
-        private final long invalidTimestamp = -1;
-        @Override
-        public String applicationId() {
-            return null;
-        }
-        @Override
-        public TaskId taskId() {
-            return null;
-        }
-        @Override
-        public Serde<?> keySerde() {
-            return null;
-        }
-        @Override
-        public Serde<?> valueSerde() {
-            return null;
-        }
-        @Override
-        public File stateDir() {
-            return null;
-        }
-        @Override
-        public StreamsMetrics metrics() {
-            return null;
-        }
-        @Override
-        public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
-        }
-        @Override
-        public StateStore getStateStore(String name) {
-            return null;
-        }
-        @Override
-        public void schedule(long interval) {
-        }
-        @Override
-        public <K, V> void forward(K key, V value) {
-        }
-        @Override
-        public <K, V> void forward(K key, V value, int childIndex) {
-        }
-        @Override
-        public <K, V> void forward(K key, V value, String childName) {
-        }
-        @Override
-        public void commit() {
-        }
-        @Override
-        public String topic() {
-            return null;
-        }
-        @Override
-        public int partition() {
-            return 0;
-        }
-        @Override
-        public long offset() {
-            return 0;
-        }
+    @Test(expected = StreamsException.class)
+    @SuppressWarnings("unchecked")
+    public void shouldThrowStreamsExceptionOnKeyValyeTypeSerializerMissmatch() {
+        final Serializer anySerializer = Serdes.Bytes().serializer();
+        final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);

-        @Override
-        public long timestamp() {
-            return invalidTimestamp;
-        }
+        Properties config = new Properties();
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollector(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
+        context.setTime(0);

-        @Override
-        public Map<String, Object> appConfigs() {
-            return null;
-        }
+        final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
+        sink.init(context);

-        @Override
-        public Map<String, Object> appConfigsWithPrefix(String prefix) {
-            return null;
-        }
+        try {
+            sink.process("", "");
+        } catch (final StreamsException e) {
+            if (e.getCause() instanceof ClassCastException) {
+                throw e;
+            }

-        @Override
-        public RecordCollector recordCollector() {
-            return null;
-        }
+            throw new RuntimeException(e);
+        }
     }
 }
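The new exception message points at two remedies: change the default Serdes in the configuration, or pass matching Serdes at the operator that writes to the topic. A hedged sketch of both, assuming the 0.10.x-era config keys (KEY_SERDE_CLASS_CONFIG / VALUE_SERDE_CLASS_CONFIG) and the KStream#to(Serde, Serde, String) overload; the application id, topic names, and String/Long types are illustrative only.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;

public class SerdeFixSketch {

    public static void main(final String[] args) {
        // Option 1: make the default Serdes match the actual key/value types.
        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "serde-fix-example");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, Long> stream =
                builder.stream(Serdes.String(), Serdes.Long(), "input-topic");

        // Option 2: provide the correct Serdes via method parameters at the sink,
        // overriding whatever defaults are configured.
        stream.to(Serdes.String(), Serdes.Long(), "output-topic");

        new KafkaStreams(builder, config).start();
    }
}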
