|
|
|
@ -19,6 +19,7 @@ package kafka.examples;
@@ -19,6 +19,7 @@ package kafka.examples;
|
|
|
|
|
import org.apache.kafka.clients.admin.Admin; |
|
|
|
|
import org.apache.kafka.clients.admin.NewTopic; |
|
|
|
|
import org.apache.kafka.clients.consumer.ConsumerConfig; |
|
|
|
|
import org.apache.kafka.common.errors.TimeoutException; |
|
|
|
|
import org.apache.kafka.common.errors.TopicExistsException; |
|
|
|
|
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; |
|
|
|
|
|
|
|
|
@ -77,9 +78,9 @@ public class KafkaExactlyOnceDemo {
@@ -77,9 +78,9 @@ public class KafkaExactlyOnceDemo {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
String mode = args[0]; |
|
|
|
|
int numPartitions = Integer.valueOf(args[1]); |
|
|
|
|
int numInstances = Integer.valueOf(args[2]); |
|
|
|
|
int numRecords = Integer.valueOf(args[3]); |
|
|
|
|
int numPartitions = Integer.parseInt(args[1]); |
|
|
|
|
int numInstances = Integer.parseInt(args[2]); |
|
|
|
|
int numRecords = Integer.parseInt(args[3]); |
|
|
|
|
|
|
|
|
|
/* Stage 1: topic cleanup and recreation */ |
|
|
|
|
recreateTopics(numPartitions); |
|
|
|
@ -90,7 +91,9 @@ public class KafkaExactlyOnceDemo {
@@ -90,7 +91,9 @@ public class KafkaExactlyOnceDemo {
|
|
|
|
|
Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, prePopulateLatch); |
|
|
|
|
producerThread.start(); |
|
|
|
|
|
|
|
|
|
prePopulateLatch.await(5, TimeUnit.MINUTES); |
|
|
|
|
if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) { |
|
|
|
|
throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances); |
|
|
|
|
|
|
|
|
@ -102,7 +105,9 @@ public class KafkaExactlyOnceDemo {
@@ -102,7 +105,9 @@ public class KafkaExactlyOnceDemo {
|
|
|
|
|
messageProcessor.start(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
transactionalCopyLatch.await(5, TimeUnit.MINUTES); |
|
|
|
|
if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) { |
|
|
|
|
throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
CountDownLatch consumeLatch = new CountDownLatch(1); |
|
|
|
|
|
|
|
|
@ -110,7 +115,10 @@ public class KafkaExactlyOnceDemo {
@@ -110,7 +115,10 @@ public class KafkaExactlyOnceDemo {
|
|
|
|
|
Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", true, numRecords, consumeLatch); |
|
|
|
|
consumerThread.start(); |
|
|
|
|
|
|
|
|
|
consumeLatch.await(5, TimeUnit.MINUTES); |
|
|
|
|
if (!consumeLatch.await(5, TimeUnit.MINUTES)) { |
|
|
|
|
throw new TimeoutException("Timeout after 5 minutes waiting for output data consumption"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
consumerThread.shutdown(); |
|
|
|
|
System.out.println("All finished!"); |
|
|
|
|
} |
|
|
|
|