|
|
@ -76,6 +76,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> { |
|
|
|
private final Time time; |
|
|
|
private final Time time; |
|
|
|
private final Serializer<K> keySerializer; |
|
|
|
private final Serializer<K> keySerializer; |
|
|
|
private final Serializer<V> valueSerializer; |
|
|
|
private final Serializer<V> valueSerializer; |
|
|
|
|
|
|
|
private final ProducerConfig producerConfig; |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings |
|
|
|
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings |
|
|
@ -152,6 +153,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> { |
|
|
|
|
|
|
|
|
|
|
|
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { |
|
|
|
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { |
|
|
|
log.trace("Starting the Kafka producer"); |
|
|
|
log.trace("Starting the Kafka producer"); |
|
|
|
|
|
|
|
this.producerConfig = config; |
|
|
|
this.time = new SystemTime(); |
|
|
|
this.time = new SystemTime(); |
|
|
|
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) |
|
|
|
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) |
|
|
|
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), |
|
|
|
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), |
|
|
@ -307,14 +309,16 @@ public class KafkaProducer<K,V> implements Producer<K,V> { |
|
|
|
serializedKey = keySerializer.serialize(record.topic(), record.key()); |
|
|
|
serializedKey = keySerializer.serialize(record.topic(), record.key()); |
|
|
|
} catch (ClassCastException cce) { |
|
|
|
} catch (ClassCastException cce) { |
|
|
|
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + |
|
|
|
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + |
|
|
|
" to the one specified in key.serializer"); |
|
|
|
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + |
|
|
|
|
|
|
|
" specified in key.serializer"); |
|
|
|
} |
|
|
|
} |
|
|
|
byte[] serializedValue; |
|
|
|
byte[] serializedValue; |
|
|
|
try { |
|
|
|
try { |
|
|
|
serializedValue = valueSerializer.serialize(record.topic(), record.value()); |
|
|
|
serializedValue = valueSerializer.serialize(record.topic(), record.value()); |
|
|
|
} catch (ClassCastException cce) { |
|
|
|
} catch (ClassCastException cce) { |
|
|
|
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + |
|
|
|
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + |
|
|
|
" to the one specified in value.serializer"); |
|
|
|
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + |
|
|
|
|
|
|
|
" specified in value.serializer"); |
|
|
|
} |
|
|
|
} |
|
|
|
ProducerRecord serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue); |
|
|
|
ProducerRecord serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue); |
|
|
|
int partition = partitioner.partition(serializedRecord, metadata.fetch()); |
|
|
|
int partition = partitioner.partition(serializedRecord, metadata.fetch()); |
|
|
|