Browse Source

KAFKA-14752: Kafka examples improvements - demo changes (#13517)

KAFKA-14752: Kafka examples improvements - demo changes

Reviewers: Luke Chen <showuon@gmail.com>
pull/13713/head
Federico Valeri 2 years ago committed by GitHub
parent
commit
c757af5f7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      build.gradle
  2. 1
      checkstyle/import-control-core.xml
  3. 12
      examples/README
  4. 9
      examples/README.md
  5. 66
      examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
  6. 220
      examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java

1
build.gradle

@ -1294,7 +1294,6 @@ project(':examples') {
dependencies { dependencies {
implementation project(':clients') implementation project(':clients')
implementation project(':server-common')
} }
javadoc { javadoc {

1
checkstyle/import-control-core.xml

@ -71,7 +71,6 @@
<subpackage name="examples"> <subpackage name="examples">
<allow pkg="org.apache.kafka.clients" /> <allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.server.util" />
</subpackage> </subpackage>
<subpackage name="log.remote"> <subpackage name="log.remote">

12
examples/README

@ -1,12 +0,0 @@
This directory contains examples of client code that uses kafka.
To run the demo:
1. In Zookeeper mode, Start Zookeeper and the Kafka server. In KRaft mode, start the Kafka server.
2. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync`
3. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh`
4. For exactly once demo run, `run bin/exactly-once-demo.sh 6 3 50000`,
this means we are starting 3 EOS instances with 6 topic partitions and 50000 pre-populated records.
5. Some notes for exactly once demo:
5.1. The Kafka server has to be on broker version 2.5 or higher.
5.2. You could also use IntelliJ IDEA to run the example directly by configuring parameters as "Program arguments"

9
examples/README.md

@ -0,0 +1,9 @@
# Kafka client examples
This module contains some Kafka client examples.
1. Start a Kafka 2.5+ local cluster with a plain listener configured on port 9092.
2. Run `examples/bin/java-producer-consumer-demo.sh 10000` to asynchronously send 10k records to topic1 and consume them.
3. Run `examples/bin/java-producer-consumer-demo.sh 10000 sync` to synchronous send 10k records to topic1 and consume them.
4. Run `examples/bin/exactly-once-demo.sh 6 3 10000` to create input-topic and output-topic with 6 partitions each,
start 3 transactional application instances and process 10k records.

66
examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java

@ -16,29 +16,59 @@
*/ */
package kafka.examples; package kafka.examples;
import org.apache.kafka.common.errors.TimeoutException;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/**
* This example can be decomposed into the following stages:
*
* 1. Clean any topics left from previous runs.
* 2. Create a producer thread to send a set of records to topic1.
* 3. Create a consumer thread to fetch all previously sent records from topic1.
*
* If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`.
* You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
* record all the log output together.
*/
public class KafkaConsumerProducerDemo { public class KafkaConsumerProducerDemo {
public static void main(String[] args) throws InterruptedException { public static final String BOOTSTRAP_SERVERS = "localhost:9092";
boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync"); public static final String TOPIC_NAME = "my-topic";
CountDownLatch latch = new CountDownLatch(2); public static final String GROUP_NAME = "my-group";
Producer producerThread = new Producer(
"producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch); public static void main(String[] args) {
producerThread.start(); try {
if (args.length == 0) {
Consumer consumerThread = new Consumer( Utils.printHelp("This example takes 2 parameters (i.e. 10000 sync):%n" +
"consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, "DemoConsumer", Optional.empty(), false, 10000, latch); "- records: total number of records to send (required)%n" +
consumerThread.start(); "- mode: pass 'sync' to send records synchronously (optional)");
return;
if (!latch.await(5, TimeUnit.MINUTES)) { }
throw new TimeoutException("Timeout after 5 minutes waiting for demo producer and consumer to finish");
}
consumerThread.shutdown(); int numRecords = Integer.parseInt(args[0]);
System.out.println("All finished!"); boolean isAsync = args.length == 1 || !args[1].trim().equalsIgnoreCase("sync");
// stage 1: clean any topics left from previous runs
Utils.recreateTopics(BOOTSTRAP_SERVERS, -1, TOPIC_NAME);
CountDownLatch latch = new CountDownLatch(2);
// stage 2: produce records to topic1
Producer producerThread = new Producer(
"producer", BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch);
producerThread.start();
// stage 3: consume records from topic1
Consumer consumerThread = new Consumer(
"consumer", BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch);
consumerThread.start();
if (!latch.await(5, TimeUnit.MINUTES)) {
Utils.printErr("Timeout after 5 minutes waiting for termination");
producerThread.shutdown();
consumerThread.shutdown();
}
} catch (Throwable e) {
e.printStackTrace();
}
} }
} }

220
examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java

@ -16,185 +16,93 @@
*/ */
package kafka.examples; package kafka.examples;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
/** /**
* This exactly once demo driver takes 3 arguments: * This example can be decomposed into the following stages:
* - partition: number of partitions for input/output topic
* - instances: number of instances
* - records: number of records
* An example argument list would be `6 3 50000`.
*
* If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
* Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
* output to file` to record all the log output together.
*
* The driver could be decomposed as following stages:
*
* 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
*
* 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
* the input topic. The driver will block for the record generation to finish, so the producer
* must be in synchronous sending mode.
* *
* 3. Set up transactional instances in separate threads which does a consume-process-produce loop, * 1. Clean any topics left from previous runs.
* tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will * 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
* drain all the records from either given partitions or auto assigned partitions by actively * The demo will block for the record generation to finish, so the producer is synchronous.
* comparing log end offset with committed offset. Each record will be processed exactly once * 3. Set up the transactional instances in separate threads, each one executing a read-process-write loop
* as dividing the key by 2, and extend the value message. The driver will block for all the record * (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will drain all records from either given
* processing to finish. The transformed record shall be written to the output topic, with * partitions or auto assigned partitions by actively comparing log end offset with committed offset.
* transactional guarantee. * Each record will be processed exactly-once with strong partition level ordering guarantee.
* The demo will block until all records are processed and written to the output topic.
* 4. Create a read_committed consumer thread to verify we have all records in the output topic,
* and record ordering at the partition level is maintained.
* The demo will block for the consumption of all committed records, with transactional guarantee.
* *
* 4. Set up a read committed consumer in a separate thread to verify we have all records within * Broker version must be >= 2.5.0 in order to run, otherwise the example will throw
* the output topic, while the message ordering on partition level is maintained.
* The driver will block for the consumption of all committed records.
*
* From this demo, you could see that all the records from pre-population are processed exactly once,
* with strong partition level ordering guarantee.
*
* Note: please start the kafka broker and zookeeper in local first. The broker version must be >= 2.5
* in order to run, otherwise the app could throw
* {@link org.apache.kafka.common.errors.UnsupportedVersionException}. * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
*
* If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`.
* You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
* record all the log output together.
*/ */
public class KafkaExactlyOnceDemo { public class KafkaExactlyOnceDemo {
public static final String BOOTSTRAP_SERVERS = "localhost:9092"; public static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String INPUT_TOPIC = "input-topic"; private static final String INPUT_TOPIC = "input-topic";
private static final String OUTPUT_TOPIC = "output-topic"; private static final String OUTPUT_TOPIC = "output-topic";
public static final String GROUP_NAME = "check-group";
public static void main(String[] args) throws InterruptedException, ExecutionException { public static void main(String[] args) {
if (args.length != 3) { try {
throw new IllegalArgumentException("Should accept 3 parameters: " + if (args.length != 3) {
"[number of partitions], [number of instances], [number of records]"); Utils.printHelp("This example takes 3 parameters (i.e. 6 3 10000):%n" +
} "- partition: number of partitions for input and output topics (required)%n" +
"- instances: number of application instances (required)%n" +
int numPartitions = Integer.parseInt(args[0]); "- records: total number of records (required)");
int numInstances = Integer.parseInt(args[1]); return;
int numRecords = Integer.parseInt(args[2]);
/* Stage 1: topic cleanup and recreation */
recreateTopics(numPartitions);
CountDownLatch prePopulateLatch = new CountDownLatch(1);
/* Stage 2: pre-populate records */
Producer producerThread = new Producer(
"producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch);
producerThread.start();
if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population");
}
CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
/* Stage 3: transactionally process all messages */
CountDownLatch processorsLatch = new CountDownLatch(numInstances);
List<ExactlyOnceMessageProcessor> processors = IntStream.range(0, numInstances)
.mapToObj(id -> new ExactlyOnceMessageProcessor(
"processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
.collect(Collectors.toList());
processors.forEach(ExactlyOnceMessageProcessor::start);
if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
}
CountDownLatch consumeLatch = new CountDownLatch(1);
/* Stage 4: consume all processed messages to verify exactly once */
Consumer consumerThread = new Consumer(
"consumer", "DemoConsumer", OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
consumerThread.start();
if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
throw new TimeoutException("Timeout after 5 minutes waiting for output data consumption");
}
consumerThread.shutdown();
System.out.println("All finished!");
}
private static void recreateTopics(final int numPartitions)
throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
Admin adminClient = Admin.create(props);
List<String> topicsToDelete = Arrays.asList(INPUT_TOPIC, OUTPUT_TOPIC);
deleteTopic(adminClient, topicsToDelete);
// Check topic existence in a retry loop
while (true) {
System.out.println("Making sure the topics are deleted successfully: " + topicsToDelete);
Set<String> listedTopics = adminClient.listTopics().names().get();
System.out.println("Current list of topics: " + listedTopics);
boolean hasTopicInfo = false;
for (String listedTopic : listedTopics) {
if (topicsToDelete.contains(listedTopic)) {
hasTopicInfo = true;
break;
}
}
if (!hasTopicInfo) {
break;
} }
Thread.sleep(1000);
}
// Create topics in a retry loop int numPartitions = Integer.parseInt(args[0]);
while (true) { int numInstances = Integer.parseInt(args[1]);
final short replicationFactor = 1; int numRecords = Integer.parseInt(args[2]);
final List<NewTopic> newTopics = Arrays.asList(
new NewTopic(INPUT_TOPIC, numPartitions, replicationFactor), // stage 1: clean any topics left from previous runs
new NewTopic(OUTPUT_TOPIC, numPartitions, replicationFactor)); Utils.recreateTopics(BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC);
try {
adminClient.createTopics(newTopics).all().get(); // stage 2: send demo records to the input-topic
System.out.println("Created new topics: " + newTopics); CountDownLatch producerLatch = new CountDownLatch(1);
break; Producer producerThread = new Producer(
} catch (ExecutionException e) { "producer", BOOTSTRAP_SERVERS, INPUT_TOPIC, false, null, true, numRecords, -1, producerLatch);
if (!(e.getCause() instanceof TopicExistsException)) { producerThread.start();
throw e; if (!producerLatch.await(2, TimeUnit.MINUTES)) {
} Utils.printErr("Timeout after 2 minutes waiting for data load");
System.out.println("Metadata of the old topics are not cleared yet..."); producerThread.shutdown();
return;
deleteTopic(adminClient, topicsToDelete); }
Thread.sleep(1000); // stage 3: read from input-topic, process once and write to the output-topic
CountDownLatch processorsLatch = new CountDownLatch(numInstances);
List<ExactlyOnceMessageProcessor> processors = IntStream.range(0, numInstances)
.mapToObj(id -> new ExactlyOnceMessageProcessor(
"processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
.collect(Collectors.toList());
processors.forEach(ExactlyOnceMessageProcessor::start);
if (!processorsLatch.await(2, TimeUnit.MINUTES)) {
Utils.printErr("Timeout after 2 minutes waiting for record copy");
processors.forEach(ExactlyOnceMessageProcessor::shutdown);
return;
} }
}
}
private static void deleteTopic(final Admin adminClient, final List<String> topicsToDelete) // stage 4: check consuming records from the output-topic
throws InterruptedException, ExecutionException { CountDownLatch consumerLatch = new CountDownLatch(1);
try { Consumer consumerThread = new Consumer(
adminClient.deleteTopics(topicsToDelete).all().get(); "consumer", BOOTSTRAP_SERVERS, OUTPUT_TOPIC, GROUP_NAME, Optional.empty(), true, numRecords, consumerLatch);
} catch (ExecutionException e) { consumerThread.start();
if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) { if (!consumerLatch.await(2, TimeUnit.MINUTES)) {
throw e; Utils.printErr("Timeout after 2 minutes waiting for output read");
consumerThread.shutdown();
} }
System.out.println("Encountered exception during topic deletion: " + e.getCause()); } catch (Throwable e) {
e.printStackTrace();
} }
System.out.println("Deleted old topics: " + topicsToDelete);
} }
} }

Loading…
Cancel
Save