From 517503db2616531b08ee4d08d39c0e1c0bd19e97 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Tue, 6 Jan 2015 12:10:04 -0800 Subject: [PATCH] kafka-1797; (delta follow-up patch) add the serializer/deserializer api to the new java client; patched by Jun Rao; reviewed by Neha Narkhede --- .../org/apache/kafka/clients/producer/KafkaProducer.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 91c672d3a78..a61c56c01c6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -76,6 +76,7 @@ public class KafkaProducer implements Producer { private final Time time; private final Serializer keySerializer; private final Serializer valueSerializer; + private final ProducerConfig producerConfig; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -152,6 +153,7 @@ public class KafkaProducer implements Producer { private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer) { log.trace("Starting the Kafka producer"); + this.producerConfig = config; this.time = new SystemTime(); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), @@ -307,14 +309,16 @@ public class KafkaProducer implements Producer { serializedKey = keySerializer.serialize(record.topic(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + - " to the one specified in key.serializer"); + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + + " specified in key.serializer"); } byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + - " to the one specified in value.serializer"); + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + + " specified in value.serializer"); } ProducerRecord serializedRecord = new ProducerRecord(record.topic(), record.partition(), serializedKey, serializedValue); int partition = partitioner.partition(serializedRecord, metadata.fetch());