Browse Source

KAFKA-4016: Added join benchmarks

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Ismael Juma, Damian Guy, Guozhang Wang

Closes #1700 from enothereska/join-benchmarks
pull/1700/merge
Eno Thereska 8 years ago committed by Guozhang Wang
parent
commit
c5d26c4829
  1. 325
      streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
  2. 2
      tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
  3. 6
      tests/kafkatest/services/performance/streams_performance.py

325
streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java

@ -28,14 +28,18 @@ import org.apache.kafka.common.PartitionInfo; @@ -28,14 +28,18 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@ -47,6 +51,7 @@ import java.util.ArrayList; @@ -47,6 +51,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.Properties;
import java.util.Random;
public class SimpleBenchmark {
@ -57,14 +62,33 @@ public class SimpleBenchmark { @@ -57,14 +62,33 @@ public class SimpleBenchmark {
private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
private static final long NUM_RECORDS = 10000000L;
private static final Long END_KEY = NUM_RECORDS - 1;
private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1";
private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2";
private static final ValueJoiner VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() {
@Override
public byte[] apply(final byte[] value1, final byte[] value2) {
if (value1 == null && value2 == null)
return new byte[VALUE_SIZE];
if (value1 == null && value2 != null)
return value2;
if (value1 != null && value2 == null)
return value1;
byte[] tmp = new byte[value1.length + value2.length];
System.arraycopy(value1, 0, tmp, 0, value1.length);
System.arraycopy(value2, 0, tmp, value1.length, value2.length);
return tmp;
}
};
private static int numRecords;
private static Integer endKey;
private static final int KEY_SIZE = 8;
private static final int VALUE_SIZE = 100;
private static final int RECORD_SIZE = KEY_SIZE + VALUE_SIZE;
private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
private static final Serde<Long> LONG_SERDE = Serdes.Long();
private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
public SimpleBenchmark(File stateDir, String kafka, String zookeeper) {
super();
@ -77,6 +101,8 @@ public class SimpleBenchmark { @@ -77,6 +101,8 @@ public class SimpleBenchmark {
String kafka = args.length > 0 ? args[0] : "localhost:9092";
String zookeeper = args.length > 1 ? args[1] : "localhost:2181";
String stateDirStr = args.length > 2 ? args[2] : "/tmp/kafka-streams-simple-benchmark";
numRecords = args.length > 3 ? Integer.parseInt(args[3]) : 10000000;
endKey = numRecords - 1;
final File stateDir = new File(stateDirStr);
stateDir.mkdir();
@ -88,25 +114,130 @@ public class SimpleBenchmark { @@ -88,25 +114,130 @@ public class SimpleBenchmark {
System.out.println("kafka=" + kafka);
System.out.println("zookeeper=" + zookeeper);
System.out.println("stateDir=" + stateDir);
System.out.println("numRecords=" + numRecords);
SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper);
// producer performance
benchmark.produce();
benchmark.produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
// consumer performance
benchmark.consume();
benchmark.consume(SOURCE_TOPIC);
// simple stream performance source->process
benchmark.processStream();
benchmark.processStream(SOURCE_TOPIC);
// simple stream performance source->sink
benchmark.processStreamWithSink();
benchmark.processStreamWithSink(SOURCE_TOPIC);
// simple stream performance source->store
benchmark.processStreamWithStateStore();
benchmark.processStreamWithStateStore(SOURCE_TOPIC);
// simple streams performance KSTREAM-KTABLE join
benchmark.kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "kStreamKTable", JOIN_TOPIC_2_PREFIX + "kStreamKTable");
// simple streams performance KSTREAM-KSTREAM join
benchmark.kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "kStreamKStream", JOIN_TOPIC_2_PREFIX + "kStreamKStream");
// simple streams performance KTABLE-KTABLE join
benchmark.kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "kTableKTable", JOIN_TOPIC_2_PREFIX + "kTableKTable");
}
private Properties setJoinProperties(final String applicationId) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
return props;
}
/**
* Measure the performance of a KStream-KTable left join. The setup is such that each
* KStream record joins to exactly one element in the KTable
*/
public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) throws Exception {
CountDownLatch latch = new CountDownLatch(numRecords);
// initialize topics
System.out.println("Initializing kStreamTopic " + kStreamTopic);
produce(kStreamTopic, VALUE_SIZE, "simple-benchmark-produce-kstream", numRecords, false, numRecords, false);
System.out.println("Initializing kTableTopic " + kTableTopic);
produce(kTableTopic, VALUE_SIZE, "simple-benchmark-produce-ktable", numRecords, true, numRecords, false);
// setup join
Properties props = setJoinProperties("simple-benchmark-kstream-ktable-join");
final KafkaStreams streams = createKafkaStreamsKStreamKTableJoin(props, kStreamTopic, kTableTopic, latch);
// run benchmark
runJoinBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [MB/s joined]: ", latch);
}
/**
* Measure the performance of a KStream-KStream left join. The setup is such that each
* KStream record joins to exactly one element in the other KStream
*/
public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) throws Exception {
CountDownLatch latch = new CountDownLatch(numRecords);
// initialize topics
System.out.println("Initializing kStreamTopic " + kStreamTopic1);
produce(kStreamTopic1, VALUE_SIZE, "simple-benchmark-produce-kstream-topic1", numRecords, true, numRecords, false);
System.out.println("Initializing kStreamTopic " + kStreamTopic2);
produce(kStreamTopic2, VALUE_SIZE, "simple-benchmark-produce-kstream-topic2", numRecords, true, numRecords, false);
// setup join
Properties props = setJoinProperties("simple-benchmark-kstream-kstream-join");
final KafkaStreams streams = createKafkaStreamsKStreamKStreamJoin(props, kStreamTopic1, kStreamTopic2, latch);
// run benchmark
runJoinBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [MB/s joined]: ", latch);
}
/**
* Measure the performance of a KTable-KTable left join. The setup is such that each
* KTable record joins to exactly one element in the other KTable
*/
public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) throws Exception {
CountDownLatch latch = new CountDownLatch(numRecords);
// initialize topics
System.out.println("Initializing kTableTopic " + kTableTopic1);
produce(kTableTopic1, VALUE_SIZE, "simple-benchmark-produce-ktable-topic1", numRecords, true, numRecords, false);
System.out.println("Initializing kTableTopic " + kTableTopic2);
produce(kTableTopic2, VALUE_SIZE, "simple-benchmark-produce-ktable-topic2", numRecords, true, numRecords, false);
// setup join
Properties props = setJoinProperties("simple-benchmark-ktable-ktable-join");
final KafkaStreams streams = createKafkaStreamsKTableKTableJoin(props, kTableTopic1, kTableTopic2, latch);
// run benchmark
runJoinBenchmark(streams, "Streams KTableKTable LeftJoin Performance [MB/s joined]: ", latch);
}
private void runJoinBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
streams.start();
long startTime = System.currentTimeMillis();
while (latch.getCount() > 0) {
try {
latch.await();
} catch (InterruptedException ex) {
//ignore
}
}
long endTime = System.currentTimeMillis();
System.out.println(nameOfBenchmark + megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + VALUE_SIZE));
streams.close();
}
public void processStream() {
public void processStream(String topic) {
CountDownLatch latch = new CountDownLatch(1);
final KafkaStreams streams = createKafkaStreams(stateDir, kafka, zookeeper, latch);
final KafkaStreams streams = createKafkaStreams(topic, stateDir, kafka, zookeeper, latch);
Thread thread = new Thread() {
public void run() {
@ -137,10 +268,10 @@ public class SimpleBenchmark { @@ -137,10 +268,10 @@ public class SimpleBenchmark {
}
}
public void processStreamWithSink() {
public void processStreamWithSink(String topic) {
CountDownLatch latch = new CountDownLatch(1);
final KafkaStreams streams = createKafkaStreamsWithSink(stateDir, kafka, zookeeper, latch);
final KafkaStreams streams = createKafkaStreamsWithSink(topic, stateDir, kafka, zookeeper, latch);
Thread thread = new Thread() {
public void run() {
@ -171,10 +302,10 @@ public class SimpleBenchmark { @@ -171,10 +302,10 @@ public class SimpleBenchmark {
}
}
public void processStreamWithStateStore() {
public void processStreamWithStateStore(String topic) {
CountDownLatch latch = new CountDownLatch(1);
final KafkaStreams streams = createKafkaStreamsWithStateStore(stateDir, kafka, zookeeper, latch);
final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka, zookeeper, latch);
Thread thread = new Thread() {
public void run() {
@ -205,54 +336,76 @@ public class SimpleBenchmark { @@ -205,54 +336,76 @@ public class SimpleBenchmark {
}
}
public void produce() {
/**
* Produce values to a topic
* @param topic Topic to produce to
* @param valueSizeBytes Size of value in bytes
* @param clientId String specifying client ID
* @param numRecords Number of records to produce
* @param sequential if True, then keys are produced sequentially from 0 to upperRange. In this case upperRange must be >= numRecords.
* if False, then keys are produced randomly in range [0, upperRange)
* @param printStats if True, print stats on how long producing took. If False, don't print stats. False can be used
* when this produce step is part of another benchmark that produces its own stats
*/
public void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
int upperRange, boolean printStats) throws Exception {
if (sequential) {
if (upperRange < numRecords) throw new Exception("UpperRange must be >= numRecords");
}
Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, "simple-benchmark-produce");
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
int key = 0;
Random rand = new Random();
KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(props);
KafkaProducer<Long, byte[]> producer = new KafkaProducer<>(props);
byte[] value = new byte[VALUE_SIZE];
byte[] value = new byte[valueSizeBytes];
long startTime = System.currentTimeMillis();
for (int i = 0; i < NUM_RECORDS; i++) {
producer.send(new ProducerRecord<>(SOURCE_TOPIC, (long) i, value));
if (sequential) key = 0;
else key = rand.nextInt(upperRange);
for (int i = 0; i < numRecords; i++) {
producer.send(new ProducerRecord<>(topic, key, value));
if (sequential) key++;
else key = rand.nextInt(upperRange);
}
producer.close();
long endTime = System.currentTimeMillis();
System.out.println("Producer Performance [MB/sec write]: " + megaBytePerSec(endTime - startTime));
if (printStats)
System.out.println("Producer Performance [MB/sec write]: " + megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + valueSizeBytes));
}
public void consume() {
public void consume(String topic) {
Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple-benchmark-consumer");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<>(props);
KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props);
List<TopicPartition> partitions = getAllPartitions(consumer, SOURCE_TOPIC);
List<TopicPartition> partitions = getAllPartitions(consumer, topic);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
Long key = null;
Integer key = null;
long startTime = System.currentTimeMillis();
while (true) {
ConsumerRecords<Long, byte[]> records = consumer.poll(500);
ConsumerRecords<Integer, byte[]> records = consumer.poll(500);
if (records.isEmpty()) {
if (END_KEY.equals(key))
if (endKey.equals(key))
break;
} else {
for (ConsumerRecord<Long, byte[]> record : records) {
Long recKey = record.key();
for (ConsumerRecord<Integer, byte[]> record : records) {
Integer recKey = record.key();
if (key == null || key < recKey)
key = recKey;
@ -266,7 +419,7 @@ public class SimpleBenchmark { @@ -266,7 +419,7 @@ public class SimpleBenchmark {
System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
}
private KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
private KafkaStreams createKafkaStreams(String topic, File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@ -277,20 +430,20 @@ public class SimpleBenchmark { @@ -277,20 +430,20 @@ public class SimpleBenchmark {
KStreamBuilder builder = new KStreamBuilder();
KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
source.process(new ProcessorSupplier<Long, byte[]>() {
source.process(new ProcessorSupplier<Integer, byte[]>() {
@Override
public Processor<Long, byte[]> get() {
return new Processor<Long, byte[]>() {
public Processor<Integer, byte[]> get() {
return new Processor<Integer, byte[]>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void process(Long key, byte[] value) {
if (END_KEY.equals(key)) {
public void process(Integer key, byte[] value) {
if (endKey.equals(key)) {
latch.countDown();
}
}
@ -309,7 +462,7 @@ public class SimpleBenchmark { @@ -309,7 +462,7 @@ public class SimpleBenchmark {
return new KafkaStreams(builder, props);
}
private KafkaStreams createKafkaStreamsWithSink(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
private KafkaStreams createKafkaStreamsWithSink(String topic, File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-sink");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@ -320,21 +473,21 @@ public class SimpleBenchmark { @@ -320,21 +473,21 @@ public class SimpleBenchmark {
KStreamBuilder builder = new KStreamBuilder();
KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
source.to(LONG_SERDE, BYTE_SERDE, SINK_TOPIC);
source.process(new ProcessorSupplier<Long, byte[]>() {
source.to(INTEGER_SERDE, BYTE_SERDE, SINK_TOPIC);
source.process(new ProcessorSupplier<Integer, byte[]>() {
@Override
public Processor<Long, byte[]> get() {
return new Processor<Long, byte[]>() {
public Processor<Integer, byte[]> get() {
return new Processor<Integer, byte[]>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void process(Long key, byte[] value) {
if (END_KEY.equals(key)) {
public void process(Integer key, byte[] value) {
if (endKey.equals(key)) {
latch.countDown();
}
}
@ -353,8 +506,56 @@ public class SimpleBenchmark { @@ -353,8 +506,56 @@ public class SimpleBenchmark {
return new KafkaStreams(builder, props);
}
private class CountDownAction<K, V> implements ForeachAction<K, V> {
private CountDownLatch latch;
CountDownAction(final CountDownLatch latch) {
this.latch = latch;
}
@Override
public void apply(K key, V value) {
this.latch.countDown();
}
}
private KafkaStreams createKafkaStreamsKStreamKTableJoin(Properties streamConfig, String kStreamTopic,
String kTableTopic, final CountDownLatch latch) {
final KStreamBuilder builder = new KStreamBuilder();
final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic);
final KTable<Long, byte[]> input2 = builder.table(kTableTopic, kTableTopic + "-store");
input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
return new KafkaStreams(builder, streamConfig);
}
private KafkaStreams createKafkaStreamsKTableKTableJoin(Properties streamConfig, String kTableTopic1,
String kTableTopic2, final CountDownLatch latch) {
final KStreamBuilder builder = new KStreamBuilder();
final KTable<Long, byte[]> input1 = builder.table(kTableTopic1, kTableTopic1 + "-store");
final KTable<Long, byte[]> input2 = builder.table(kTableTopic2, kTableTopic2 + "-store");
input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
return new KafkaStreams(builder, streamConfig);
}
private KafkaStreams createKafkaStreamsKStreamKStreamJoin(Properties streamConfig, String kStreamTopic1,
String kStreamTopic2, final CountDownLatch latch) {
final KStreamBuilder builder = new KStreamBuilder();
final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic1);
final KStream<Long, byte[]> input2 = builder.stream(kStreamTopic2);
final long timeDifferenceMs = 10000L;
private KafkaStreams createKafkaStreamsWithStateStore(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
input1.leftJoin(input2, VALUE_JOINER, JoinWindows.of(timeDifferenceMs)).foreach(new CountDownAction(latch));
return new KafkaStreams(builder, streamConfig);
}
private KafkaStreams createKafkaStreamsWithStateStore(String topic, File stateDir, String kafka, String zookeeper,
final CountDownLatch latch) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-store");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@ -365,28 +566,28 @@ public class SimpleBenchmark { @@ -365,28 +566,28 @@ public class SimpleBenchmark {
KStreamBuilder builder = new KStreamBuilder();
builder.addStateStore(Stores.create("store").withLongKeys().withByteArrayValues().persistent().build());
builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().build());
KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
source.process(new ProcessorSupplier<Long, byte[]>() {
source.process(new ProcessorSupplier<Integer, byte[]>() {
@Override
public Processor<Long, byte[]> get() {
return new Processor<Long, byte[]>() {
public Processor<Integer, byte[]> get() {
return new Processor<Integer, byte[]>() {
KeyValueStore<Long, byte[]> store;
KeyValueStore<Integer, byte[]> store;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
store = (KeyValueStore<Long, byte[]>) context.getStateStore("store");
store = (KeyValueStore<Integer, byte[]>) context.getStateStore("store");
}
@Override
public void process(Long key, byte[] value) {
public void process(Integer key, byte[] value) {
store.put(key, value);
if (END_KEY.equals(key)) {
if (endKey.equals(key)) {
latch.countDown();
}
}
@ -406,7 +607,11 @@ public class SimpleBenchmark { @@ -406,7 +607,11 @@ public class SimpleBenchmark {
}
private double megaBytePerSec(long time) {
return (double) (RECORD_SIZE * NUM_RECORDS / 1024 / 1024) / ((double) time / 1000);
return (double) (RECORD_SIZE * numRecords / 1024 / 1024) / ((double) time / 1000);
}
private double megaBytePerSec(long time, int numRecords, int recordSizeBytes) {
return (double) (recordSizeBytes * numRecords / 1024 / 1024) / ((double) time / 1000);
}
private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String... topics) {

2
tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py

@ -27,7 +27,7 @@ class StreamsSimpleBenchmarkTest(KafkaTest): @@ -27,7 +27,7 @@ class StreamsSimpleBenchmarkTest(KafkaTest):
def __init__(self, test_context):
super(StreamsSimpleBenchmarkTest, self).__init__(test_context, num_zk=1, num_brokers=1)
self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka)
self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka, 1000000L)
def test_simple_benchmark(self):
"""

6
tests/kafkatest/services/performance/streams_performance.py

@ -47,9 +47,10 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service): @@ -47,9 +47,10 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
"collect_default": True},
}
def __init__(self, context, kafka):
def __init__(self, context, kafka, numrecs):
super(StreamsSimpleBenchmarkService, self).__init__(context, 1)
self.kafka = kafka
self.numrecs = numrecs
@property
def node(self):
@ -88,6 +89,7 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service): @@ -88,6 +89,7 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
args['kafka'] = self.kafka.bootstrap_servers()
args['zk'] = self.kafka.zk.connect_setting()
args['state_dir'] = self.PERSISTENT_ROOT
args['numrecs'] = self.numrecs
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
@ -96,7 +98,7 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service): @@ -96,7 +98,7 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.perf.SimpleBenchmark " \
" %(kafka)s %(zk)s %(state_dir)s " \
" %(kafka)s %(zk)s %(state_dir)s %(numrecs)s " \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
return cmd

Loading…
Cancel
Save