From c757af5f7c630d532bfee5f6dc45aec603ad8a29 Mon Sep 17 00:00:00 2001
From: Federico Valeri
Date: Fri, 12 May 2023 04:39:12 +0200
Subject: [PATCH] KAFKA-14752: Kafka examples improvements - demo changes
 (#13517)

KAFKA-14752: Kafka examples improvements - demo changes

Reviewers: Luke Chen
---
 build.gradle                                  |   1 -
 checkstyle/import-control-core.xml            |   1 -
 examples/README                               |  12 -
 examples/README.md                            |   9 +
 .../examples/KafkaConsumerProducerDemo.java   |  66 ++++--
 .../kafka/examples/KafkaExactlyOnceDemo.java  | 220 +++++-------------
 6 files changed, 121 insertions(+), 188 deletions(-)
 delete mode 100644 examples/README
 create mode 100644 examples/README.md

diff --git a/build.gradle b/build.gradle
index 63c110aefc9..75ae5faa041 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1294,7 +1294,6 @@ project(':examples') {
 
   dependencies {
     implementation project(':clients')
-    implementation project(':server-common')
   }
 
   javadoc {
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index a08563a3b5a..f97dda31461 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -71,7 +71,6 @@
   <subpackage name="examples">
     <allow pkg="org.apache.kafka.clients" />
-    <allow pkg="org.apache.kafka.server.util" />
   </subpackage>
diff --git a/examples/README b/examples/README
deleted file mode 100644
index dc5b3947cd1..00000000000
--- a/examples/README
+++ /dev/null
@@ -1,12 +0,0 @@
-This directory contains examples of client code that uses kafka.
-
-To run the demo:
-
-   1. In Zookeeper mode, Start Zookeeper and the Kafka server. In KRaft mode, start the Kafka server.
-   2. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync`
-   3. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh`
-   4. For exactly once demo run, `run bin/exactly-once-demo.sh 6 3 50000`,
-      this means we are starting 3 EOS instances with 6 topic partitions and 50000 pre-populated records.
-   5. Some notes for exactly once demo:
-      5.1. The Kafka server has to be on broker version 2.5 or higher.
-      5.2. You could also use IntelliJ IDEA to run the example directly by configuring parameters as "Program arguments"
diff --git a/examples/README.md b/examples/README.md
new file mode 100644
index 00000000000..6451982f0a5
--- /dev/null
+++ b/examples/README.md
@@ -0,0 +1,9 @@
+# Kafka client examples
+
+This module contains some Kafka client examples.
+
+1. Start a Kafka 2.5+ local cluster with a plain listener configured on port 9092.
+2. Run `examples/bin/java-producer-consumer-demo.sh 10000` to asynchronously send 10k records to my-topic and consume them.
+3. Run `examples/bin/java-producer-consumer-demo.sh 10000 sync` to synchronously send 10k records to my-topic and consume them.
+4. Run `examples/bin/exactly-once-demo.sh 6 3 10000` to create input-topic and output-topic with 6 partitions each,
+   start 3 transactional application instances and process 10k records.
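The only difference between the async run in step 2 and the sync run in step 3 is how the producer waits for each send. A minimal sketch of the two modes, not part of this patch (the bootstrap address, topic name, and serializers are assumptions matching the README):

```java
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendModes {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // async: hand the record to the client and react to the result in a callback
            producer.send(new ProducerRecord<>("my-topic", "key", "value"),
                (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.printf("Sent to %s-%d@%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                    }
                });

            // sync: block on the returned future until the broker acknowledges the record
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            RecordMetadata metadata = future.get();
            System.out.printf("Synced to %s-%d@%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
}
```

In async mode the client batches records and reports results later, which maximizes throughput; in sync mode `Future.get()` blocks per record, trading throughput for immediate error visibility.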
diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index ff44efe492e..3c6424c7dca 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -16,29 +16,59 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.common.errors.TimeoutException;
-
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * This example can be decomposed into the following stages:
+ *
+ * 1. Clean any topics left from previous runs.
+ * 2. Create a producer thread to send a set of records to my-topic.
+ * 3. Create a consumer thread to fetch all previously sent records from my-topic.
+ *
+ * If you are using IntelliJ IDEA, the program arguments should be put in `Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
+ * record all the log output together.
+ */
 public class KafkaConsumerProducerDemo {
-    public static void main(String[] args) throws InterruptedException {
-        boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
-        CountDownLatch latch = new CountDownLatch(2);
-        Producer producerThread = new Producer(
-            "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch);
-        producerThread.start();
-
-        Consumer consumerThread = new Consumer(
-            "consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, "DemoConsumer", Optional.empty(), false, 10000, latch);
-        consumerThread.start();
-
-        if (!latch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for demo producer and consumer to finish");
-        }
-
-        consumerThread.shutdown();
-        System.out.println("All finished!");
+    public static final String BOOTSTRAP_SERVERS = "localhost:9092";
+    public static final String TOPIC_NAME = "my-topic";
+    public static final String GROUP_NAME = "my-group";
+
+    public static void main(String[] args) {
+        try {
+            if (args.length == 0) {
+                Utils.printHelp("This example takes 2 parameters (i.e. 10000 sync):%n" +
+                    "- records: total number of records to send (required)%n" +
+                    "- mode: pass 'sync' to send records synchronously (optional)");
+                return;
+            }
+
+            int numRecords = Integer.parseInt(args[0]);
+            boolean isAsync = args.length == 1 || !args[1].trim().equalsIgnoreCase("sync");
+
+            // stage 1: clean any topics left from previous runs
+            Utils.recreateTopics(BOOTSTRAP_SERVERS, -1, TOPIC_NAME);
+            CountDownLatch latch = new CountDownLatch(2);
+
+            // stage 2: produce records to my-topic
+            Producer producerThread = new Producer(
+                "producer", BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch);
+            producerThread.start();
+
+            // stage 3: consume records from my-topic
+            Consumer consumerThread = new Consumer(
+                "consumer", BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch);
+            consumerThread.start();
+
+            if (!latch.await(5, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 5 minutes waiting for termination");
+                producerThread.shutdown();
+                consumerThread.shutdown();
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
     }
 }
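Both demos now call `Utils.recreateTopics`, a helper added elsewhere in this PR and not shown in this diff. A minimal sketch of what such a helper might look like using only Admin API calls, mirroring the retry logic of the `recreateTopics`/`deleteTopic` methods removed from `KafkaExactlyOnceDemo` below (the class name, one-second retry interval, and replication factor are assumptions):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class TopicUtils {
    // Deletes and recreates the given topics; a negative numPartitions falls back to a single partition.
    public static void recreateTopics(String bootstrapServers, int numPartitions, String... topicNames)
            throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (Admin admin = Admin.create(props)) {
            List<String> topics = Arrays.asList(topicNames);
            // delete the topics, ignoring the error when they do not exist yet
            try {
                admin.deleteTopics(topics).all().get();
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                    throw e;
                }
            }
            // create the topics, retrying while the old metadata is still being cleared
            List<NewTopic> newTopics = topics.stream()
                .map(name -> new NewTopic(name, numPartitions < 0 ? 1 : numPartitions, (short) 1))
                .collect(Collectors.toList());
            while (true) {
                try {
                    admin.createTopics(newTopics).all().get();
                    return;
                } catch (ExecutionException e) {
                    if (!(e.getCause() instanceof TopicExistsException)) {
                        throw e;
                    }
                    Thread.sleep(1000);
                }
            }
        }
    }
}
```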
diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
index 1a2cfcb8a24..9d94337491d 100644
--- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -16,185 +16,93 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
- * This exactly once demo driver takes 3 arguments:
- *   - partition: number of partitions for input/output topic
- *   - instances: number of instances
- *   - records: number of records
- * An example argument list would be `6 3 50000`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
- * output to file` to record all the log output together.
- *
- * The driver could be decomposed as following stages:
- *
- * 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
- *
- * 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
- *    the input topic. The driver will block for the record generation to finish, so the producer
- *    must be in synchronous sending mode.
- *
- * 3. Set up transactional instances in separate threads which does a consume-process-produce loop,
- *    tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will
- *    drain all the records from either given partitions or auto assigned partitions by actively
- *    comparing log end offset with committed offset. Each record will be processed exactly once
- *    as dividing the key by 2, and extend the value message. The driver will block for all the record
- *    processing to finish. The transformed record shall be written to the output topic, with
- *    transactional guarantee.
- *
- * 4. Set up a read committed consumer in a separate thread to verify we have all records within
- *    the output topic, while the message ordering on partition level is maintained.
- *    The driver will block for the consumption of all committed records.
- *
- * From this demo, you could see that all the records from pre-population are processed exactly once,
- * with strong partition level ordering guarantee.
- *
- * Note: please start the kafka broker and zookeeper in local first. The broker version must be >= 2.5
- * in order to run, otherwise the app could throw
- * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ * This example can be decomposed into the following stages:
+ *
+ * 1. Clean any topics left from previous runs.
+ * 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
+ *    The demo will block for the record generation to finish, so the producer is synchronous.
+ * 3. Set up the transactional instances in separate threads, each one executing a read-process-write loop
+ *    (see {@link ExactlyOnceMessageProcessor}). Each EOS instance will drain all records from either given
+ *    partitions or auto-assigned partitions by actively comparing the log end offset with the committed offset.
+ *    Each record will be processed exactly once, with a strong partition-level ordering guarantee.
+ *    The demo will block until all records are processed and written to the output topic.
+ * 4. Create a read_committed consumer thread to verify that we have all records in the output topic,
+ *    and that record ordering at the partition level is maintained.
+ *    The demo will block for the consumption of all committed records, with transactional guarantee.
+ *
+ * The broker version must be >= 2.5.0 in order to run, otherwise the example will throw
+ * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ *
+ * If you are using IntelliJ IDEA, the program arguments should be put in `Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
+ * record all the log output together.
  */
 public class KafkaExactlyOnceDemo {
 
     public static final String BOOTSTRAP_SERVERS = "localhost:9092";
     private static final String INPUT_TOPIC = "input-topic";
     private static final String OUTPUT_TOPIC = "output-topic";
+    public static final String GROUP_NAME = "check-group";
 
-    public static void main(String[] args) throws InterruptedException, ExecutionException {
-        if (args.length != 3) {
-            throw new IllegalArgumentException("Should accept 3 parameters: " +
-                "[number of partitions], [number of instances], [number of records]");
-        }
-
-        int numPartitions = Integer.parseInt(args[0]);
-        int numInstances = Integer.parseInt(args[1]);
-        int numRecords = Integer.parseInt(args[2]);
-
-        /* Stage 1: topic cleanup and recreation */
-        recreateTopics(numPartitions);
-
-        CountDownLatch prePopulateLatch = new CountDownLatch(1);
-
-        /* Stage 2: pre-populate records */
-        Producer producerThread = new Producer(
-            "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch);
-        producerThread.start();
-
-        if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population");
-        }
-
-        CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
-
-        /* Stage 3: transactionally process all messages */
-        CountDownLatch processorsLatch = new CountDownLatch(numInstances);
-        List<ExactlyOnceMessageProcessor> processors = IntStream.range(0, numInstances)
-            .mapToObj(id -> new ExactlyOnceMessageProcessor(
-                "processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
-            .collect(Collectors.toList());
-        processors.forEach(ExactlyOnceMessageProcessor::start);
-
-        if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
-        }
-
-        CountDownLatch consumeLatch = new CountDownLatch(1);
-
-        /* Stage 4: consume all processed messages to verify exactly once */
-        Consumer consumerThread = new Consumer(
-            "consumer", "DemoConsumer", OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
-        consumerThread.start();
-
-        if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for output data consumption");
-        }
-
-        consumerThread.shutdown();
-        System.out.println("All finished!");
-    }
-
-    private static void recreateTopics(final int numPartitions)
-        throws ExecutionException, InterruptedException {
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-            KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-
-        Admin adminClient = Admin.create(props);
-
-        List<String> topicsToDelete = Arrays.asList(INPUT_TOPIC, OUTPUT_TOPIC);
-
-        deleteTopic(adminClient, topicsToDelete);
-
-        // Check topic existence in a retry loop
-        while (true) {
-            System.out.println("Making sure the topics are deleted successfully: " + topicsToDelete);
-
-            Set<String> listedTopics = adminClient.listTopics().names().get();
-            System.out.println("Current list of topics: " + listedTopics);
-
-            boolean hasTopicInfo = false;
-            for (String listedTopic : listedTopics) {
-                if (topicsToDelete.contains(listedTopic)) {
-                    hasTopicInfo = true;
-                    break;
-                }
-            }
-            if (!hasTopicInfo) {
-                break;
-            }
-            Thread.sleep(1000);
-        }
-
-        // Create topics in a retry loop
-        while (true) {
-            final short replicationFactor = 1;
-            final List<NewTopic> newTopics = Arrays.asList(
-                new NewTopic(INPUT_TOPIC, numPartitions, replicationFactor),
-                new NewTopic(OUTPUT_TOPIC, numPartitions, replicationFactor));
-            try {
-                adminClient.createTopics(newTopics).all().get();
-                System.out.println("Created new topics: " + newTopics);
-                break;
-            } catch (ExecutionException e) {
-                if (!(e.getCause() instanceof TopicExistsException)) {
-                    throw e;
-                }
-                System.out.println("Metadata of the old topics are not cleared yet...");
-
-                deleteTopic(adminClient, topicsToDelete);
-
-                Thread.sleep(1000);
-            }
-        }
-    }
-
-    private static void deleteTopic(final Admin adminClient, final List<String> topicsToDelete)
-        throws InterruptedException, ExecutionException {
-        try {
-            adminClient.deleteTopics(topicsToDelete).all().get();
-        } catch (ExecutionException e) {
-            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
-                throw e;
-            }
-            System.out.println("Encountered exception during topic deletion: " + e.getCause());
-        }
-        System.out.println("Deleted old topics: " + topicsToDelete);
-    }
+    public static void main(String[] args) {
+        try {
+            if (args.length != 3) {
+                Utils.printHelp("This example takes 3 parameters (i.e. 6 3 10000):%n" +
+                    "- partition: number of partitions for input and output topics (required)%n" +
+                    "- instances: number of application instances (required)%n" +
+                    "- records: total number of records (required)");
+                return;
+            }
+
+            int numPartitions = Integer.parseInt(args[0]);
+            int numInstances = Integer.parseInt(args[1]);
+            int numRecords = Integer.parseInt(args[2]);
+
+            // stage 1: clean any topics left from previous runs
+            Utils.recreateTopics(BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC);
+
+            // stage 2: send demo records to the input-topic
+            CountDownLatch producerLatch = new CountDownLatch(1);
+            Producer producerThread = new Producer(
+                "producer", BOOTSTRAP_SERVERS, INPUT_TOPIC, false, null, true, numRecords, -1, producerLatch);
+            producerThread.start();
+            if (!producerLatch.await(2, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 2 minutes waiting for data load");
+                producerThread.shutdown();
+                return;
+            }
+
+            // stage 3: read from input-topic, process once and write to the output-topic
+            CountDownLatch processorsLatch = new CountDownLatch(numInstances);
+            List<ExactlyOnceMessageProcessor> processors = IntStream.range(0, numInstances)
+                .mapToObj(id -> new ExactlyOnceMessageProcessor(
+                    "processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
+                .collect(Collectors.toList());
+            processors.forEach(ExactlyOnceMessageProcessor::start);
+            if (!processorsLatch.await(2, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 2 minutes waiting for record copy");
+                processors.forEach(ExactlyOnceMessageProcessor::shutdown);
+                return;
+            }
+
+            // stage 4: check consuming records from the output-topic
+            CountDownLatch consumerLatch = new CountDownLatch(1);
+            Consumer consumerThread = new Consumer(
+                "consumer", BOOTSTRAP_SERVERS, OUTPUT_TOPIC, GROUP_NAME, Optional.empty(), true, numRecords, consumerLatch);
+            consumerThread.start();
+            if (!consumerLatch.await(2, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 2 minutes waiting for output read");
+                consumerThread.shutdown();
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+    }
 }
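Stage 3 delegates to `ExactlyOnceMessageProcessor`, whose source is not part of this diff. The heart of any such processor is the transactional read-process-write loop enabled by KIP-447, which is also why the javadoc requires brokers >= 2.5.0. A minimal sketch of that pattern (the group id, transactional id, and pass-through transformation are illustrative assumptions, not code from this PR):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReadProcessWriteLoop {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "eos-group");
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "eos-processor-0");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.initTransactions();
            consumer.subscribe(Collections.singleton("input-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) {
                    continue;
                }
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    // "process" the record; any pure transformation works here
                    producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
                }
                // commit the consumed offsets and the produced records atomically;
                // a real processor would abortTransaction() and reset the consumer on errors
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            }
        }
    }
}
```

Committing the consumed offsets through the producer makes the offset advance and the output records part of the same transaction, so a crash between poll and commit replays the batch instead of losing or duplicating records.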