|
|
|
@ -25,7 +25,7 @@ import java.util.stream.IntStream;
@@ -25,7 +25,7 @@ import java.util.stream.IntStream;
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* This example can be decomposed into the following stages: |
|
|
|
|
* |
|
|
|
|
* <p> |
|
|
|
|
* 1. Clean any topics left from previous runs. |
|
|
|
|
* 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic. |
|
|
|
|
* The demo will block for the record generation to finish, so the producer is synchronous. |
|
|
|
@ -37,16 +37,15 @@ import java.util.stream.IntStream;
@@ -37,16 +37,15 @@ import java.util.stream.IntStream;
|
|
|
|
|
* 4. Create a read_committed consumer thread to verify we have all records in the output topic, |
|
|
|
|
* and record ordering at the partition level is maintained. |
|
|
|
|
* The demo will block for the consumption of all committed records, with transactional guarantee. |
|
|
|
|
* |
|
|
|
|
* <p> |
|
|
|
|
* Broker version must be >= 2.5.0 in order to run, otherwise the example will throw |
|
|
|
|
* {@link org.apache.kafka.common.errors.UnsupportedVersionException}. |
|
|
|
|
* |
|
|
|
|
* <p> |
|
|
|
|
* If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`. |
|
|
|
|
* You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to |
|
|
|
|
* record all the log output together. |
|
|
|
|
*/ |
|
|
|
|
public class KafkaExactlyOnceDemo { |
|
|
|
|
public static final String BOOTSTRAP_SERVERS = "localhost:9092"; |
|
|
|
|
private static final String INPUT_TOPIC = "input-topic"; |
|
|
|
|
private static final String OUTPUT_TOPIC = "output-topic"; |
|
|
|
|
public static final String GROUP_NAME = "check-group"; |
|
|
|
@ -66,12 +65,20 @@ public class KafkaExactlyOnceDemo {
@@ -66,12 +65,20 @@ public class KafkaExactlyOnceDemo {
|
|
|
|
|
int numRecords = Integer.parseInt(args[2]); |
|
|
|
|
|
|
|
|
|
// stage 1: clean any topics left from previous runs
|
|
|
|
|
Utils.recreateTopics(BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC); |
|
|
|
|
Utils.recreateTopics(KafkaProperties.BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC); |
|
|
|
|
|
|
|
|
|
// stage 2: send demo records to the input-topic
|
|
|
|
|
CountDownLatch producerLatch = new CountDownLatch(1); |
|
|
|
|
Producer producerThread = new Producer( |
|
|
|
|
"producer", BOOTSTRAP_SERVERS, INPUT_TOPIC, false, null, true, numRecords, -1, producerLatch); |
|
|
|
|
"producer", |
|
|
|
|
KafkaProperties.BOOTSTRAP_SERVERS, |
|
|
|
|
INPUT_TOPIC, |
|
|
|
|
false, |
|
|
|
|
null, |
|
|
|
|
true, |
|
|
|
|
numRecords, |
|
|
|
|
-1, |
|
|
|
|
producerLatch); |
|
|
|
|
producerThread.start(); |
|
|
|
|
if (!producerLatch.await(2, TimeUnit.MINUTES)) { |
|
|
|
|
Utils.printErr("Timeout after 2 minutes waiting for data load"); |
|
|
|
@ -83,7 +90,11 @@ public class KafkaExactlyOnceDemo {
@@ -83,7 +90,11 @@ public class KafkaExactlyOnceDemo {
|
|
|
|
|
CountDownLatch processorsLatch = new CountDownLatch(numInstances); |
|
|
|
|
List<ExactlyOnceMessageProcessor> processors = IntStream.range(0, numInstances) |
|
|
|
|
.mapToObj(id -> new ExactlyOnceMessageProcessor( |
|
|
|
|
"processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch)) |
|
|
|
|
"processor-" + id, |
|
|
|
|
KafkaProperties.BOOTSTRAP_SERVERS, |
|
|
|
|
INPUT_TOPIC, |
|
|
|
|
OUTPUT_TOPIC, |
|
|
|
|
processorsLatch)) |
|
|
|
|
.collect(Collectors.toList()); |
|
|
|
|
processors.forEach(ExactlyOnceMessageProcessor::start); |
|
|
|
|
if (!processorsLatch.await(2, TimeUnit.MINUTES)) { |
|
|
|
@ -95,7 +106,14 @@ public class KafkaExactlyOnceDemo {
@@ -95,7 +106,14 @@ public class KafkaExactlyOnceDemo {
|
|
|
|
|
// stage 4: check consuming records from the output-topic
|
|
|
|
|
CountDownLatch consumerLatch = new CountDownLatch(1); |
|
|
|
|
Consumer consumerThread = new Consumer( |
|
|
|
|
"consumer", BOOTSTRAP_SERVERS, OUTPUT_TOPIC, GROUP_NAME, Optional.empty(), true, numRecords, consumerLatch); |
|
|
|
|
"consumer", |
|
|
|
|
KafkaProperties.BOOTSTRAP_SERVERS, |
|
|
|
|
OUTPUT_TOPIC, |
|
|
|
|
GROUP_NAME, |
|
|
|
|
Optional.empty(), |
|
|
|
|
true, |
|
|
|
|
numRecords, |
|
|
|
|
consumerLatch); |
|
|
|
|
consumerThread.start(); |
|
|
|
|
if (!consumerLatch.await(2, TimeUnit.MINUTES)) { |
|
|
|
|
Utils.printErr("Timeout after 2 minutes waiting for output read"); |
|
|
|
|