Browse Source

Switch to SplittableRandom in ProducerPerformance utility (#13482)

Why:
Using java.util.Random to generate every byte sent from the ProducerPerformance
appears to be a limiting factor. Throughput of the ProducerPerformance script is
higher with a file of records as compared to randomly generated records.

On my machine a single thread can generate ~100MB/second of uppercase letters using
java.util.Random and ~300MB/sec using java.util.SplittableRandom. This is a limit on
throughput.

Note: you can optimise further by expanding it from 26 letters to 32 letter generated
as it is more efficient to generate a nicely distributed int when the bound is a
power of two.

Reviewers: Luke Chen <showuon@gmail.com>
pull/13483/head
Robert Young 2 years ago committed by GitHub
parent
commit
2b26db0d38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
  2. 8
      tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java

7
tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java

@ -27,8 +27,8 @@ import java.nio.file.Paths; @@ -27,8 +27,8 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.Arrays;
import java.util.SplittableRandom;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import org.apache.kafka.clients.producer.Callback;
@ -92,7 +92,8 @@ public class ProducerPerformance { @@ -92,7 +92,8 @@ public class ProducerPerformance {
if (recordSize != null) {
payload = new byte[recordSize];
}
Random random = new Random(0);
// not threadsafe, do not share with other threads
SplittableRandom random = new SplittableRandom(0);
ProducerRecord<byte[], byte[]> record;
stats = new Stats(numRecords, 5000);
long startMs = System.currentTimeMillis();
@ -169,7 +170,7 @@ public class ProducerPerformance { @@ -169,7 +170,7 @@ public class ProducerPerformance {
Stats stats;
static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
Random random) {
SplittableRandom random) {
if (!payloadByteList.isEmpty()) {
payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
} else if (recordSize != null) {

8
tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java

@ -36,7 +36,7 @@ import java.util.ArrayList; @@ -36,7 +36,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.SplittableRandom;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -177,7 +177,7 @@ public class ProducerPerformanceTest { @@ -177,7 +177,7 @@ public class ProducerPerformanceTest {
List<byte[]> payloadByteList = new ArrayList<>();
payloadByteList.add(byteArray);
byte[] payload = null;
Random random = new Random(0);
SplittableRandom random = new SplittableRandom(0);
payload = ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random);
assertEquals(inputString, new String(payload));
@ -188,7 +188,7 @@ public class ProducerPerformanceTest { @@ -188,7 +188,7 @@ public class ProducerPerformanceTest {
Integer recordSize = 100;
byte[] payload = new byte[recordSize];
List<byte[]> payloadByteList = new ArrayList<>();
Random random = new Random(0);
SplittableRandom random = new SplittableRandom(0);
payload = ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random);
for (byte b : payload) {
@ -201,7 +201,7 @@ public class ProducerPerformanceTest { @@ -201,7 +201,7 @@ public class ProducerPerformanceTest {
Integer recordSize = null;
byte[] payload = null;
List<byte[]> payloadByteList = new ArrayList<>();
Random random = new Random(0);
SplittableRandom random = new SplittableRandom(0);
IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class, () -> ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random));
assertEquals("no payload File Path or record Size provided", thrown.getMessage());

Loading…
Cancel
Save