
KAFKA-1797; add the serializer/deserializer API to the new Java client; patched by Jun Rao; reviewed by Neha Narkhede

pull/38/head
Jun Rao 10 years ago
commit 92d1d4cd31
  1. clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java (34 lines changed)
  2. clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java (4 lines changed)
  3. clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java (14 lines changed)
  4. clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java (4 lines changed)
  5. clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java (16 lines changed)
  6. clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java (14 lines changed)
  7. clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java (38 lines changed)
  8. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (124 lines changed)
  9. clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java (6 lines changed)
  10. clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java (34 lines changed)
  11. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (76 lines changed)
  12. clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java (14 lines changed)
  13. clients/src/main/java/org/apache/kafka/clients/producer/Producer.java (6 lines changed)
  14. clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (15 lines changed)
  15. clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java (20 lines changed)
  16. clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java (38 lines changed)
  17. clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java (2 lines changed)
  18. clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java (4 lines changed)
  19. clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java (47 lines changed)
  20. clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java (46 lines changed)
  21. clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java (6 lines changed)
  22. clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java (12 lines changed)
  23. core/src/main/scala/kafka/producer/BaseProducer.scala (4 lines changed)
  24. core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (6 lines changed)
  25. core/src/main/scala/kafka/tools/MirrorMaker.scala (3 lines changed)
  26. core/src/main/scala/kafka/tools/ReplayLogProducer.scala (4 lines changed)
  27. core/src/main/scala/kafka/tools/TestEndToEndLatency.scala (4 lines changed)
  28. core/src/main/scala/kafka/tools/TestLogCleaning.scala (6 lines changed)
  29. core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala (4 lines changed)
  30. core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala (32 lines changed)
  31. core/src/test/scala/integration/kafka/api/ProducerSendTest.scala (16 lines changed)
  32. core/src/test/scala/unit/kafka/utils/TestUtils.scala (4 lines changed)

34
clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java

@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import java.util.Map;
public class ByteArrayDeserializer implements Deserializer<byte[]> {
@Override
public void configure(Map<String, ?> configs) {
// nothing to do
}
@Override
public byte[] deserialize(String topic, byte[] data, boolean isKey) {
return data;
}
@Override
public void close() {
// nothing to do
}
}

4
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java

@@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition;
* @see KafkaConsumer
* @see MockConsumer
*/
public interface Consumer extends Closeable {
public interface Consumer<K,V> extends Closeable {
/**
* Incrementally subscribe to the given list of topics. This API is mutually exclusive to
@@ -63,7 +63,7 @@ public interface Consumer extends Closeable {
* of data is controlled by {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} and {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}.
* If no data is available for timeout ms, returns an empty list
*/
public Map<String, ConsumerRecords> poll(long timeout);
public Map<String, ConsumerRecords<K,V>> poll(long timeout);
/**
* Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.

14
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

@@ -151,6 +151,14 @@ public class ConsumerConfig extends AbstractConfig {
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
/** <code>key.deserializer</code> */
public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";
/** <code>value.deserializer</code> */
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
static {
/* TODO: add config docs */
config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah")
@@ -176,8 +184,10 @@ public class ConsumerConfig extends AbstractConfig {
Importance.LOW,
METRICS_SAMPLE_WINDOW_MS_DOC)
.define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC);
.define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
.define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
.define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC);
}
ConsumerConfig(Map<? extends Object, ? extends Object> props) {
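
The two new settings can also be exercised directly. A minimal sketch, assuming only that the usual connection properties (as in the KafkaConsumer javadoc examples) are filled in elsewhere; the class names match the defaults defined above:

Properties props = new Properties();
props.put("group.id", "test"); // plus the usual connection settings
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.clients.consumer.ByteArrayDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.clients.consumer.ByteArrayDeserializer");
// the consumer instantiates and configure()s these classes because no deserializer is passed in directly
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);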

4
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java

@@ -35,7 +35,7 @@ public interface ConsumerRebalanceCallback {
* For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
* @param partitions The list of partitions that are assigned to the consumer after rebalance
*/
public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions);
public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions);
/**
* A callback method the user can implement to provide handling of offset commits to a customized store on the
@@ -46,5 +46,5 @@ public interface ConsumerRebalanceCallback {
* For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
* @param partitions The list of partitions that were assigned to the consumer on the last rebalance
*/
public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions);
public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions);
}

16
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java

@@ -18,10 +18,10 @@ import org.apache.kafka.common.TopicPartition;
* A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the
* record is being received and an offset that points to the record in a Kafka partition.
*/
public final class ConsumerRecord {
public final class ConsumerRecord<K,V> {
private final TopicPartition partition;
private final byte[] key;
private final byte[] value;
private final K key;
private final V value;
private final long offset;
private volatile Exception error;
@@ -34,7 +34,7 @@ public final class ConsumerRecord {
* @param value The record contents
* @param offset The offset of this record in the corresponding Kafka partition
*/
public ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset) {
public ConsumerRecord(String topic, int partitionId, K key, V value, long offset) {
this(topic, partitionId, key, value, offset, null);
}
@@ -46,7 +46,7 @@ public final class ConsumerRecord {
* @param value The record contents
* @param offset The offset of this record in the corresponding Kafka partition
*/
public ConsumerRecord(String topic, int partitionId, byte[] value, long offset) {
public ConsumerRecord(String topic, int partitionId, V value, long offset) {
this(topic, partitionId, null, value, offset);
}
@@ -60,7 +60,7 @@ public final class ConsumerRecord {
this(topic, partitionId, null, null, -1L, error);
}
private ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset, Exception error) {
private ConsumerRecord(String topic, int partitionId, K key, V value, long offset, Exception error) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
this.partition = new TopicPartition(topic, partitionId);
@@ -95,7 +95,7 @@ public final class ConsumerRecord {
* The key (or null if no key is specified)
* @throws Exception The exception thrown while fetching this record.
*/
public byte[] key() throws Exception {
public K key() throws Exception {
if (this.error != null)
throw this.error;
return key;
@@ -105,7 +105,7 @@ public final class ConsumerRecord {
* The value
* @throws Exception The exception thrown while fetching this record.
*/
public byte[] value() throws Exception {
public V value() throws Exception {
if (this.error != null)
throw this.error;
return value;

14
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java

@@ -21,12 +21,12 @@ import java.util.Map.Entry;
* A container that holds the list {@link ConsumerRecord} per partition for a particular topic. There is one for every topic returned by a
* {@link Consumer#poll(long)} operation.
*/
public class ConsumerRecords {
public class ConsumerRecords<K,V> {
private final String topic;
private final Map<Integer, List<ConsumerRecord>> recordsPerPartition;
private final Map<Integer, List<ConsumerRecord<K,V>>> recordsPerPartition;
public ConsumerRecords(String topic, Map<Integer, List<ConsumerRecord>> records) {
public ConsumerRecords(String topic, Map<Integer, List<ConsumerRecord<K,V>>> records) {
this.topic = topic;
this.recordsPerPartition = records;
}
@@ -36,16 +36,16 @@ public class ConsumerRecords {
* specified, returns records for all partitions
* @return The list of {@link ConsumerRecord}s associated with the given partitions.
*/
public List<ConsumerRecord> records(int... partitions) {
List<ConsumerRecord> recordsToReturn = new ArrayList<ConsumerRecord>();
public List<ConsumerRecord<K,V>> records(int... partitions) {
List<ConsumerRecord<K,V>> recordsToReturn = new ArrayList<ConsumerRecord<K,V>>();
if(partitions.length == 0) {
// return records for all partitions
for(Entry<Integer, List<ConsumerRecord>> record : recordsPerPartition.entrySet()) {
for(Entry<Integer, List<ConsumerRecord<K,V>>> record : recordsPerPartition.entrySet()) {
recordsToReturn.addAll(record.getValue());
}
} else {
for(int partition : partitions) {
List<ConsumerRecord> recordsForThisPartition = recordsPerPartition.get(partition);
List<ConsumerRecord<K,V>> recordsForThisPartition = recordsPerPartition.get(partition);
recordsToReturn.addAll(recordsForThisPartition);
}
}
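
For orientation, a sketch of how the parameterized records are meant to be consumed, assuming a KafkaConsumer<byte[], byte[]> named consumer (and noting that poll() is still a stub in this patch):

Map<String, ConsumerRecords<byte[], byte[]>> byTopic = consumer.poll(100);
for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> entry : byTopic.entrySet()) {
    // records() with no arguments returns the records for all partitions of this topic
    for (ConsumerRecord<byte[], byte[]> record : entry.getValue().records()) {
        // record.key() and record.value() are now typed as byte[] via the generic parameters
    }
}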

38
clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java

@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.Configurable;
/**
*
* @param <T> Type to be deserialized into.
*
* A class that implements this interface is expected to have a constructor with no parameter.
*/
public interface Deserializer<T> extends Configurable {
/**
*
* @param topic Topic associated with the data
* @param data Serialized bytes
* @param isKey Is data for key or value
* @return deserialized typed data
*/
public T deserialize(String topic, byte[] data, boolean isKey);
/**
* Close this deserializer
*/
public void close();
}
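
To illustrate the contract, a hypothetical String deserializer (not part of this patch) might look like the following; it wraps an encoding failure in the new DeserializationException added below:

package org.apache.kafka.clients.consumer;

import java.io.UnsupportedEncodingException;
import java.util.Map;

import org.apache.kafka.common.errors.DeserializationException;

public class StringDeserializer implements Deserializer<String> {
    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public String deserialize(String topic, byte[] data, boolean isKey) {
        if (data == null)
            return null;
        try {
            return new String(data, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new DeserializationException(e);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}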

124
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@@ -12,19 +12,6 @@
*/
package org.apache.kafka.clients.consumer;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
@@ -36,6 +23,9 @@ import org.apache.kafka.common.utils.SystemTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.*;
/**
* A Kafka client that consumes records from a Kafka cluster.
* <P>
@@ -50,12 +40,12 @@ import org.slf4j.LoggerFactory;
* a convenience method to demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method.
* <pre>
* {@code
* private Map<TopicPartition, Long> process(Map<String, ConsumerRecords> records) {
* private Map<TopicPartition, Long> process(Map<String, ConsumerRecords<byte[], byte[]>> records) {
* Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
* for(Entry<String, ConsumerRecords> recordMetadata : records.entrySet()) {
* List<ConsumerRecord> recordsPerTopic = recordMetadata.getValue().records();
* for(Entry<String, ConsumerRecords<byte[], byte[]>> recordMetadata : records.entrySet()) {
* List<ConsumerRecord<byte[], byte[]>> recordsPerTopic = recordMetadata.getValue().records();
* for(int i = 0;i < recordsPerTopic.size();i++) {
* ConsumerRecord record = recordsPerTopic.get(i);
* ConsumerRecord<byte[], byte[]> record = recordsPerTopic.get(i);
* // process record
* try {
* processedOffsets.put(record.topicAndpartition(), record.offset());
@@ -80,11 +70,11 @@ import org.slf4j.LoggerFactory;
* props.put("session.timeout.ms", "1000");
* props.put("enable.auto.commit", "true");
* props.put("auto.commit.interval.ms", "10000");
* KafkaConsumer consumer = new KafkaConsumer(props);
* KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
* consumer.subscribe("foo", "bar");
* boolean isRunning = true;
* while(isRunning) {
* Map<String, ConsumerRecords> records = consumer.poll(100);
* Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
* process(records);
* }
* consumer.close();
@@ -102,14 +92,14 @@ import org.slf4j.LoggerFactory;
* props.put("group.id", "test");
* props.put("session.timeout.ms", "1000");
* props.put("enable.auto.commit", "false");
* KafkaConsumer consumer = new KafkaConsumer(props);
* KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
* consumer.subscribe("foo", "bar");
* int commitInterval = 100;
* int numRecords = 0;
* boolean isRunning = true;
* Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
* while(isRunning) {
* Map<String, ConsumerRecords> records = consumer.poll(100);
* Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
* try {
* Map<TopicPartition, Long> lastConsumedOffsets = process(records);
* consumedOffsets.putAll(lastConsumedOffsets);
@@ -156,16 +146,17 @@ import org.slf4j.LoggerFactory;
* props.put("group.id", "test");
* props.put("session.timeout.ms", "1000");
* props.put("enable.auto.commit", "false");
* KafkaConsumer consumer = new KafkaConsumer(props,
* KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
* props,
* new ConsumerRebalanceCallback() {
* boolean rewindOffsets = true; // should be retrieved from external application config
* public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
* public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
* Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(partitions);
* if(rewindOffsets)
* Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
* consumer.seek(newOffsets);
* }
* public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
* public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
* consumer.commit(true);
* }
* // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages
@@ -183,7 +174,7 @@ import org.slf4j.LoggerFactory;
* boolean isRunning = true;
* Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
* while(isRunning) {
* Map<String, ConsumerRecords> records = consumer.poll(100);
* Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
* Map<TopicPartition, Long> lastConsumedOffsets = process(records);
* consumedOffsets.putAll(lastConsumedOffsets);
* numRecords += records.size();
@@ -211,13 +202,14 @@ import org.slf4j.LoggerFactory;
* props.put("group.id", "test");
* props.put("session.timeout.ms", "1000");
* props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
* KafkaConsumer consumer = new KafkaConsumer(props,
* KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
* props,
* new ConsumerRebalanceCallback() {
* public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
* public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {
* Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
* consumer.seek(lastCommittedOffsets);
* }
* public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
* public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {
* Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions);
* commitOffsetsToCustomStore(offsets);
* }
@@ -234,7 +226,7 @@ import org.slf4j.LoggerFactory;
* boolean isRunning = true;
* Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
* while(isRunning) {
* Map<String, ConsumerRecords> records = consumer.poll(100);
* Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
* Map<TopicPartition, Long> lastConsumedOffsets = process(records);
* consumedOffsets.putAll(lastConsumedOffsets);
* numRecords += records.size();
@@ -259,7 +251,7 @@ import org.slf4j.LoggerFactory;
* props.put("group.id", "test");
* props.put("enable.auto.commit", "true");
* props.put("auto.commit.interval.ms", "10000");
* KafkaConsumer consumer = new KafkaConsumer(props);
* KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
* // subscribe to some partitions of topic foo
* TopicPartition partition0 = new TopicPartition("foo", 0);
* TopicPartition partition1 = new TopicPartition("foo", 1);
@@ -276,7 +268,7 @@ import org.slf4j.LoggerFactory;
* boolean isRunning = true;
* Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
* while(isRunning) {
* Map<String, ConsumerRecords> records = consumer.poll(100);
* Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
* Map<TopicPartition, Long> lastConsumedOffsets = process(records);
* consumedOffsets.putAll(lastConsumedOffsets);
* for(TopicPartition partition : partitions) {
@@ -298,7 +290,7 @@ import org.slf4j.LoggerFactory;
* {@code
* Properties props = new Properties();
* props.put("metadata.broker.list", "localhost:9092");
* KafkaConsumer consumer = new KafkaConsumer(props);
* KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
* // subscribe to some partitions of topic foo
* TopicPartition partition0 = new TopicPartition("foo", 0);
* TopicPartition partition1 = new TopicPartition("foo", 1);
@@ -314,7 +306,7 @@ import org.slf4j.LoggerFactory;
* boolean isRunning = true;
* Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
* while(isRunning) {
* Map<String, ConsumerRecords> records = consumer.poll(100);
* Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
* Map<TopicPartition, Long> lastConsumedOffsets = process(records);
* consumedOffsets.putAll(lastConsumedOffsets);
* // commit offsets for partitions 0,1 for topic foo to custom store
@@ -331,7 +323,7 @@ import org.slf4j.LoggerFactory;
* }
* </pre>
*/
public class KafkaConsumer implements Consumer {
public class KafkaConsumer<K,V> implements Consumer<K,V> {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
@@ -340,7 +332,9 @@ public class KafkaConsumer implements Consumer {
private final Metrics metrics;
private final Set<String> subscribedTopics;
private final Set<TopicPartition> subscribedPartitions;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
* are documented <a href="http://kafka.apache.org/documentation.html#consumerconfigs">here</a>. Values can be
@@ -351,7 +345,7 @@ public class KafkaConsumer implements Consumer {
* @param configs The consumer configs
*/
public KafkaConsumer(Map<String, Object> configs) {
this(new ConsumerConfig(configs), null);
this(new ConsumerConfig(configs), null, null, null);
}
/**
@@ -364,7 +358,24 @@ public class KafkaConsumer implements Consumer {
* every rebalance operation.
*/
public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback) {
this(new ConsumerConfig(configs), callback);
this(new ConsumerConfig(configs), callback, null, null);
}
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration, a {@link ConsumerRebalanceCallback}
* implementation, a key and a value {@link Deserializer}.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
* @param configs The consumer configs
* @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
* every rebalance operation.
* @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method won't
* be called in the consumer when the deserializer is passed in directly.
* @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
*/
public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(configs), callback, keyDeserializer, valueDeserializer);
}
/**
@@ -372,12 +383,12 @@ public class KafkaConsumer implements Consumer {
* Valid configuration strings are documented at {@link ConsumerConfig}
*/
public KafkaConsumer(Properties properties) {
this(new ConsumerConfig(properties), null);
this(new ConsumerConfig(properties), null, null, null);
}
/**
* A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a
* {@link ConsumerRebalanceCallback} implementation.
* {@link ConsumerRebalanceCallback} implementation.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
* @param properties The consumer configuration properties
@@ -385,14 +396,27 @@ public class KafkaConsumer implements Consumer {
* every rebalance operation.
*/
public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) {
this(new ConsumerConfig(properties), callback);
this(new ConsumerConfig(properties), callback, null, null);
}
private KafkaConsumer(ConsumerConfig config) {
this(config, null);
/**
* A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a
* {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
* @param properties The consumer configuration properties
* @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
* every rebalance operation.
* @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method won't
* be called in the consumer when the deserializer is passed in directly.
* @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
*/
public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(properties), callback, keyDeserializer, valueDeserializer);
}
private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback) {
private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
log.trace("Starting the Kafka consumer");
subscribedTopics = new HashSet<String>();
subscribedPartitions = new HashSet<TopicPartition>();
@@ -402,6 +426,18 @@ public class KafkaConsumer implements Consumer {
this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
if (keyDeserializer == null)
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
else
this.keyDeserializer = keyDeserializer;
if (valueDeserializer == null)
this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
else
this.valueDeserializer = valueDeserializer;
config.logUnused();
log.debug("Kafka consumer started");
}
@@ -488,7 +524,7 @@ public class KafkaConsumer implements Consumer {
* @return map of topic to records since the last fetch for the subscribed list of topics and partitions
*/
@Override
public Map<String, ConsumerRecords> poll(long timeout) {
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
// TODO Auto-generated method stub
return null;
}
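
A short usage sketch of the new four-argument constructor, passing deserializer instances directly (so configure() is not called on them, as documented above); the null rebalance callback mirrors what the one- and two-argument constructors pass internally, and the connection properties are assumed:

Properties props = new Properties();
props.put("group.id", "test"); // plus the usual connection settings
KafkaConsumer<byte[], byte[]> consumer =
    new KafkaConsumer<byte[], byte[]>(props, null, new ByteArrayDeserializer(), new ByteArrayDeserializer());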

6
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

@@ -33,7 +33,7 @@ import org.apache.kafka.common.TopicPartition;
* The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it
* needs to communicate with. Failure to close the consumer after use will leak these resources.
*/
public class MockConsumer implements Consumer {
public class MockConsumer implements Consumer<byte[], byte[]> {
private final Set<TopicPartition> subscribedPartitions;
private final Set<String> subscribedTopics;
@@ -90,10 +90,10 @@ public class MockConsumer implements Consumer {
}
@Override
public Map<String, ConsumerRecords> poll(long timeout) {
public Map<String, ConsumerRecords<byte[], byte[]>> poll(long timeout) {
// hand out one dummy record, 1 per topic
Map<String, List<ConsumerRecord>> records = new HashMap<String, List<ConsumerRecord>>();
Map<String, ConsumerRecords> recordMetadata = new HashMap<String, ConsumerRecords>();
Map<String, ConsumerRecords<byte[], byte[]>> recordMetadata = new HashMap<String, ConsumerRecords<byte[], byte[]>>();
for(TopicPartition partition : subscribedPartitions) {
// get the last consumed offset
long messageSequence = consumedOffsets.get(partition);

34
clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java

@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer;
import java.util.Map;
public class ByteArraySerializer implements Serializer<byte[]> {
@Override
public void configure(Map<String, ?> configs) {
// nothing to do
}
@Override
public byte[] serialize(String topic, byte[] data, boolean isKey) {
return data;
}
@Override
public void close() {
// nothing to do
}
}

76
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
* The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
* needs to communicate with. Failure to close the producer after use will leak these resources.
*/
public class KafkaProducer implements Producer {
public class KafkaProducer<K,V> implements Producer<K,V> {
private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
@@ -75,26 +75,59 @@ public class KafkaProducer implements Producer {
private final CompressionType compressionType;
private final Sensor errors;
private final Time time;
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
* are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>. Values can be
* either strings or Objects of the appropriate type (for example a numeric configuration would accept either the
* string "42" or the integer 42).
* @param configs The producer configs
*
*/
public KafkaProducer(Map<String, Object> configs) {
this(new ProducerConfig(configs));
this(new ProducerConfig(configs), null, null);
}
/**
* A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.
* Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
* Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept
* either the string "42" or the integer 42).
* @param configs The producer configs
* @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be
* called in the producer when the serializer is passed in directly.
* @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't
* be called in the producer when the serializer is passed in directly.
*/
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(configs), keySerializer, valueSerializer);
}
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
* are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
* @param properties The producer configs
*/
public KafkaProducer(Properties properties) {
this(new ProducerConfig(properties));
this(new ProducerConfig(properties), null, null);
}
private KafkaProducer(ProducerConfig config) {
/**
* A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.
* Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
* @param properties The producer configs
* @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be
* called in the producer when the serializer is passed in directly.
* @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't
* be called in the producer when the serializer is passed in directly.
*/
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(properties), keySerializer, valueSerializer);
}
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
log.trace("Starting the Kafka producer");
this.time = new SystemTime();
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
@@ -145,6 +178,17 @@ public class KafkaProducer implements Producer {
this.errors = this.metrics.sensor("errors");
if (keySerializer == null)
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
else
this.keySerializer = keySerializer;
if (valueSerializer == null)
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
else
this.valueSerializer = valueSerializer;
config.logUnused();
log.debug("Kafka producer started");
}
@@ -159,9 +203,10 @@ public class KafkaProducer implements Producer {
/**
* Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)}
* @param record The record to be sent
*/
@Override
public Future<RecordMetadata> send(ProducerRecord record) {
public Future<RecordMetadata> send(ProducerRecord<K,V> record) {
return send(record, null);
}
@@ -183,14 +228,14 @@ public class KafkaProducer implements Producer {
* If you want to simulate a simple blocking call you can do the following:
*
* <pre>
* producer.send(new ProducerRecord("the-topic", "key, "value")).get();
* producer.send(new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes())).get();
* </pre>
* <p>
* Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
* will be invoked when the request is complete.
*
* <pre>
* ProducerRecord record = new ProducerRecord("the-topic", "key, "value");
* ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes());
* producer.send(myRecord,
* new Callback() {
* public void onCompletion(RecordMetadata metadata, Exception e) {
@@ -205,8 +250,8 @@ public class KafkaProducer implements Producer {
* following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
*
* <pre>
* producer.send(new ProducerRecord(topic, partition, key, value), callback1);
* producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
* producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key, value), callback1);
* producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
* </pre>
* <p>
* Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
@@ -226,16 +271,19 @@ public class KafkaProducer implements Producer {
* indicates no callback)
*/
@Override
public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) {
try {
// first make sure the metadata for the topic is available
waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
int partition = partitioner.partition(record, metadata.fetch());
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value());
byte[] serializedKey = keySerializer.serialize(record.topic(), record.key(), true);
byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value(), false);
ProducerRecord serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
int partition = partitioner.partition(serializedRecord, metadata.fetch());
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
TopicPartition tp = new TopicPartition(record.topic(), partition);
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
@@ -324,6 +372,8 @@ public class KafkaProducer implements Producer {
throw new KafkaException(e);
}
this.metrics.close();
this.keySerializer.close();
this.valueSerializer.close();
log.debug("The Kafka producer has closed.");
}
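
A small end-to-end sketch of the generified producer, using the new Properties-plus-serializers constructor with ByteArraySerializer standing in for any Serializer implementation; the broker address is an assumption:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
Producer<byte[], byte[]> producer =
    new KafkaProducer<byte[], byte[]>(props, new ByteArraySerializer(), new ByteArraySerializer());
producer.send(new ProducerRecord<byte[], byte[]>("the-topic", "key".getBytes(), "value".getBytes()));
producer.close(); // also closes the serializers, per the change above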

14
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java

@@ -40,11 +40,11 @@ import org.apache.kafka.common.TopicPartition;
* By default this mock will synchronously complete each send call successfully. However it can be configured to allow
* the user to control the completion of the call and supply an optional error for the producer to throw.
*/
public class MockProducer implements Producer {
public class MockProducer implements Producer<byte[], byte[]> {
private final Cluster cluster;
private final Partitioner partitioner = new Partitioner();
private final List<ProducerRecord> sent;
private final List<ProducerRecord<byte[], byte[]>> sent;
private final Deque<Completion> completions;
private boolean autoComplete;
private Map<TopicPartition, Long> offsets;
@@ -62,7 +62,7 @@ public class MockProducer implements Producer {
this.cluster = cluster;
this.autoComplete = autoComplete;
this.offsets = new HashMap<TopicPartition, Long>();
this.sent = new ArrayList<ProducerRecord>();
this.sent = new ArrayList<ProducerRecord<byte[], byte[]>>();
this.completions = new ArrayDeque<Completion>();
}
@@ -90,7 +90,7 @@ public class MockProducer implements Producer {
* @see #history()
*/
@Override
public synchronized Future<RecordMetadata> send(ProducerRecord record) {
public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record) {
return send(record, null);
}
@@ -100,7 +100,7 @@ public class MockProducer implements Producer {
* @see #history()
*/
@Override
public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
int partition = 0;
if (this.cluster.partitionsForTopic(record.topic()) != null)
partition = partitioner.partition(record, this.cluster);
@@ -147,8 +147,8 @@ public class MockProducer implements Producer {
/**
* Get the list of sent records since the last call to {@link #clear()}
*/
public synchronized List<ProducerRecord> history() {
return new ArrayList<ProducerRecord>(this.sent);
public synchronized List<ProducerRecord<byte[], byte[]>> history() {
return new ArrayList<ProducerRecord<byte[], byte[]>>(this.sent);
}
/**

6
clients/src/main/java/org/apache/kafka/clients/producer/Producer.java

@@ -31,7 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
* @see KafkaProducer
* @see MockProducer
*/
public interface Producer extends Closeable {
public interface Producer<K,V> extends Closeable {
/**
* Send the given record asynchronously and return a future which will eventually contain the response information.
@@ -39,12 +39,12 @@ public interface Producer extends Closeable {
* @param record The record to send
* @return A future which will eventually contain the response information
*/
public Future<RecordMetadata> send(ProducerRecord record);
public Future<RecordMetadata> send(ProducerRecord<K,V> record);
/**
* Send a record and invoke the given callback when the record has been acknowledged by the server
*/
public Future<RecordMetadata> send(ProducerRecord record, Callback callback);
public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
/**
* Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change

15
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

@@ -17,7 +17,6 @@ import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
@@ -175,6 +174,14 @@ public class ProducerConfig extends AbstractConfig {
+ " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of"
+ " message re-ordering due to retries (i.e., if retries are enabled).";
/** <code>key.serializer</code> */
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
private static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>Serializer</code> interface.";
/** <code>value.serializer</code> */
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
static {
config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -182,7 +189,7 @@ public class ProducerConfig extends AbstractConfig {
.define(ACKS_CONFIG,
Type.STRING,
"1",
in(Arrays.asList("all","-1", "0", "1")),
in(Arrays.asList("all", "-1", "0", "1")),
Importance.HIGH,
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
@@ -221,7 +228,9 @@ public class ProducerConfig extends AbstractConfig {
5,
atLeast(1),
Importance.LOW,
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC);
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
}
ProducerConfig(Map<? extends Object, ? extends Object> props) {
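
Alternatively, the serializers can be chosen through configuration rather than constructor arguments; a sketch using the config keys and default class names defined above:

Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.clients.producer.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.clients.producer.ByteArraySerializer");
// the producer instantiates and configure()s these classes because no serializer is passed in directly
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);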

20
clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java

@@ -20,12 +20,12 @@ package org.apache.kafka.clients.producer;
* specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is
* present a partition will be assigned in a round-robin fashion.
*/
public final class ProducerRecord {
public final class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final byte[] key;
private final byte[] value;
private final K key;
private final V value;
/**
* Creates a record to be sent to a specified topic and partition
@@ -35,7 +35,7 @@ public final class ProducerRecord {
* @param key The key that will be included in the record
* @param value The record contents
*/
public ProducerRecord(String topic, Integer partition, byte[] key, byte[] value) {
public ProducerRecord(String topic, Integer partition, K key, V value) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
this.topic = topic;
@@ -51,7 +51,7 @@ public final class ProducerRecord {
* @param key The key that will be included in the record
* @param value The record contents
*/
public ProducerRecord(String topic, byte[] key, byte[] value) {
public ProducerRecord(String topic, K key, V value) {
this(topic, null, key, value);
}
@@ -61,7 +61,7 @@ public final class ProducerRecord {
* @param topic The topic this record should be sent to
* @param value The record contents
*/
public ProducerRecord(String topic, byte[] value) {
public ProducerRecord(String topic, V value) {
this(topic, null, value);
}
@@ -75,14 +75,14 @@ public final class ProducerRecord {
/**
* The key (or null if no key is specified)
*/
public byte[] key() {
public K key() {
return key;
}
/**
* @return The value
*/
public byte[] value() {
public V value() {
return value;
}
@@ -95,8 +95,8 @@ public final class ProducerRecord {
@Override
public String toString() {
String key = this.key == null ? "null" : ("byte[" + this.key.length + "]");
String value = this.value == null ? "null" : ("byte[" + this.value.length + "]");
String key = this.key == null ? "null" : this.key.toString();
String value = this.value == null ? "null" : this.value.toString();
return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value;
}
}

38
clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java

@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
/**
*
* @param <T> Type to be serialized from.
*
* A class that implements this interface is expected to have a constructor with no parameter.
*/
public interface Serializer<T> extends Configurable {
/**
*
* @param topic Topic associated with data
* @param data Typed data
* @param isKey Is data for key or value
* @return bytes of the serialized data
*/
public byte[] serialize(String topic, T data, boolean isKey);
/**
* Close this serializer
*/
public void close();
}
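
Mirroring the consumer side, a hypothetical String serializer (illustrative only, not added by this patch) could implement the interface like this, using the new SerializationException for encoding failures:

package org.apache.kafka.clients.producer;

import java.io.UnsupportedEncodingException;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;

public class StringSerializer implements Serializer<String> {
    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public byte[] serialize(String topic, String data, boolean isKey) {
        if (data == null)
            return null;
        try {
            return data.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}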

2
clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java

@@ -43,7 +43,7 @@ public class Partitioner {
* @param record The record being sent
* @param cluster The current cluster metadata
*/
public int partition(ProducerRecord record, Cluster cluster) {
public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
int numPartitions = partitions.size();
if (record.partition() != null) {

4
clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java

@@ -46,12 +46,12 @@ public class ProducerPerformance {
throw new IllegalArgumentException("Invalid property: " + args[i]);
props.put(pieces[0], pieces[1]);
}
KafkaProducer producer = new KafkaProducer(props);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[],byte[]>(props);
/* setup perf test */
byte[] payload = new byte[recordSize];
Arrays.fill(payload, (byte) 1);
ProducerRecord record = new ProducerRecord(topicName, payload);
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>(topicName, payload);
long sleepTime = NS_PER_SEC / throughput;
long sleepDeficitNs = 0;
Stats stats = new Stats(numRecords, 5000);

47
clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java

@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
import org.apache.kafka.common.KafkaException;
/**
* Any exception during deserialization in the consumer
*/
public class DeserializationException extends KafkaException {
private static final long serialVersionUID = 1L;
public DeserializationException(String message, Throwable cause) {
super(message, cause);
}
public DeserializationException(String message) {
super(message);
}
public DeserializationException(Throwable cause) {
super(cause);
}
public DeserializationException() {
super();
}
/* avoid the expensive and useless stack trace for deserialization exceptions */
@Override
public Throwable fillInStackTrace() {
return this;
}
}

46
clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java

@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
import org.apache.kafka.common.KafkaException;
/**
* Any exception during serialization in the producer
*/
public class SerializationException extends KafkaException {
private static final long serialVersionUID = 1L;
public SerializationException(String message, Throwable cause) {
super(message, cause);
}
public SerializationException(String message) {
super(message);
}
public SerializationException(Throwable cause) {
super(cause);
}
public SerializationException() {
super();
}
/* avoid the expensive and useless stack trace for serialization exceptions */
@Override
public Throwable fillInStackTrace() {
return this;
}
}

6
clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java

@@ -37,7 +37,7 @@ public class MockProducerTest {
@Test
public void testAutoCompleteMock() throws Exception {
MockProducer producer = new MockProducer(true);
ProducerRecord record = new ProducerRecord(topic, "key".getBytes(), "value".getBytes());
ProducerRecord record = new ProducerRecord<byte[], byte[]>(topic, "key".getBytes(), "value".getBytes());
Future<RecordMetadata> metadata = producer.send(record);
assertTrue("Send should be immediately complete", metadata.isDone());
assertFalse("Send should be successful", isError(metadata));
@@ -51,8 +51,8 @@ public class MockProducerTest {
@Test
public void testManualCompletion() throws Exception {
MockProducer producer = new MockProducer(false);
ProducerRecord record1 = new ProducerRecord("topic", "key1".getBytes(), "value1".getBytes());
ProducerRecord record2 = new ProducerRecord("topic", "key2".getBytes(), "value2".getBytes());
ProducerRecord record1 = new ProducerRecord<byte[], byte[]>("topic", "key1".getBytes(), "value1".getBytes());
ProducerRecord record2 = new ProducerRecord<byte[], byte[]>("topic", "key2".getBytes(), "value2".getBytes());
Future<RecordMetadata> md1 = producer.send(record1);
assertFalse("Send shouldn't have completed", md1.isDone());
Future<RecordMetadata> md2 = producer.send(record2);

12
clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java

@@ -50,22 +50,22 @@ public class PartitionerTest {
public void testUserSuppliedPartitioning() {
assertEquals("If the user supplies a partition we should use it.",
0,
partitioner.partition(new ProducerRecord("test", 0, key, value), cluster));
partitioner.partition(new ProducerRecord<byte[], byte[]>("test", 0, key, value), cluster));
}
@Test
public void testKeyPartitionIsStable() {
int partition = partitioner.partition(new ProducerRecord("test", key, value), cluster);
int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", key, value), cluster);
assertEquals("Same key should yield same partition",
partition,
partitioner.partition(new ProducerRecord("test", key, "value2".getBytes()), cluster));
partitioner.partition(new ProducerRecord<byte[], byte[]>("test", key, "value2".getBytes()), cluster));
}
@Test
public void testRoundRobinIsStable() {
int startPart = partitioner.partition(new ProducerRecord("test", value), cluster);
int startPart = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
for (int i = 1; i <= 100; i++) {
int partition = partitioner.partition(new ProducerRecord("test", value), cluster);
int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
assertEquals("Should yield a different partition each call with round-robin partitioner",
partition, (startPart + i) % 2);
}
@@ -74,7 +74,7 @@ public class PartitionerTest {
@Test
public void testRoundRobinWithDownNode() {
for (int i = 0; i < partitions.size(); i++) {
int part = partitioner.partition(new ProducerRecord("test", value), cluster);
int part = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
assertTrue("We should never choose a leader-less node in round robin", part >= 0 && part < 2);
}
}

4
core/src/main/scala/kafka/producer/BaseProducer.scala

@@ -33,10 +33,10 @@ class NewShinyProducer(producerProps: Properties) extends BaseProducer {
// decide whether to send synchronously based on producer properties
val sync = producerProps.getProperty("producer.type", "async").equals("sync")
val producer = new KafkaProducer(producerProps)
val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
val record = new ProducerRecord(topic, key, value)
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, key, value)
if(sync) {
this.producer.send(record).get()
} else {

6
core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala

@@ -32,7 +32,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
var requiredNumAcks: Int = Int.MaxValue
var syncSend: Boolean = false
private var producer: KafkaProducer = null
private var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
def getTopic: String = topic
def setTopic(topic: String) { this.topic = topic }
@@ -60,7 +60,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString)
producer = new KafkaProducer(props)
producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
LogLog.debug("Kafka producer connected to " + brokerList)
LogLog.debug("Logging for topic: " + topic)
}
@@ -68,7 +68,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
override def append(event: LoggingEvent) {
val message = subAppend(event)
LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
val response = producer.send(new ProducerRecord(topic, message.getBytes()))
val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message.getBytes()))
if (syncSend) response.get
}

3
core/src/main/scala/kafka/tools/MirrorMaker.scala

@@ -27,7 +27,6 @@ import kafka.producer.{OldProducer, NewShinyProducer}
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{RecordMetadata, ProducerRecord}
import org.apache.kafka.common.KafkaException
import scala.collection.JavaConversions._
@@ -547,7 +546,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
extends NewShinyProducer(producerProps) with MirrorMakerBaseProducer {
override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) {
val record = new ProducerRecord(topicPartition.topic, key, value)
val record = new ProducerRecord[Array[Byte],Array[Byte]](topicPartition.topic, key, value)
if(sync) {
topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(this.producer.send(record).get().partition(), offset)
} else {

4
core/src/main/scala/kafka/tools/ReplayLogProducer.scala

@@ -124,7 +124,7 @@ object ReplayLogProducer extends Logging {
class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {
val shutdownLatch = new CountDownLatch(1)
val producer = new KafkaProducer(config.producerProps)
val producer = new KafkaProducer[Array[Byte],Array[Byte]](config.producerProps)
override def run() {
info("Starting consumer thread..")
@@ -137,7 +137,7 @@ object ReplayLogProducer extends Logging {
stream
for (messageAndMetadata <- iter) {
try {
val response = producer.send(new ProducerRecord(config.outputTopic,
val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](config.outputTopic,
messageAndMetadata.key(), messageAndMetadata.message()))
if(config.isSync) {
response.get()

4
core/src/main/scala/kafka/tools/TestEndToEndLatency.scala

@@ -56,7 +56,7 @@ object TestEndToEndLatency {
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
val producer = new KafkaProducer(producerProps)
val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
// make sure the consumer fetcher has started before sending data since otherwise
// the consumption from the tail will skip the first message and hence be blocked
@@ -67,7 +67,7 @@ object TestEndToEndLatency {
val latencies = new Array[Long](numMessages)
for (i <- 0 until numMessages) {
val begin = System.nanoTime
producer.send(new ProducerRecord(topic, message))
producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message))
val received = iter.next
val elapsed = System.nanoTime - begin
// poor man's progress bar

6
core/src/main/scala/kafka/tools/TestLogCleaning.scala

@@ -242,7 +242,7 @@ object TestLogCleaning {
val producerProps = new Properties
producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
val producer = new KafkaProducer(producerProps)
val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
val rand = new Random(1)
val keyCount = (messages / dups).toInt
val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt")
@@ -254,9 +254,9 @@ object TestLogCleaning {
val delete = i % 100 < percentDeletes
val msg =
if(delete)
new ProducerRecord(topic, key.toString.getBytes(), null)
new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), null)
else
new ProducerRecord(topic, key.toString.getBytes(), i.toString.getBytes())
new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), i.toString.getBytes())
producer.send(msg)
producedWriter.write(TestRecord(topic, key, i, delete).toString)
producedWriter.newLine()

4
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala

@@ -75,7 +75,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
var producer = new KafkaProducer(props)
var producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
try {
@@ -89,7 +89,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
// make sure the returned messages are correct
val responses = for (message <- messages)
yield producer.send(new ProducerRecord(topic, null, null, message))
yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, null, null, message))
val futures = responses.toList
for ((future, offset) <- futures zip (0 until numRecords)) {
assertEquals(offset.toLong, future.get.offset)

32
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala

@@ -52,10 +52,10 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
private var consumer1: SimpleConsumer = null
private var consumer2: SimpleConsumer = null
private var producer1: KafkaProducer = null
private var producer2: KafkaProducer = null
private var producer3: KafkaProducer = null
private var producer4: KafkaProducer = null
private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null
private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null
private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null
private var producer4: KafkaProducer[Array[Byte],Array[Byte]] = null
private val topic1 = "topic-1"
private val topic2 = "topic-2"
@@ -93,7 +93,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
// send a too-large record
val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L)
}
@@ -106,7 +106,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
// send a too-large record
val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
intercept[ExecutionException] {
producer2.send(record).get
}
@@ -118,7 +118,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testNonExistentTopic() {
// send a record with non-exist topic
val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes)
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic2, null, "key".getBytes, "value".getBytes)
intercept[ExecutionException] {
producer1.send(record).get
}
@@ -143,7 +143,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
// send a record with incorrect broker list
val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
intercept[ExecutionException] {
producer4.send(record).get
}
@@ -160,7 +160,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
// first send a message to make sure the metadata is refreshed
val record1 = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
producer1.send(record1).get
producer2.send(record1).get
@@ -180,7 +180,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
val msgSize = producerBufferSize / tooManyRecords
val value = new Array[Byte](msgSize)
new Random().nextBytes(value)
val record2 = new ProducerRecord(topic1, null, "key".getBytes, value)
val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, value)
intercept[KafkaException] {
for (i <- 1 to tooManyRecords)
@@ -201,7 +201,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
// create a record with incorrect partition id, send should fail
val record = new ProducerRecord(topic1, new Integer(1), "key".getBytes, "value".getBytes)
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, new Integer(1), "key".getBytes, "value".getBytes)
intercept[IllegalArgumentException] {
producer1.send(record)
}
@@ -221,7 +221,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
// first send a message to make sure the metadata is refreshed
producer1.send(record).get
@@ -300,7 +300,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testCannotSendToInternalTopic() {
val thrown = intercept[ExecutionException] {
producer2.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
}
assertTrue(thrown.getCause.isInstanceOf[InvalidTopicException])
}
@@ -313,7 +313,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes)
try {
producer3.send(record).get
fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas")
@@ -333,7 +333,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes)
// this should work with all brokers up and running
producer3.send(record).get
@@ -365,7 +365,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
override def doWork(): Unit = {
val responses =
for (i <- sent+1 to sent+numRecords)
yield producer.send(new ProducerRecord(topic1, null, null, i.toString.getBytes))
yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes))
val futures = responses.toList
try {

16
core/src/test/scala/integration/kafka/api/ProducerSendTest.scala

@@ -86,24 +86,24 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
TestUtils.createTopic(zkClient, topic, 1, 2, servers)
// send a normal record
val record0 = new ProducerRecord(topic, new Integer(0), "key".getBytes, "value".getBytes)
val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes)
assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)
// send a record with null value should be ok
val record1 = new ProducerRecord(topic, new Integer(0), "key".getBytes, null)
val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, null)
assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)
// send a record with null key should be ok
val record2 = new ProducerRecord(topic, new Integer(0), null, "value".getBytes)
val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), null, "value".getBytes)
assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)
// send a record with null part id should be ok
val record3 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset)
// send a record with null topic should fail
try {
val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes)
val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, new Integer(0), "key".getBytes, "value".getBytes)
producer.send(record4, callback)
fail("Should not allow sending a record without topic")
} catch {
@@ -140,7 +140,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
TestUtils.createTopic(zkClient, topic, 1, 2, servers)
// non-blocking send a list of records
val record0 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
for (i <- 1 to numRecords)
producer.send(record0)
val response0 = producer.send(record0)
@@ -182,7 +182,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
val responses =
for (i <- 1 to numRecords)
yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes))
yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes))
val futures = responses.toList
futures.map(_.get)
for (future <- futures)
@@ -228,7 +228,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
try {
// Send a message to auto-create the topic
val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
// double check that the topic is created with leader elected

4
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@@ -383,7 +383,7 @@ object TestUtils extends Logging {
metadataFetchTimeout: Long = 3000L,
blockOnBufferFull: Boolean = true,
bufferSize: Long = 1024L * 1024L,
retries: Int = 0) : KafkaProducer = {
retries: Int = 0) : KafkaProducer[Array[Byte],Array[Byte]] = {
import org.apache.kafka.clients.producer.ProducerConfig
val producerProps = new Properties()
@@ -395,7 +395,7 @@ object TestUtils extends Logging {
producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
return new KafkaProducer(producerProps)
return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
}
/**
