diff --git a/build.gradle b/build.gradle
index 63c110aefc9..75ae5faa041 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1294,7 +1294,6 @@ project(':examples') {
dependencies {
implementation project(':clients')
- implementation project(':server-common')
}
javadoc {
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index a08563a3b5a..f97dda31461 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -71,7 +71,6 @@
-
diff --git a/examples/README b/examples/README
deleted file mode 100644
index dc5b3947cd1..00000000000
--- a/examples/README
+++ /dev/null
@@ -1,12 +0,0 @@
-This directory contains examples of client code that uses kafka.
-
-To run the demo:
-
- 1. In Zookeeper mode, Start Zookeeper and the Kafka server. In KRaft mode, start the Kafka server.
- 2. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync`
- 3. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh`
- 4. For exactly once demo run, `run bin/exactly-once-demo.sh 6 3 50000`,
- this means we are starting 3 EOS instances with 6 topic partitions and 50000 pre-populated records.
- 5. Some notes for exactly once demo:
- 5.1. The Kafka server has to be on broker version 2.5 or higher.
- 5.2. You could also use IntelliJ IDEA to run the example directly by configuring parameters as "Program arguments"
diff --git a/examples/README.md b/examples/README.md
new file mode 100644
index 00000000000..6451982f0a5
--- /dev/null
+++ b/examples/README.md
@@ -0,0 +1,9 @@
+# Kafka client examples
+
+This module contains some Kafka client examples.
+
+1. Start a Kafka 2.5+ local cluster with a plain listener configured on port 9092.
+2. Run `examples/bin/java-producer-consumer-demo.sh 10000` to asynchronously send 10k records to topic1 and consume them.
+3. Run `examples/bin/java-producer-consumer-demo.sh 10000 sync` to synchronous send 10k records to topic1 and consume them.
+4. Run `examples/bin/exactly-once-demo.sh 6 3 10000` to create input-topic and output-topic with 6 partitions each,
+ start 3 transactional application instances and process 10k records.
diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index ff44efe492e..3c6424c7dca 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -16,29 +16,59 @@
*/
package kafka.examples;
-import org.apache.kafka.common.errors.TimeoutException;
-
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+/**
+ * This example can be decomposed into the following stages:
+ *
+ * 1. Clean any topics left from previous runs.
+ * 2. Create a producer thread to send a set of records to topic1.
+ * 3. Create a consumer thread to fetch all previously sent records from topic1.
+ *
+ * If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
+ * record all the log output together.
+ */
public class KafkaConsumerProducerDemo {
- public static void main(String[] args) throws InterruptedException {
- boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
- CountDownLatch latch = new CountDownLatch(2);
- Producer producerThread = new Producer(
- "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch);
- producerThread.start();
-
- Consumer consumerThread = new Consumer(
- "consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, "DemoConsumer", Optional.empty(), false, 10000, latch);
- consumerThread.start();
-
- if (!latch.await(5, TimeUnit.MINUTES)) {
- throw new TimeoutException("Timeout after 5 minutes waiting for demo producer and consumer to finish");
- }
+ public static final String BOOTSTRAP_SERVERS = "localhost:9092";
+ public static final String TOPIC_NAME = "my-topic";
+ public static final String GROUP_NAME = "my-group";
+
+ public static void main(String[] args) {
+ try {
+ if (args.length == 0) {
+ Utils.printHelp("This example takes 2 parameters (i.e. 10000 sync):%n" +
+ "- records: total number of records to send (required)%n" +
+ "- mode: pass 'sync' to send records synchronously (optional)");
+ return;
+ }
- consumerThread.shutdown();
- System.out.println("All finished!");
+ int numRecords = Integer.parseInt(args[0]);
+ boolean isAsync = args.length == 1 || !args[1].trim().equalsIgnoreCase("sync");
+
+ // stage 1: clean any topics left from previous runs
+ Utils.recreateTopics(BOOTSTRAP_SERVERS, -1, TOPIC_NAME);
+ CountDownLatch latch = new CountDownLatch(2);
+
+ // stage 2: produce records to topic1
+ Producer producerThread = new Producer(
+ "producer", BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch);
+ producerThread.start();
+
+ // stage 3: consume records from topic1
+ Consumer consumerThread = new Consumer(
+ "consumer", BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch);
+ consumerThread.start();
+
+ if (!latch.await(5, TimeUnit.MINUTES)) {
+ Utils.printErr("Timeout after 5 minutes waiting for termination");
+ producerThread.shutdown();
+ consumerThread.shutdown();
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
}
}
diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
index 1a2cfcb8a24..9d94337491d 100644
--- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -16,185 +16,93 @@
*/
package kafka.examples;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
import java.util.List;
import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
- * This exactly once demo driver takes 3 arguments:
- * - partition: number of partitions for input/output topic
- * - instances: number of instances
- * - records: number of records
- * An example argument list would be `6 3 50000`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
- * output to file` to record all the log output together.
- *
- * The driver could be decomposed as following stages:
- *
- * 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
- *
- * 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
- * the input topic. The driver will block for the record generation to finish, so the producer
- * must be in synchronous sending mode.
+ * This example can be decomposed into the following stages:
*
- * 3. Set up transactional instances in separate threads which does a consume-process-produce loop,
- * tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will
- * drain all the records from either given partitions or auto assigned partitions by actively
- * comparing log end offset with committed offset. Each record will be processed exactly once
- * as dividing the key by 2, and extend the value message. The driver will block for all the record
- * processing to finish. The transformed record shall be written to the output topic, with
- * transactional guarantee.
+ * 1. Clean any topics left from previous runs.
+ * 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
+ * The demo will block for the record generation to finish, so the producer is synchronous.
+ * 3. Set up the transactional instances in separate threads, each one executing a read-process-write loop
+ * (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will drain all records from either given
+ * partitions or auto assigned partitions by actively comparing log end offset with committed offset.
+ * Each record will be processed exactly-once with strong partition level ordering guarantee.
+ * The demo will block until all records are processed and written to the output topic.
+ * 4. Create a read_committed consumer thread to verify we have all records in the output topic,
+ * and record ordering at the partition level is maintained.
+ * The demo will block for the consumption of all committed records, with transactional guarantee.
*
- * 4. Set up a read committed consumer in a separate thread to verify we have all records within
- * the output topic, while the message ordering on partition level is maintained.
- * The driver will block for the consumption of all committed records.
- *
- * From this demo, you could see that all the records from pre-population are processed exactly once,
- * with strong partition level ordering guarantee.
- *
- * Note: please start the kafka broker and zookeeper in local first. The broker version must be >= 2.5
- * in order to run, otherwise the app could throw
+ * Broker version must be >= 2.5.0 in order to run, otherwise the example will throw
* {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ *
+ * If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
+ * record all the log output together.
*/
public class KafkaExactlyOnceDemo {
public static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String INPUT_TOPIC = "input-topic";
private static final String OUTPUT_TOPIC = "output-topic";
+ public static final String GROUP_NAME = "check-group";
- public static void main(String[] args) throws InterruptedException, ExecutionException {
- if (args.length != 3) {
- throw new IllegalArgumentException("Should accept 3 parameters: " +
- "[number of partitions], [number of instances], [number of records]");
- }
-
- int numPartitions = Integer.parseInt(args[0]);
- int numInstances = Integer.parseInt(args[1]);
- int numRecords = Integer.parseInt(args[2]);
-
- /* Stage 1: topic cleanup and recreation */
- recreateTopics(numPartitions);
-
- CountDownLatch prePopulateLatch = new CountDownLatch(1);
-
- /* Stage 2: pre-populate records */
- Producer producerThread = new Producer(
- "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch);
- producerThread.start();
-
- if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
- throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population");
- }
-
- CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
-
- /* Stage 3: transactionally process all messages */
- CountDownLatch processorsLatch = new CountDownLatch(numInstances);
- List processors = IntStream.range(0, numInstances)
- .mapToObj(id -> new ExactlyOnceMessageProcessor(
- "processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
- .collect(Collectors.toList());
- processors.forEach(ExactlyOnceMessageProcessor::start);
-
- if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
- throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
- }
-
- CountDownLatch consumeLatch = new CountDownLatch(1);
-
- /* Stage 4: consume all processed messages to verify exactly once */
- Consumer consumerThread = new Consumer(
- "consumer", "DemoConsumer", OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
- consumerThread.start();
-
- if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
- throw new TimeoutException("Timeout after 5 minutes waiting for output data consumption");
- }
-
- consumerThread.shutdown();
- System.out.println("All finished!");
- }
-
- private static void recreateTopics(final int numPartitions)
- throws ExecutionException, InterruptedException {
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-
- Admin adminClient = Admin.create(props);
-
- List topicsToDelete = Arrays.asList(INPUT_TOPIC, OUTPUT_TOPIC);
-
- deleteTopic(adminClient, topicsToDelete);
-
- // Check topic existence in a retry loop
- while (true) {
- System.out.println("Making sure the topics are deleted successfully: " + topicsToDelete);
-
- Set listedTopics = adminClient.listTopics().names().get();
- System.out.println("Current list of topics: " + listedTopics);
-
- boolean hasTopicInfo = false;
- for (String listedTopic : listedTopics) {
- if (topicsToDelete.contains(listedTopic)) {
- hasTopicInfo = true;
- break;
- }
- }
- if (!hasTopicInfo) {
- break;
+ public static void main(String[] args) {
+ try {
+ if (args.length != 3) {
+ Utils.printHelp("This example takes 3 parameters (i.e. 6 3 10000):%n" +
+ "- partition: number of partitions for input and output topics (required)%n" +
+ "- instances: number of application instances (required)%n" +
+ "- records: total number of records (required)");
+ return;
}
- Thread.sleep(1000);
- }
- // Create topics in a retry loop
- while (true) {
- final short replicationFactor = 1;
- final List newTopics = Arrays.asList(
- new NewTopic(INPUT_TOPIC, numPartitions, replicationFactor),
- new NewTopic(OUTPUT_TOPIC, numPartitions, replicationFactor));
- try {
- adminClient.createTopics(newTopics).all().get();
- System.out.println("Created new topics: " + newTopics);
- break;
- } catch (ExecutionException e) {
- if (!(e.getCause() instanceof TopicExistsException)) {
- throw e;
- }
- System.out.println("Metadata of the old topics are not cleared yet...");
-
- deleteTopic(adminClient, topicsToDelete);
+ int numPartitions = Integer.parseInt(args[0]);
+ int numInstances = Integer.parseInt(args[1]);
+ int numRecords = Integer.parseInt(args[2]);
+
+ // stage 1: clean any topics left from previous runs
+ Utils.recreateTopics(BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC);
+
+ // stage 2: send demo records to the input-topic
+ CountDownLatch producerLatch = new CountDownLatch(1);
+ Producer producerThread = new Producer(
+ "producer", BOOTSTRAP_SERVERS, INPUT_TOPIC, false, null, true, numRecords, -1, producerLatch);
+ producerThread.start();
+ if (!producerLatch.await(2, TimeUnit.MINUTES)) {
+ Utils.printErr("Timeout after 2 minutes waiting for data load");
+ producerThread.shutdown();
+ return;
+ }
- Thread.sleep(1000);
+ // stage 3: read from input-topic, process once and write to the output-topic
+ CountDownLatch processorsLatch = new CountDownLatch(numInstances);
+ List processors = IntStream.range(0, numInstances)
+ .mapToObj(id -> new ExactlyOnceMessageProcessor(
+ "processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
+ .collect(Collectors.toList());
+ processors.forEach(ExactlyOnceMessageProcessor::start);
+ if (!processorsLatch.await(2, TimeUnit.MINUTES)) {
+ Utils.printErr("Timeout after 2 minutes waiting for record copy");
+ processors.forEach(ExactlyOnceMessageProcessor::shutdown);
+ return;
}
- }
- }
- private static void deleteTopic(final Admin adminClient, final List topicsToDelete)
- throws InterruptedException, ExecutionException {
- try {
- adminClient.deleteTopics(topicsToDelete).all().get();
- } catch (ExecutionException e) {
- if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
- throw e;
+ // stage 4: check consuming records from the output-topic
+ CountDownLatch consumerLatch = new CountDownLatch(1);
+ Consumer consumerThread = new Consumer(
+ "consumer", BOOTSTRAP_SERVERS, OUTPUT_TOPIC, GROUP_NAME, Optional.empty(), true, numRecords, consumerLatch);
+ consumerThread.start();
+ if (!consumerLatch.await(2, TimeUnit.MINUTES)) {
+ Utils.printErr("Timeout after 2 minutes waiting for output read");
+ consumerThread.shutdown();
}
- System.out.println("Encountered exception during topic deletion: " + e.getCause());
+ } catch (Throwable e) {
+ e.printStackTrace();
}
- System.out.println("Deleted old topics: " + topicsToDelete);
}
}