
KAFKA-14752: Kafka examples improvements - producer changes (#13515)


Reviewers: Luke Chen <showuon@gmail.com>, Christo Lolov <christololov@gmail.com>
Authored by Federico Valeri, committed 2 years ago via GitHub
commit 78090bb4cd
4 changed files:
  1. examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java (4 changes)
  2. examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java (3 changes)
  3. examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java (3 changes)
  4. examples/src/main/java/kafka/examples/Producer.java (216 changes)

examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java (4 changes)

@@ -66,7 +66,9 @@ public class ExactlyOnceMessageProcessor extends Thread {
         // It is recommended to have a relatively short txn timeout in order to clear pending offsets faster.
         final int transactionTimeoutMs = 10000;
         // A unique transactional.id must be provided in order to properly use EOS.
-        producer = new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
+        producer = new Producer(
+            "processor-producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null)
+            .createKafkaProducer();
         // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.
         // Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances.
         this.groupInstanceId = "Txn-consumer-" + instanceIdx;
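
Note: the producer created here is transactional, so the processor can commit consumed offsets and produced records atomically. A rough sketch of the read-process-write loop such a processor runs, using only the standard KafkaProducer transactional API; the consumer, outputTopic, poll timeout, and loop condition are illustrative assumptions, not lines from this change, and the usual org.apache.kafka.clients.consumer, org.apache.kafka.common, java.time, and java.util imports are assumed:

    producer.initTransactions();
    while (true) { // loop until stopped; real code checks a shutdown flag
        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
        if (records.isEmpty()) continue;
        producer.beginTransaction();
        try {
            for (ConsumerRecord<Integer, String> record : records) {
                producer.send(new ProducerRecord<>(outputTopic, record.key(), record.value()));
            }
            // compute, per partition, the next offset the group should consume
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<Integer, String>> batch = records.records(partition);
                offsets.put(partition, new OffsetAndMetadata(batch.get(batch.size() - 1).offset() + 1));
            }
            // committed offsets and produced records become visible atomically
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (KafkaException e) {
            // simplified: abort and reprocess the batch; fatal errors should close the producer instead
            producer.abortTransaction();
        }
    }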

examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java (3 changes)

@@ -26,7 +26,8 @@ public class KafkaConsumerProducerDemo {
     public static void main(String[] args) throws InterruptedException {
         boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
         CountDownLatch latch = new CountDownLatch(2);
-        Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch);
+        Producer producerThread = new Producer(
+            "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch);
         producerThread.start();

         Consumer consumerThread = new Consumer(

examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java (3 changes)

@@ -91,7 +91,8 @@ public class KafkaExactlyOnceDemo {
         CountDownLatch prePopulateLatch = new CountDownLatch(1);

         /* Stage 2: pre-populate records */
-        Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch);
+        Producer producerThread = new Producer(
+            "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch);
         producerThread.start();

         if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
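
A related detail: if the await above times out, the reworked Producer can be stopped cooperatively rather than interrupted, because shutdown() flips the volatile closed flag checked by the send loop and counts the latch down at most once. A minimal sketch of that cancellation path; the join call and exception type are illustrative assumptions, not part of this diff:

    if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
        producerThread.shutdown();  // sets closed = true; the send loop exits on its next iteration
        producerThread.join();      // wait for the thread to finish cleanly
        throw new java.util.concurrent.TimeoutException("Timed out waiting for the pre-population stage");
    }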

examples/src/main/java/kafka/examples/Producer.java (216 changes)

@@ -21,133 +21,165 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;

 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;

 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-    private final KafkaProducer<Integer, String> producer;
+    private final String bootstrapServers;
     private final String topic;
-    private final Boolean isAsync;
-    private int numRecords;
+    private final boolean isAsync;
+    private final String transactionalId;
+    private final boolean enableIdempotency;
+    private final int numRecords;
+    private final int transactionTimeoutMs;
     private final CountDownLatch latch;
+    private volatile boolean closed;

-    public Producer(final String topic,
-                    final Boolean isAsync,
-                    final String transactionalId,
-                    final boolean enableIdempotency,
-                    final int numRecords,
-                    final int transactionTimeoutMs,
-                    final CountDownLatch latch) {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        if (transactionTimeoutMs > 0) {
-            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
-        }
-        if (transactionalId != null) {
-            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        }
-        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-        producer = new KafkaProducer<>(props);
+    public Producer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    boolean isAsync,
+                    String transactionalId,
+                    boolean enableIdempotency,
+                    int numRecords,
+                    int transactionTimeoutMs,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
         this.isAsync = isAsync;
+        this.transactionalId = transactionalId;
+        this.enableIdempotency = enableIdempotency;
         this.numRecords = numRecords;
+        this.transactionTimeoutMs = transactionTimeoutMs;
         this.latch = latch;
     }

-    KafkaProducer<Integer, String> get() {
-        return producer;
-    }
-
     @Override
     public void run() {
-        int messageKey = 0;
-        int recordsSent = 0;
-        try {
-            while (recordsSent < numRecords) {
-                final long currentTimeMs = System.currentTimeMillis();
-                produceOnce(messageKey, recordsSent, currentTimeMs);
-                messageKey += 2;
-                recordsSent += 1;
+        int key = 0;
+        int sentRecords = 0;
+        // the producer instance is thread safe
+        try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+            while (!closed && sentRecords < numRecords) {
+                if (isAsync) {
+                    asyncSend(producer, key, "test" + key);
+                } else {
+                    syncSend(producer, key, "test" + key);
+                }
+                key++;
+                sentRecords++;
             }
-        } catch (Exception e) {
-            System.out.println("Producer encountered exception:" + e);
-        } finally {
-            System.out.println("Producer sent " + numRecords + " records successfully");
-            this.producer.close();
-            latch.countDown();
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
+        Utils.printOut("Sent %d records", sentRecords);
+        shutdown();
     }

-    private void produceOnce(final int messageKey, final int recordsSent, final long currentTimeMs) throws ExecutionException, InterruptedException {
-        String messageStr = "Message_" + messageKey;
-        if (isAsync) { // Send asynchronously
-            sendAsync(messageKey, messageStr, currentTimeMs);
-            return;
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-        Future<RecordMetadata> future = send(messageKey, messageStr);
-        future.get();
-        System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")");
     }

-    private void sendAsync(final int messageKey, final String messageStr, final long currentTimeMs) {
-        this.producer.send(new ProducerRecord<>(topic,
-            messageKey,
-            messageStr),
-            new DemoCallBack(currentTimeMs, messageKey, messageStr));
+    public KafkaProducer<Integer, String> createKafkaProducer() {
+        Properties props = new Properties();
+        // bootstrap server config is required for producer to connect to brokers
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // client id is not required, but it's good to track the source of requests beyond just ip/port
+        // by allowing a logical application name to be included in server-side request logging
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
+        // key and value are just byte arrays, so we need to set appropriate serializers
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        if (transactionTimeoutMs > 0) {
+            // max time before the transaction coordinator proactively aborts the ongoing transaction
+            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
+        }
+        if (transactionalId != null) {
+            // the transactional id must be static and unique
+            // it is used to identify the same producer instance across process restarts
+            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        }
+        // enable duplicates protection at the partition level
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
+        return new KafkaProducer<>(props);
     }

-    private Future<RecordMetadata> send(final int messageKey, final String messageStr) {
-        return producer.send(new ProducerRecord<>(topic,
-            messageKey,
-            messageStr));
+    private void asyncSend(KafkaProducer<Integer, String> producer, int key, String value) {
+        // send the record asynchronously, setting a callback to be notified of the result
+        // note that, even if you set a small batch.size with linger.ms=0, the send operation
+        // will still be blocked when buffer.memory is full or metadata are not available
+        producer.send(new ProducerRecord<>(topic, key, value), new ProducerCallback(key, value));
     }
-}

-class DemoCallBack implements Callback {
+    private RecordMetadata syncSend(KafkaProducer<Integer, String> producer, int key, String value)
+        throws ExecutionException, InterruptedException {
+        try {
+            // send the record and then call get, which blocks waiting for the ack from the broker
+            RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, key, value)).get();
+            Utils.maybePrintRecord(numRecords, key, value, metadata);
+            return metadata;
+        } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                 | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+            Utils.printErr(e.getMessage());
+            // we can't recover from these exceptions
+            shutdown();
+        } catch (KafkaException e) {
+            Utils.printErr(e.getMessage());
+        }
+        return null;
+    }

-    private final long startTime;
-    private final int key;
-    private final String message;
+    class ProducerCallback implements Callback {
+        private final int key;
+        private final String value;

-    public DemoCallBack(long startTime, int key, String message) {
-        this.startTime = startTime;
-        this.key = key;
-        this.message = message;
-    }
+        public ProducerCallback(int key, String value) {
+            this.key = key;
+            this.value = value;
+        }

-    /**
-     * A callback method the user can implement to provide asynchronous handling of request completion. This method will
-     * be called when the record sent to the server has been acknowledged. When exception is not null in the callback,
-     * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
-     *
-     * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). An empty metadata
-     *                  with -1 value for all fields except for topicPartition will be returned if an error occurred.
-     * @param exception The exception thrown during processing of this record. Null if no error occurred.
-     */
-    public void onCompletion(RecordMetadata metadata, Exception exception) {
-        long elapsedTime = System.currentTimeMillis() - startTime;
-        if (metadata != null) {
-            System.out.println(
-                "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
-                    "), " +
-                    "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
-        } else {
-            exception.printStackTrace();
-        }
-    }
+        /**
+         * A callback method the user can implement to provide asynchronous handling of request completion. This method will
+         * be called when the record sent to the server has been acknowledged. When exception is not null in the callback,
+         * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
+         *
+         * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). An empty metadata
+         *                  with -1 value for all fields except for topicPartition will be returned if an error occurred.
+         * @param exception The exception thrown during processing of this record. Null if no error occurred.
+         */
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            if (exception != null) {
+                Utils.printErr(exception.getMessage());
+                if (!(exception instanceof RetriableException)) {
+                    // we can't recover from these exceptions
+                    shutdown();
+                }
+            } else {
+                Utils.maybePrintRecord(numRecords, key, value, metadata);
+            }
+        }
+    }
 }
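
For reference, a minimal sketch of driving the reworked class on its own, mirroring the call sites in the other files above; the broker address and topic name are assumptions for illustration:

    // async mode, 5 records, idempotence and transactions disabled
    CountDownLatch latch = new CountDownLatch(1);
    Producer producerThread = new Producer(
        "demo-producer", "localhost:9092", "demo-topic", true, null, false, 5, -1, latch);
    producerThread.start();
    latch.await();  // released by shutdown() once all records are sent

Since configuration now lives in createKafkaProducer() rather than the constructor, callers that only need a configured client can invoke it directly, as ExactlyOnceMessageProcessor does above, while the demos still run the class as a thread.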
