From 92d1d4cd31e1045f0e000e8d2d777c73f7529743 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Wed, 17 Dec 2014 16:29:09 -0800 Subject: [PATCH] kafka-1797; add the serializer/deserializer api to the new java client; patched by Jun Rao; reviewed by Neha Narkhede --- .../consumer/ByteArrayDeserializer.java | 34 +++++ .../kafka/clients/consumer/Consumer.java | 4 +- .../clients/consumer/ConsumerConfig.java | 14 +- .../consumer/ConsumerRebalanceCallback.java | 4 +- .../clients/consumer/ConsumerRecord.java | 16 +-- .../clients/consumer/ConsumerRecords.java | 14 +- .../kafka/clients/consumer/Deserializer.java | 38 ++++++ .../kafka/clients/consumer/KafkaConsumer.java | 124 +++++++++++------- .../kafka/clients/consumer/MockConsumer.java | 6 +- .../clients/producer/ByteArraySerializer.java | 34 +++++ .../kafka/clients/producer/KafkaProducer.java | 76 +++++++++-- .../kafka/clients/producer/MockProducer.java | 14 +- .../kafka/clients/producer/Producer.java | 6 +- .../clients/producer/ProducerConfig.java | 15 ++- .../clients/producer/ProducerRecord.java | 20 +-- .../kafka/clients/producer/Serializer.java | 38 ++++++ .../producer/internals/Partitioner.java | 2 +- .../clients/tools/ProducerPerformance.java | 4 +- .../errors/DeserializationException.java | 47 +++++++ .../common/errors/SerializationException.java | 46 +++++++ .../clients/producer/MockProducerTest.java | 6 +- .../clients/producer/PartitionerTest.java | 12 +- .../scala/kafka/producer/BaseProducer.scala | 4 +- .../kafka/producer/KafkaLog4jAppender.scala | 6 +- .../main/scala/kafka/tools/MirrorMaker.scala | 3 +- .../scala/kafka/tools/ReplayLogProducer.scala | 4 +- .../kafka/tools/TestEndToEndLatency.scala | 4 +- .../scala/kafka/tools/TestLogCleaning.scala | 6 +- .../kafka/api/ProducerCompressionTest.scala | 4 +- .../api/ProducerFailureHandlingTest.scala | 32 ++--- .../kafka/api/ProducerSendTest.scala | 16 +-- .../scala/unit/kafka/utils/TestUtils.scala | 4 +- 32 files changed, 499 insertions(+), 158 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java new file mode 100644 index 00000000000..514cbd2c27a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients.consumer; + +import java.util.Map; + +public class ByteArrayDeserializer implements Deserializer { + + @Override + public void configure(Map configs) { + // nothing to do + } + + @Override + public byte[] deserialize(String topic, byte[] data, boolean isKey) { + return data; + } + + @Override + public void close() { + // nothing to do + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 227f5646ee7..1bce5018527 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition; * @see KafkaConsumer * @see MockConsumer */ -public interface Consumer extends Closeable { +public interface Consumer extends Closeable { /** * Incrementally subscribe to the given list of topics. This API is mutually exclusive to @@ -63,7 +63,7 @@ public interface Consumer extends Closeable { * of data is controlled by {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} and {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}. * If no data is available for timeout ms, returns an empty list */ - public Map poll(long timeout); + public Map> poll(long timeout); /** * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 46efc0c8483..1d64f08762b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -151,6 +151,14 @@ public class ConsumerConfig extends AbstractConfig { public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " + "plugging in classes that will be notified of new metric creation. 
The JmxReporter is always included to register JMX statistics."; + /** key.deserializer */ + public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer"; + private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the Deserializer interface."; + + /** value.deserializer */ + public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"; + private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the Deserializer interface."; + static { /* TODO: add config docs */ config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah") @@ -176,8 +184,10 @@ public class ConsumerConfig extends AbstractConfig { Importance.LOW, METRICS_SAMPLE_WINDOW_MS_DOC) .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC) - .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC); - + .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC) + .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC) + .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC); + } ConsumerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java index f026ae41ce8..e4cf7d1cfa0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java @@ -35,7 +35,7 @@ public interface ConsumerRebalanceCallback { * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer} * @param partitions The list of partitions that are assigned to the consumer after rebalance */ - public void onPartitionsAssigned(Consumer consumer, Collection partitions); + public void onPartitionsAssigned(Consumer consumer, Collection partitions); /** * A callback method the user can implement to provide handling of offset commits to a customized store on the @@ -46,5 +46,5 @@ public interface ConsumerRebalanceCallback { * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer} * @param partitions The list of partitions that were assigned to the consumer on the last rebalance */ - public void onPartitionsRevoked(Consumer consumer, Collection partitions); + public void onPartitionsRevoked(Consumer consumer, Collection partitions); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java index 436d8a47916..16af70a5de5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java @@ -18,10 +18,10 @@ import org.apache.kafka.common.TopicPartition; * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the * record is being received and an offset that points to the record in a Kafka partition. 
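Editorial note, not part of the patch: the two consumer settings defined above, key.deserializer and value.deserializer, both default to ByteArrayDeserializer, so existing byte[]-based consumers keep working unchanged. A minimal sketch of setting them explicitly could look like the following; the class name DeserializerConfigExample, the broker address, and the group id values are placeholders.

import java.util.Properties;

// Illustrative sketch only; not part of this patch.
public class DeserializerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "test");
        // The new configs added above; both default to ByteArrayDeserializer when omitted.
        props.put("key.deserializer", "org.apache.kafka.clients.consumer.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.clients.consumer.ByteArrayDeserializer");
        // These properties are read by the KafkaConsumer constructors shown later in the patch,
        // which instantiate the named classes and call configure() on them.
    }
}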
*/ -public final class ConsumerRecord { +public final class ConsumerRecord { private final TopicPartition partition; - private final byte[] key; - private final byte[] value; + private final K key; + private final V value; private final long offset; private volatile Exception error; @@ -34,7 +34,7 @@ public final class ConsumerRecord { * @param value The record contents * @param offset The offset of this record in the corresponding Kafka partition */ - public ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset) { + public ConsumerRecord(String topic, int partitionId, K key, V value, long offset) { this(topic, partitionId, key, value, offset, null); } @@ -46,7 +46,7 @@ public final class ConsumerRecord { * @param value The record contents * @param offset The offset of this record in the corresponding Kafka partition */ - public ConsumerRecord(String topic, int partitionId, byte[] value, long offset) { + public ConsumerRecord(String topic, int partitionId, V value, long offset) { this(topic, partitionId, null, value, offset); } @@ -60,7 +60,7 @@ public final class ConsumerRecord { this(topic, partitionId, null, null, -1L, error); } - private ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset, Exception error) { + private ConsumerRecord(String topic, int partitionId, K key, V value, long offset, Exception error) { if (topic == null) throw new IllegalArgumentException("Topic cannot be null"); this.partition = new TopicPartition(topic, partitionId); @@ -95,7 +95,7 @@ public final class ConsumerRecord { * The key (or null if no key is specified) * @throws Exception The exception thrown while fetching this record. */ - public byte[] key() throws Exception { + public K key() throws Exception { if (this.error != null) throw this.error; return key; @@ -105,7 +105,7 @@ public final class ConsumerRecord { * The value * @throws Exception The exception thrown while fetching this record. */ - public byte[] value() throws Exception { + public V value() throws Exception { if (this.error != null) throw this.error; return value; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index 2ecfc8aaea9..bdf4b26942d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -21,12 +21,12 @@ import java.util.Map.Entry; * A container that holds the list {@link ConsumerRecord} per partition for a particular topic. There is one for every topic returned by a * {@link Consumer#poll(long)} operation. */ -public class ConsumerRecords { +public class ConsumerRecords { private final String topic; - private final Map> recordsPerPartition; + private final Map>> recordsPerPartition; - public ConsumerRecords(String topic, Map> records) { + public ConsumerRecords(String topic, Map>> records) { this.topic = topic; this.recordsPerPartition = records; } @@ -36,16 +36,16 @@ public class ConsumerRecords { * specified, returns records for all partitions * @return The list of {@link ConsumerRecord}s associated with the given partitions. */ - public List records(int... partitions) { - List recordsToReturn = new ArrayList(); + public List> records(int... 
partitions) { + List> recordsToReturn = new ArrayList>(); if(partitions.length == 0) { // return records for all partitions - for(Entry> record : recordsPerPartition.entrySet()) { + for(Entry>> record : recordsPerPartition.entrySet()) { recordsToReturn.addAll(record.getValue()); } } else { for(int partition : partitions) { - List recordsForThisPartition = recordsPerPartition.get(partition); + List> recordsForThisPartition = recordsPerPartition.get(partition); recordsToReturn.addAll(recordsForThisPartition); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java new file mode 100644 index 00000000000..fa857d4debb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.Configurable; + +/** + * + * @param Type to be deserialized into. + * + * A class that implements this interface is expected to have a constructor with no parameter. + */ +public interface Deserializer extends Configurable { + /** + * + * @param topic Topic associated with the data + * @param data Serialized bytes + * @param isKey Is data for key or value + * @return deserialized typed data + */ + public T deserialize(String topic, byte[] data, boolean isKey); + + /** + * Close this deserializer + */ + public void close(); +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index fe93afa24fc..a43b1600c70 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -12,19 +12,6 @@ */ package org.apache.kafka.clients.consumer; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.Map.Entry; -import java.util.concurrent.Future; - -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Metric; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.JmxReporter; @@ -36,6 +23,9 @@ import org.apache.kafka.common.utils.SystemTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.util.*; + /** * A Kafka client that consumes records from a Kafka cluster. *
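Editorial note, not part of the patch: the Deserializer interface introduced above is the consumer-side plug-in point. A rough sketch of a UTF-8 String deserializer written against it might look like the following; the class name StringDeserializer is hypothetical, the generic form Deserializer&lt;T&gt; is assumed (angle brackets were stripped from this copy of the diff), and DeserializationException is the exception type added later in this patch.

import java.io.UnsupportedEncodingException;
import java.util.Map;

import org.apache.kafka.clients.consumer.Deserializer;
import org.apache.kafka.common.errors.DeserializationException;

// Illustrative sketch only; assumes the interface is generic as Deserializer<T>.
public class StringDeserializer implements Deserializer<String> {

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure in this sketch
    }

    @Override
    public String deserialize(String topic, byte[] data, boolean isKey) {
        if (data == null)
            return null;
        try {
            return new String(data, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // DeserializationException is added further down in this patch
            throw new DeserializationException("Error when deserializing byte[] to String", e);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}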
@@ -50,12 +40,12 @@ import org.slf4j.LoggerFactory; * a convenience method to demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method. *
  * {@code
- * private Map process(Map records) {
+ * private Map<TopicPartition, Long> process(Map<String, ConsumerRecords<byte[], byte[]>> records) {
  *     Map processedOffsets = new HashMap();
- *     for(Entry recordMetadata : records.entrySet()) {
- *          List recordsPerTopic = recordMetadata.getValue().records();
+ *     for(Entry<String, ConsumerRecords<byte[], byte[]>> recordMetadata : records.entrySet()) {
+ *          List<ConsumerRecord<byte[], byte[]>> recordsPerTopic = recordMetadata.getValue().records();
  *          for(int i = 0;i < recordsPerTopic.size();i++) {
- *               ConsumerRecord record = recordsPerTopic.get(i);
+ *               ConsumerRecord<byte[], byte[]> record = recordsPerTopic.get(i);
  *               // process record
  *               try {
  *               	processedOffsets.put(record.topicAndpartition(), record.offset());
@@ -80,11 +70,11 @@ import org.slf4j.LoggerFactory;
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "true");
  * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
  * consumer.subscribe("foo", "bar");
  * boolean isRunning = true;
  * while(isRunning) {
- *   Map records = consumer.poll(100);
+ *   Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *   process(records);
  * }
  * consumer.close();
@@ -102,14 +92,14 @@ import org.slf4j.LoggerFactory;
  * props.put("group.id", "test");
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "false");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
  * consumer.subscribe("foo", "bar");
  * int commitInterval = 100;
  * int numRecords = 0;
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     try {
  *         Map lastConsumedOffsets = process(records);
  *         consumedOffsets.putAll(lastConsumedOffsets);
@@ -156,16 +146,17 @@ import org.slf4j.LoggerFactory;
  * props.put("group.id", "test");
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "false");
- * KafkaConsumer consumer = new KafkaConsumer(props,
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+ *                                            props,
  *                                            new ConsumerRebalanceCallback() {
  *                                                boolean rewindOffsets = true;  // should be retrieved from external application config
- *                                                public void onPartitionsAssigned(Consumer consumer, Collection partitions) {
+ *                                                public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {
  *                                                    Map latestCommittedOffsets = consumer.committed(partitions);
  *                                                    if(rewindOffsets)
  *                                                        Map newOffsets = rewindOffsets(latestCommittedOffsets, 100);
  *                                                    consumer.seek(newOffsets);
  *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, Collection partitions) {
+ *                                                public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {
  *                                                    consumer.commit(true);
  *                                                }
 *                                                // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages
@@ -183,7 +174,7 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     Map lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     numRecords += records.size();
@@ -211,13 +202,14 @@ import org.slf4j.LoggerFactory;
  * props.put("group.id", "test");
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
- * KafkaConsumer consumer = new KafkaConsumer(props,
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+ *                                            props,
  *                                            new ConsumerRebalanceCallback() {
- *                                                public void onPartitionsAssigned(Consumer consumer, Collection partitions) {
+ *                                                public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {
  *                                                    Map lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
  *                                                    consumer.seek(lastCommittedOffsets);
  *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, Collection partitions) {
+ *                                                public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {
  *                                                    Map offsets = getLastConsumedOffsets(partitions);
  *                                                    commitOffsetsToCustomStore(offsets); 
  *                                                }
@@ -234,7 +226,7 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     Map lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     numRecords += records.size();
@@ -259,7 +251,7 @@ import org.slf4j.LoggerFactory;
  * props.put("group.id", "test");
  * props.put("enable.auto.commit", "true");
  * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
  * // subscribe to some partitions of topic foo
  * TopicPartition partition0 = new TopicPartition("foo", 0);
  * TopicPartition partition1 = new TopicPartition("foo", 1);
@@ -276,7 +268,7 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     Map lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     for(TopicPartition partition : partitions) {
@@ -298,7 +290,7 @@ import org.slf4j.LoggerFactory;
  * {@code  
  * Properties props = new Properties();
  * props.put("metadata.broker.list", "localhost:9092");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
  * // subscribe to some partitions of topic foo
  * TopicPartition partition0 = new TopicPartition("foo", 0);
  * TopicPartition partition1 = new TopicPartition("foo", 1);
@@ -314,7 +306,7 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     Map lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     // commit offsets for partitions 0,1 for topic foo to custom store
@@ -331,7 +323,7 @@ import org.slf4j.LoggerFactory;
  * }
  *
*/ -public class KafkaConsumer implements Consumer { +public class KafkaConsumer implements Consumer { private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); @@ -340,7 +332,9 @@ public class KafkaConsumer implements Consumer { private final Metrics metrics; private final Set subscribedTopics; private final Set subscribedPartitions; - + private final Deserializer keyDeserializer; + private final Deserializer valueDeserializer; + /** * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings * are documented here. Values can be @@ -351,7 +345,7 @@ public class KafkaConsumer implements Consumer { * @param configs The consumer configs */ public KafkaConsumer(Map configs) { - this(new ConsumerConfig(configs), null); + this(new ConsumerConfig(configs), null, null, null); } /** @@ -364,7 +358,24 @@ public class KafkaConsumer implements Consumer { * every rebalance operation. */ public KafkaConsumer(Map configs, ConsumerRebalanceCallback callback) { - this(new ConsumerConfig(configs), callback); + this(new ConsumerConfig(configs), callback, null, null); + } + + /** + * A consumer is instantiated by providing a set of key-value pairs as configuration, a {@link ConsumerRebalanceCallback} + * implementation, a key and a value {@link Deserializer}. + *
+ * Valid configuration strings are documented at {@link ConsumerConfig} + * @param configs The consumer configs + * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of + * every rebalance operation. + * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method won't + * be called in the consumer when the deserializer is passed in directly. + * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method + * won't be called in the consumer when the deserializer is passed in directly. + */ + public KafkaConsumer(Map configs, ConsumerRebalanceCallback callback, Deserializer keyDeserializer, Deserializer valueDeserializer) { + this(new ConsumerConfig(configs), callback, keyDeserializer, valueDeserializer); } /** @@ -372,12 +383,12 @@ public class KafkaConsumer implements Consumer { * Valid configuration strings are documented at {@link ConsumerConfig} */ public KafkaConsumer(Properties properties) { - this(new ConsumerConfig(properties), null); + this(new ConsumerConfig(properties), null, null, null); } /** * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a - * {@link ConsumerRebalanceCallback} implementation. + * {@link ConsumerRebalanceCallback} implementation. *
* Valid configuration strings are documented at {@link ConsumerConfig} * @param properties The consumer configuration properties @@ -385,14 +396,27 @@ public class KafkaConsumer implements Consumer { * every rebalance operation. */ public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) { - this(new ConsumerConfig(properties), callback); + this(new ConsumerConfig(properties), callback, null, null); } - private KafkaConsumer(ConsumerConfig config) { - this(config, null); + /** + * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a + * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}. + *
+ * Valid configuration strings are documented at {@link ConsumerConfig} + * @param properties The consumer configuration properties + * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of + * every rebalance operation. + * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method won't + * be called in the consumer when the deserializer is passed in directly. + * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method + * won't be called in the consumer when the deserializer is passed in directly. + */ + public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback, Deserializer keyDeserializer, Deserializer valueDeserializer) { + this(new ConsumerConfig(properties), callback, keyDeserializer, valueDeserializer); } - private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback) { + private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback, Deserializer keyDeserializer, Deserializer valueDeserializer) { log.trace("Starting the Kafka consumer"); subscribedTopics = new HashSet(); subscribedPartitions = new HashSet(); @@ -402,6 +426,18 @@ public class KafkaConsumer implements Consumer { this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG); this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + + if (keyDeserializer == null) + this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + Deserializer.class); + else + this.keyDeserializer = keyDeserializer; + if (valueDeserializer == null) + this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + Deserializer.class); + else + this.valueDeserializer = valueDeserializer; + config.logUnused(); log.debug("Kafka consumer started"); } @@ -488,7 +524,7 @@ public class KafkaConsumer implements Consumer { * @return map of topic to records since the last fetch for the subscribed list of topics and partitions */ @Override - public Map poll(long timeout) { + public Map> poll(long timeout) { // TODO Auto-generated method stub return null; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index c3aad3b4d6b..8cab16c0a0b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -33,7 +33,7 @@ import org.apache.kafka.common.TopicPartition; * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it * needs to communicate with. Failure to close the consumer after use will leak these resources. 
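Editorial note, not part of the patch: the constructors above also allow Deserializer instances to be passed in directly, in which case the consumer does not call configure() on them. A minimal sketch of that route follows; the generic type parameters on KafkaConsumer are assumed, since angle brackets were stripped from this copy of the diff, and the broker address is a placeholder.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ByteArrayDeserializer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Illustrative sketch only; not part of this patch.
public class DeserializerInstanceExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "test");
        // Deserializer instances supplied directly; configure() is not called on them by the consumer.
        KafkaConsumer<byte[], byte[]> consumer =
                new KafkaConsumer<byte[], byte[]>(props,
                                                  null, // no ConsumerRebalanceCallback in this sketch
                                                  new ByteArrayDeserializer(),
                                                  new ByteArrayDeserializer());
        consumer.close();
    }
}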
*/ -public class MockConsumer implements Consumer { +public class MockConsumer implements Consumer { private final Set subscribedPartitions; private final Set subscribedTopics; @@ -90,10 +90,10 @@ public class MockConsumer implements Consumer { } @Override - public Map poll(long timeout) { + public Map> poll(long timeout) { // hand out one dummy record, 1 per topic Map> records = new HashMap>(); - Map recordMetadata = new HashMap(); + Map> recordMetadata = new HashMap>(); for(TopicPartition partition : subscribedPartitions) { // get the last consumed offset long messageSequence = consumedOffsets.get(partition); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java b/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java new file mode 100644 index 00000000000..9005b74a328 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients.producer; + +import java.util.Map; + +public class ByteArraySerializer implements Serializer { + + @Override + public void configure(Map configs) { + // nothing to do + } + + @Override + public byte[] serialize(String topic, byte[] data, boolean isKey) { + return data; + } + + @Override + public void close() { + // nothing to do + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 32f444ebbd2..f61efb35db7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory; * The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it * needs to communicate with. Failure to close the producer after use will leak these resources. */ -public class KafkaProducer implements Producer { +public class KafkaProducer implements Producer { private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class); @@ -75,26 +75,59 @@ public class KafkaProducer implements Producer { private final CompressionType compressionType; private final Sensor errors; private final Time time; + private final Serializer keySerializer; + private final Serializer valueSerializer; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings * are documented here. Values can be * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the * string "42" or the integer 42). 
+ * @param configs The producer configs + * */ public KafkaProducer(Map configs) { - this(new ProducerConfig(configs)); + this(new ProducerConfig(configs), null, null); + } + + /** + * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}. + * Valid configuration strings are documented here. + * Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept + * either the string "42" or the integer 42). + * @param configs The producer configs + * @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be + * called in the producer when the serializer is passed in directly. + * @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't + * be called in the producer when the serializer is passed in directly. + */ + public KafkaProducer(Map configs, Serializer keySerializer, Serializer valueSerializer) { + this(new ProducerConfig(configs), keySerializer, valueSerializer); } /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings * are documented here. + * @param properties The producer configs */ public KafkaProducer(Properties properties) { - this(new ProducerConfig(properties)); + this(new ProducerConfig(properties), null, null); } - private KafkaProducer(ProducerConfig config) { + /** + * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}. + * Valid configuration strings are documented here. + * @param properties The producer configs + * @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be + * called in the producer when the serializer is passed in directly. + * @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't + * be called in the producer when the serializer is passed in directly. + */ + public KafkaProducer(Properties properties, Serializer keySerializer, Serializer valueSerializer) { + this(new ProducerConfig(properties), keySerializer, valueSerializer); + } + + private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer) { log.trace("Starting the Kafka producer"); this.time = new SystemTime(); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) @@ -145,6 +178,17 @@ public class KafkaProducer implements Producer { this.errors = this.metrics.sensor("errors"); + if (keySerializer == null) + this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + Serializer.class); + else + this.keySerializer = keySerializer; + if (valueSerializer == null) + this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + Serializer.class); + else + this.valueSerializer = valueSerializer; + config.logUnused(); log.debug("Kafka producer started"); } @@ -159,9 +203,10 @@ public class KafkaProducer implements Producer { /** * Asynchronously send a record to a topic. 
Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)} + * @param record The record to be sent */ @Override - public Future send(ProducerRecord record) { + public Future send(ProducerRecord record) { return send(record, null); } @@ -183,14 +228,14 @@ public class KafkaProducer implements Producer { * If you want to simulate a simple blocking call you can do the following: * *
-     *   producer.send(new ProducerRecord("the-topic", "key, "value")).get();
+     *   producer.send(new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes())).get();
     *
*
* Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that * will be invoked when the request is complete. * *
-     *   ProducerRecord record = new ProducerRecord("the-topic", "key, "value");
+     *   ProducerRecord record = new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes());
      *   producer.send(myRecord,
      *                 new Callback() {
      *                     public void onCompletion(RecordMetadata metadata, Exception e) {
@@ -205,8 +250,8 @@ public class KafkaProducer implements Producer {
      * following example callback1 is guaranteed to execute before callback2:
      * 
      *
-     * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
-     * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
+     * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
+     * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
      *
*
* Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or @@ -226,16 +271,19 @@ public class KafkaProducer implements Producer { * indicates no callback) */ @Override - public Future send(ProducerRecord record, Callback callback) { + public Future send(ProducerRecord record, Callback callback) { try { // first make sure the metadata for the topic is available waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs); - int partition = partitioner.partition(record, metadata.fetch()); - int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value()); + byte[] serializedKey = keySerializer.serialize(record.topic(), record.key(), true); + byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value(), false); + ProducerRecord serializedRecord = new ProducerRecord(record.topic(), record.partition(), serializedKey, serializedValue); + int partition = partitioner.partition(serializedRecord, metadata.fetch()); + int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); - RecordAccumulator.RecordAppendResult result = accumulator.append(tp, record.key(), record.value(), compressionType, callback); + RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); @@ -324,6 +372,8 @@ public class KafkaProducer implements Producer { throw new KafkaException(e); } this.metrics.close(); + this.keySerializer.close(); + this.valueSerializer.close(); log.debug("The Kafka producer has closed."); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index c0f1d57e0fe..34624c3b7a1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -40,11 +40,11 @@ import org.apache.kafka.common.TopicPartition; * By default this mock will synchronously complete each send call successfully. However it can be configured to allow * the user to control the completion of the call and supply an optional error for the producer to throw. 
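Editorial note, not part of the patch: pulling the producer-side changes above together, the sketch below shows both ways a serializer can now be supplied, plus a send() whose key and value are serialized before partitioning and appending to the accumulator. The generic type parameters are assumed (angle brackets were stripped from this copy of the diff) and the broker address is a placeholder.

import java.util.Properties;

import org.apache.kafka.clients.producer.ByteArraySerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative sketch only; not part of this patch.
public class SerializerUsageExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address

        // Route 1: name the serializer classes in the config; the producer instantiates
        // them and calls configure(). Both default to ByteArraySerializer when omitted.
        props.put("key.serializer", "org.apache.kafka.clients.producer.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.clients.producer.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> fromConfig = new KafkaProducer<byte[], byte[]>(props);

        // Route 2: pass serializer instances directly; configure() is then not called on them.
        KafkaProducer<byte[], byte[]> fromInstances =
                new KafkaProducer<byte[], byte[]>(props, new ByteArraySerializer(), new ByteArraySerializer());

        // send() now serializes the key and value before choosing a partition.
        fromInstances.send(new ProducerRecord<byte[], byte[]>("the-topic", "key".getBytes(), "value".getBytes()));

        fromConfig.close();
        fromInstances.close();
    }
}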
*/ -public class MockProducer implements Producer { +public class MockProducer implements Producer { private final Cluster cluster; private final Partitioner partitioner = new Partitioner(); - private final List sent; + private final List> sent; private final Deque completions; private boolean autoComplete; private Map offsets; @@ -62,7 +62,7 @@ public class MockProducer implements Producer { this.cluster = cluster; this.autoComplete = autoComplete; this.offsets = new HashMap(); - this.sent = new ArrayList(); + this.sent = new ArrayList>(); this.completions = new ArrayDeque(); } @@ -90,7 +90,7 @@ public class MockProducer implements Producer { * @see #history() */ @Override - public synchronized Future send(ProducerRecord record) { + public synchronized Future send(ProducerRecord record) { return send(record, null); } @@ -100,7 +100,7 @@ public class MockProducer implements Producer { * @see #history() */ @Override - public synchronized Future send(ProducerRecord record, Callback callback) { + public synchronized Future send(ProducerRecord record, Callback callback) { int partition = 0; if (this.cluster.partitionsForTopic(record.topic()) != null) partition = partitioner.partition(record, this.cluster); @@ -147,8 +147,8 @@ public class MockProducer implements Producer { /** * Get the list of sent records since the last call to {@link #clear()} */ - public synchronized List history() { - return new ArrayList(this.sent); + public synchronized List> history() { + return new ArrayList>(this.sent); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 36e83984160..5baa6062bd9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -31,7 +31,7 @@ import org.apache.kafka.common.PartitionInfo; * @see KafkaProducer * @see MockProducer */ -public interface Producer extends Closeable { +public interface Producer extends Closeable { /** * Send the given record asynchronously and return a future which will eventually contain the response information. @@ -39,12 +39,12 @@ public interface Producer extends Closeable { * @param record The record to send * @return A future which will eventually contain the response information */ - public Future send(ProducerRecord record); + public Future send(ProducerRecord record); /** * Send a record and invoke the given callback when the record has been acknowledged by the server */ - public Future send(ProducerRecord record, Callback callback); + public Future send(ProducerRecord record, Callback callback); /** * Get a list of partitions for the given topic for custom partition assignment. 
The partition metadata will change diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 72d3ddd0c29..a893d88c2f4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -17,7 +17,6 @@ import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; import java.util.Arrays; -import java.util.List; import java.util.Map; import org.apache.kafka.common.config.AbstractConfig; @@ -175,6 +174,14 @@ public class ProducerConfig extends AbstractConfig { + " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of" + " message re-ordering due to retries (i.e., if retries are enabled)."; + /** key.serializer */ + public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"; + private static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the Serializer interface."; + + /** value.serializer */ + public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; + private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the Serializer interface."; + static { config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) @@ -182,7 +189,7 @@ public class ProducerConfig extends AbstractConfig { .define(ACKS_CONFIG, Type.STRING, "1", - in(Arrays.asList("all","-1", "0", "1")), + in(Arrays.asList("all", "-1", "0", "1")), Importance.HIGH, ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) @@ -221,7 +228,9 @@ public class ProducerConfig extends AbstractConfig { 5, atLeast(1), Importance.LOW, - MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC); + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) + .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) + .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC); } ProducerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java index c3181b368b6..065d4e6c6a4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java @@ -20,12 +20,12 @@ package org.apache.kafka.clients.producer; * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is * present a partition will be assigned in a round-robin fashion. 
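Editorial note, not part of the patch: a custom class named in key.serializer or value.serializer only needs to implement the Serializer interface, whose definition appears just below in this patch. A rough UTF-8 String serializer sketch follows; the class name is hypothetical, the generic form Serializer&lt;T&gt; is assumed (angle brackets were stripped from this copy of the diff), and SerializationException is the exception type added later in the patch.

import java.io.UnsupportedEncodingException;
import java.util.Map;

import org.apache.kafka.clients.producer.Serializer;
import org.apache.kafka.common.errors.SerializationException;

// Illustrative sketch only; assumes the interface is generic as Serializer<T>.
public class StringSerializer implements Serializer<String> {

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure in this sketch
    }

    @Override
    public byte[] serialize(String topic, String data, boolean isKey) {
        if (data == null)
            return null;
        try {
            return data.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            // SerializationException is added further down in this patch
            throw new SerializationException("Error when serializing String to byte[]", e);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}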
*/ -public final class ProducerRecord { +public final class ProducerRecord { private final String topic; private final Integer partition; - private final byte[] key; - private final byte[] value; + private final K key; + private final V value; /** * Creates a record to be sent to a specified topic and partition @@ -35,7 +35,7 @@ public final class ProducerRecord { * @param key The key that will be included in the record * @param value The record contents */ - public ProducerRecord(String topic, Integer partition, byte[] key, byte[] value) { + public ProducerRecord(String topic, Integer partition, K key, V value) { if (topic == null) throw new IllegalArgumentException("Topic cannot be null"); this.topic = topic; @@ -51,7 +51,7 @@ public final class ProducerRecord { * @param key The key that will be included in the record * @param value The record contents */ - public ProducerRecord(String topic, byte[] key, byte[] value) { + public ProducerRecord(String topic, K key, V value) { this(topic, null, key, value); } @@ -61,7 +61,7 @@ public final class ProducerRecord { * @param topic The topic this record should be sent to * @param value The record contents */ - public ProducerRecord(String topic, byte[] value) { + public ProducerRecord(String topic, V value) { this(topic, null, value); } @@ -75,14 +75,14 @@ public final class ProducerRecord { /** * The key (or null if no key is specified) */ - public byte[] key() { + public K key() { return key; } /** * @return The value */ - public byte[] value() { + public V value() { return value; } @@ -95,8 +95,8 @@ public final class ProducerRecord { @Override public String toString() { - String key = this.key == null ? "null" : ("byte[" + this.key.length + "]"); - String value = this.value == null ? "null" : ("byte[" + this.value.length + "]"); + String key = this.key == null ? "null" : this.key.toString(); + String value = this.value == null ? "null" : this.value.toString(); return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java new file mode 100644 index 00000000000..03786835083 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients.producer; + +import org.apache.kafka.common.Configurable; + +/** + * + * @param Type to be serialized from. + * + * A class that implements this interface is expected to have a constructor with no parameter. 
+ */ +public interface Serializer extends Configurable { + /** + * + * @param topic Topic associated with data + * @param data Typed data + * @param isKey Is data for key or value + * @return bytes of the serialized data + */ + public byte[] serialize(String topic, T data, boolean isKey); + + /** + * Close this serializer + */ + public void close(); +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java index 40e8234f877..483899d2e69 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java @@ -43,7 +43,7 @@ public class Partitioner { * @param record The record being sent * @param cluster The current cluster metadata */ - public int partition(ProducerRecord record, Cluster cluster) { + public int partition(ProducerRecord record, Cluster cluster) { List partitions = cluster.partitionsForTopic(record.topic()); int numPartitions = partitions.size(); if (record.partition() != null) { diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index 28175fb71ed..1b828007975 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -46,12 +46,12 @@ public class ProducerPerformance { throw new IllegalArgumentException("Invalid property: " + args[i]); props.put(pieces[0], pieces[1]); } - KafkaProducer producer = new KafkaProducer(props); + KafkaProducer producer = new KafkaProducer(props); /* setup perf test */ byte[] payload = new byte[recordSize]; Arrays.fill(payload, (byte) 1); - ProducerRecord record = new ProducerRecord(topicName, payload); + ProducerRecord record = new ProducerRecord(topicName, payload); long sleepTime = NS_PER_SEC / throughput; long sleepDeficitNs = 0; Stats stats = new Stats(numRecords, 5000); diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java b/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java new file mode 100644 index 00000000000..a5433398fb9 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package org.apache.kafka.common.errors; + +import org.apache.kafka.common.KafkaException; + +/** + * Any exception during deserialization in the consumer + */ +public class DeserializationException extends KafkaException { + + private static final long serialVersionUID = 1L; + + public DeserializationException(String message, Throwable cause) { + super(message, cause); + } + + public DeserializationException(String message) { + super(message); + } + + public DeserializationException(Throwable cause) { + super(cause); + } + + public DeserializationException() { + super(); + } + + /* avoid the expensive and useless stack trace for deserialization exceptions */ + @Override + public Throwable fillInStackTrace() { + return this; + } + +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java b/clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java new file mode 100644 index 00000000000..00388d12794 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ *  Any exception during serialization in the producer
+ */
+public class SerializationException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public SerializationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SerializationException(String message) {
+        super(message);
+    }
+
+    public SerializationException(Throwable cause) {
+        super(cause);
+    }
+
+    public SerializationException() {
+        super();
+    }
+
+    /* avoid the expensive and useless stack trace for serialization exceptions */
+    @Override
+    public Throwable fillInStackTrace() {
+        return this;
+    }
+
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 9a9411fc900..1e2ca03fafa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -37,7 +37,7 @@ public class MockProducerTest {
     @Test
     public void testAutoCompleteMock() throws Exception {
         MockProducer producer = new MockProducer(true);
-        ProducerRecord record = new ProducerRecord(topic, "key".getBytes(), "value".getBytes());
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic, "key".getBytes(), "value".getBytes());
         Future<RecordMetadata> metadata = producer.send(record);
         assertTrue("Send should be immediately complete", metadata.isDone());
         assertFalse("Send should be successful", isError(metadata));
@@ -51,8 +51,8 @@ public class MockProducerTest {
     @Test
     public void testManualCompletion() throws Exception {
         MockProducer producer = new MockProducer(false);
-        ProducerRecord record1 = new ProducerRecord("topic", "key1".getBytes(), "value1".getBytes());
-        ProducerRecord record2 = new ProducerRecord("topic", "key2".getBytes(), "value2".getBytes());
+        ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<byte[], byte[]>("topic", "key1".getBytes(), "value1".getBytes());
+        ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<byte[], byte[]>("topic", "key2".getBytes(), "value2".getBytes());
         Future<RecordMetadata> md1 = producer.send(record1);
         assertFalse("Send shouldn't have completed", md1.isDone());
         Future<RecordMetadata> md2 = producer.send(record2);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
index f06e28ce21e..1d077fd4c56 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
@@ -50,22 +50,22 @@ public class PartitionerTest {
     public void testUserSuppliedPartitioning() {
         assertEquals("If the user supplies a partition we should use it.",
                      0,
-                     partitioner.partition(new ProducerRecord("test", 0, key, value), cluster));
+                     partitioner.partition(new ProducerRecord<byte[], byte[]>("test", 0, key, value), cluster));
     }
 
     @Test
     public void testKeyPartitionIsStable() {
-        int partition = partitioner.partition(new ProducerRecord("test", key, value), cluster);
+        int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", key, value), cluster);
         assertEquals("Same key should yield same partition",
                      partition,
-                     partitioner.partition(new ProducerRecord("test", key, "value2".getBytes()), cluster));
+                     partitioner.partition(new ProducerRecord<byte[], byte[]>("test", key, "value2".getBytes()), cluster));
     }
 
     @Test
     public void testRoundRobinIsStable() {
-        int startPart = partitioner.partition(new ProducerRecord("test", value), cluster);
+        int startPart = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
         for (int i = 1; i <= 100; i++) {
-            int partition = partitioner.partition(new ProducerRecord("test", value), cluster);
+            int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
             assertEquals("Should yield a different partition each call with round-robin partitioner",
                 partition, (startPart + i) % 2);
         }
@@ -74,7 +74,7 @@ public class PartitionerTest {
     @Test
     public void testRoundRobinWithDownNode() {
         for (int i = 0; i < partitions.size(); i++) {
-            int part = partitioner.partition(new ProducerRecord("test", value), cluster);
+            int part = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
             assertTrue("We should never choose a leader-less node in round robin", part >= 0 && part < 2);
         }
     }
diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala
index b0207930dd0..8e007130b28 100644
--- a/core/src/main/scala/kafka/producer/BaseProducer.scala
+++ b/core/src/main/scala/kafka/producer/BaseProducer.scala
@@ -33,10 +33,10 @@ class NewShinyProducer(producerProps: Properties) extends BaseProducer {
   // decide whether to send synchronously based on producer properties
   val sync = producerProps.getProperty("producer.type", "async").equals("sync")
 
-  val producer = new KafkaProducer(producerProps)
+  val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
 
   override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
-    val record = new ProducerRecord(topic, key, value)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, key, value)
     if(sync) {
       this.producer.send(record).get()
     } else {
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index 4b5b823b854..e1949424923 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -32,7 +32,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   var requiredNumAcks: Int = Int.MaxValue
   var syncSend: Boolean = false
 
-  private var producer: KafkaProducer = null
+  private var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
 
   def getTopic: String = topic
   def setTopic(topic: String) { this.topic = topic }
@@ -60,7 +60,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
       throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
     if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
     if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString)
-    producer = new KafkaProducer(props)
+    producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
     LogLog.debug("Kafka producer connected to " + brokerList)
     LogLog.debug("Logging for topic: " + topic)
   }
@@ -68,7 +68,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   override def append(event: LoggingEvent) {
     val message = subAppend(event)
     LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
-    val response = producer.send(new ProducerRecord(topic, message.getBytes()))
+    val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message.getBytes()))
     if (syncSend) response.get
   }
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 77d951d13b8..53cb16c2949 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -27,7 +27,6 @@ import kafka.producer.{OldProducer, NewShinyProducer}
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{RecordMetadata, ProducerRecord}
-import org.apache.kafka.common.KafkaException
 
 import scala.collection.JavaConversions._
 
@@ -547,7 +546,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
           extends NewShinyProducer(producerProps) with MirrorMakerBaseProducer {
 
     override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) {
-      val record = new ProducerRecord(topicPartition.topic, key, value)
+      val record = new ProducerRecord[Array[Byte],Array[Byte]](topicPartition.topic, key, value)
       if(sync) {
         topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(this.producer.send(record).get().partition(), offset)
       } else {
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 3393a3dd574..f541987b287 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -124,7 +124,7 @@ object ReplayLogProducer extends Logging {
   class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {
     val shutdownLatch = new CountDownLatch(1)
-    val producer = new KafkaProducer(config.producerProps)
+    val producer = new KafkaProducer[Array[Byte],Array[Byte]](config.producerProps)
 
     override def run() {
       info("Starting consumer thread..")
@@ -137,7 +137,7 @@ object ReplayLogProducer extends Logging {
           stream
       for (messageAndMetadata <- iter) {
         try {
-          val response = producer.send(new ProducerRecord(config.outputTopic,
+          val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](config.outputTopic,
             messageAndMetadata.key(), messageAndMetadata.message()))
           if(config.isSync) {
             response.get()
diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
index 67196f30af1..2ebc7bf643e 100644
--- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
@@ -56,7 +56,7 @@ object TestEndToEndLatency {
     producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
-    val producer = new KafkaProducer(producerProps)
+    val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
 
     // make sure the consumer fetcher has started before sending data since otherwise
     // the consumption from the tail will skip the first message and hence be blocked
@@ -67,7 +67,7 @@ object TestEndToEndLatency {
     val latencies = new Array[Long](numMessages)
     for (i <- 0 until numMessages) {
       val begin = System.nanoTime
-      producer.send(new ProducerRecord(topic, message))
+      producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message))
       val received = iter.next
       val elapsed = System.nanoTime - begin
       // poor man's progress bar
diff --git a/core/src/main/scala/kafka/tools/TestLogCleaning.scala b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
index 1d4ea93f2ba..b81010ec0fa 100644
--- a/core/src/main/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
@@ -242,7 +242,7 @@ object TestLogCleaning {
     val producerProps = new Properties
     producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
-    val producer = new KafkaProducer(producerProps)
+    val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
     val rand = new Random(1)
     val keyCount = (messages / dups).toInt
     val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt")
@@ -254,9 +254,9 @@ object TestLogCleaning {
       val delete = i % 100 < percentDeletes
       val msg =
         if(delete)
-          new ProducerRecord(topic, key.toString.getBytes(), null)
+          new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), null)
         else
-          new ProducerRecord(topic, key.toString.getBytes(), i.toString.getBytes())
+          new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), i.toString.getBytes())
       producer.send(msg)
       producedWriter.write(TestRecord(topic, key, i, delete).toString)
       producedWriter.newLine()
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 6379f2b60af..1505fd4464d 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -75,7 +75,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
     props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
     props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
-    var producer = new KafkaProducer(props)
+    var producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
     val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
 
     try {
@@ -89,7 +89,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
       // make sure the returned messages are correct
       val responses =
         for (message <- messages)
-          yield producer.send(new ProducerRecord(topic, null, null, message))
+          yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, null, null, message))
       val futures = responses.toList
       for ((future, offset) <- futures zip (0 until numRecords)) {
         assertEquals(offset.toLong, future.get.offset)
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index a913fe59ba6..5ec613cdb50 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -52,10 +52,10 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
 
-  private var producer1: KafkaProducer = null
-  private var producer2: KafkaProducer = null
-  private var producer3: KafkaProducer = null
-  private var producer4: KafkaProducer = null
+  private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null
+  private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null
+  private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null
+  private var producer4: KafkaProducer[Array[Byte],Array[Byte]] = null
 
   private val topic1 = "topic-1"
   private val topic2 = "topic-2"
@@ -93,7 +93,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // send a too-large record
-    val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
     assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L)
   }
 
@@ -106,7 +106,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // send a too-large record
-    val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
     intercept[ExecutionException] {
       producer2.send(record).get
     }
@@ -118,7 +118,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testNonExistentTopic() {
     // send a record with non-exist topic
-    val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic2, null, "key".getBytes, "value".getBytes)
     intercept[ExecutionException] {
       producer1.send(record).get
     }
@@ -143,7 +143,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
 
     // send a record with incorrect broker list
-    val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
     intercept[ExecutionException] {
       producer4.send(record).get
     }
@@ -160,7 +160,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // first send a message to make sure the metadata is refreshed
-    val record1 = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+    val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
     producer1.send(record1).get
     producer2.send(record1).get
 
@@ -180,7 +180,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     val msgSize = producerBufferSize / tooManyRecords
     val value = new Array[Byte](msgSize)
     new Random().nextBytes(value)
-    val record2 = new ProducerRecord(topic1, null, "key".getBytes, value)
+    val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, value)
 
     intercept[KafkaException] {
       for (i <- 1 to tooManyRecords)
@@ -201,7 +201,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // create a record with incorrect partition id, send should fail
-    val record = new ProducerRecord(topic1, new Integer(1), "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, new Integer(1), "key".getBytes, "value".getBytes)
     intercept[IllegalArgumentException] {
       producer1.send(record)
     }
@@ -221,7 +221,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     // create topic
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
-    val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
 
     // first send a message to make sure the metadata is refreshed
     producer1.send(record).get
@@ -300,7 +300,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testCannotSendToInternalTopic() {
     val thrown = intercept[ExecutionException] {
-      producer2.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
+      producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
     }
     assertTrue(thrown.getCause.isInstanceOf[InvalidTopicException])
   }
@@ -313,7 +313,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
 
-    val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes)
     try {
       producer3.send(record).get
       fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas")
@@ -333,7 +333,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
 
-    val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes)
 
     // this should work with all brokers up and running
     producer3.send(record).get
@@ -365,7 +365,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     override def doWork(): Unit = {
       val responses =
         for (i <- sent+1 to sent+numRecords)
-          yield producer.send(new ProducerRecord(topic1, null, null, i.toString.getBytes))
+          yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes))
       val futures = responses.toList
 
       try {
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index d407af9144e..6196060edf9 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -86,24 +86,24 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
     TestUtils.createTopic(zkClient, topic, 1, 2, servers)
 
     // send a normal record
-    val record0 = new ProducerRecord(topic, new Integer(0), "key".getBytes, "value".getBytes)
+    val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes)
     assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)
 
     // send a record with null value should be ok
-    val record1 = new ProducerRecord(topic, new Integer(0), "key".getBytes, null)
+    val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, null)
     assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)
 
     // send a record with null key should be ok
-    val record2 = new ProducerRecord(topic, new Integer(0), null, "value".getBytes)
+    val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), null, "value".getBytes)
     assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)
 
     // send a record with null part id should be ok
-    val record3 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+    val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
     assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset)
 
     // send a record with null topic should fail
     try {
-      val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes)
+      val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, new Integer(0), "key".getBytes, "value".getBytes)
       producer.send(record4, callback)
       fail("Should not allow sending a record without topic")
     } catch {
@@ -140,7 +140,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
     TestUtils.createTopic(zkClient, topic, 1, 2, servers)
 
     // non-blocking send a list of records
-    val record0 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+    val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
     for (i <- 1 to numRecords)
       producer.send(record0)
     val response0 = producer.send(record0)
@@ -182,7 +182,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
     val responses =
       for (i <- 1 to numRecords)
-        yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes))
+        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes))
     val futures = responses.toList
     futures.map(_.get)
     for (future <- futures)
@@ -228,7 +228,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
     try {
       // Send a message to auto-create the topic
-      val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+      val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
       assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
 
       // double check that the topic is created with leader elected
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 0da774d0ed0..94d0028d8c4 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -383,7 +383,7 @@ object TestUtils extends Logging {
                         metadataFetchTimeout: Long = 3000L,
                         blockOnBufferFull: Boolean = true,
                         bufferSize: Long = 1024L * 1024L,
-                        retries: Int = 0) : KafkaProducer = {
+                        retries: Int = 0) : KafkaProducer[Array[Byte],Array[Byte]] = {
     import org.apache.kafka.clients.producer.ProducerConfig
 
     val producerProps = new Properties()
@@ -395,7 +395,7 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
     producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
-    return new KafkaProducer(producerProps)
+    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
   }
 
   /**
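
[Editor's note, not part of the patch: the hunks above only show existing byte-array call sites being re-typed for the new generic client API. As orientation for reviewers, here is a minimal, illustrative Java sketch of driving the now-generic producer end to end. The topic name and broker address are placeholders, and the "key.serializer"/"value.serializer" property names and the serializer class path are assumptions made for this example rather than something shown in these hunks; only the KafkaProducer<K, V>, ProducerRecord<K, V>, and Future<RecordMetadata> shapes come from the changed call sites.]

    import java.util.Properties;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class ByteArrayProducerExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Illustrative broker address.
            props.put("bootstrap.servers", "localhost:9092");
            // Assumed property names/values for the pluggable serializers; a
            // byte-array serializer hands byte[] payloads through unchanged.
            props.put("key.serializer", "org.apache.kafka.clients.producer.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.clients.producer.ByteArraySerializer");

            // The producer and record are parameterized by key/value types,
            // mirroring the KafkaProducer[Array[Byte],Array[Byte]] call sites above.
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            ProducerRecord<byte[], byte[]> record =
                    new ProducerRecord<byte[], byte[]>("test-topic", "key".getBytes(), "value".getBytes());

            // send() is asynchronous and returns a Future holding the record metadata.
            Future<RecordMetadata> response = producer.send(record);
            System.out.println("Appended at offset " + response.get().offset());
            producer.close();
        }
    }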