diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index a4b5345ad41..54954167759 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -28,14 +28,18 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -47,6 +51,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.Properties;
+import java.util.Random;
 
 public class SimpleBenchmark {
 
@@ -57,14 +62,33 @@ public class SimpleBenchmark {
 
     private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
     private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
-    private static final long NUM_RECORDS = 10000000L;
-    private static final Long END_KEY = NUM_RECORDS - 1;
+    private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1";
+    private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2";
+    private static final ValueJoiner VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() {
+        @Override
+        public byte[] apply(final byte[] value1, final byte[] value2) {
+            if (value1 == null && value2 == null)
+                return new byte[VALUE_SIZE];
+            if (value1 == null && value2 != null)
+                return value2;
+            if (value1 != null && value2 == null)
+                return value1;
+
+            byte[] tmp = new byte[value1.length + value2.length];
+            System.arraycopy(value1, 0, tmp, 0, value1.length);
+            System.arraycopy(value2, 0, tmp, value1.length, value2.length);
+            return tmp;
+        }
+    };
+
+    private static int numRecords;
+    private static Integer endKey;
     private static final int KEY_SIZE = 8;
     private static final int VALUE_SIZE = 100;
     private static final int RECORD_SIZE = KEY_SIZE + VALUE_SIZE;
 
     private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
-    private static final Serde<Long> LONG_SERDE = Serdes.Long();
+    private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
 
     public SimpleBenchmark(File stateDir, String kafka, String zookeeper) {
         super();
@@ -77,6 +101,8 @@ public class SimpleBenchmark {
         String kafka = args.length > 0 ? args[0] : "localhost:9092";
         String zookeeper = args.length > 1 ? args[1] : "localhost:2181";
         String stateDirStr = args.length > 2 ? args[2] : "/tmp/kafka-streams-simple-benchmark";
+        numRecords = args.length > 3 ? Integer.parseInt(args[3]) : 10000000;
+        endKey = numRecords - 1;
 
         final File stateDir = new File(stateDirStr);
         stateDir.mkdir();
@@ -88,25 +114,130 @@ public class SimpleBenchmark {
         System.out.println("kafka=" + kafka);
         System.out.println("zookeeper=" + zookeeper);
         System.out.println("stateDir=" + stateDir);
+        System.out.println("numRecords=" + numRecords);
 
         SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper);
 
         // producer performance
-        benchmark.produce();
+        benchmark.produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
         // consumer performance
-        benchmark.consume();
+        benchmark.consume(SOURCE_TOPIC);
         // simple stream performance source->process
-        benchmark.processStream();
+        benchmark.processStream(SOURCE_TOPIC);
         // simple stream performance source->sink
-        benchmark.processStreamWithSink();
+        benchmark.processStreamWithSink(SOURCE_TOPIC);
         // simple stream performance source->store
-        benchmark.processStreamWithStateStore();
+        benchmark.processStreamWithStateStore(SOURCE_TOPIC);
+        // simple streams performance KSTREAM-KTABLE join
+        benchmark.kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "kStreamKTable", JOIN_TOPIC_2_PREFIX + "kStreamKTable");
+        // simple streams performance KSTREAM-KSTREAM join
+        benchmark.kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "kStreamKStream", JOIN_TOPIC_2_PREFIX + "kStreamKStream");
+        // simple streams performance KTABLE-KTABLE join
+        benchmark.kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "kTableKTable", JOIN_TOPIC_2_PREFIX + "kTableKTable");
+    }
+
+    private Properties setJoinProperties(final String applicationId) {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
+        return props;
+    }
+
+    /**
+     * Measure the performance of a KStream-KTable left join. The setup is such that each
+     * KStream record joins to exactly one element in the KTable
+     */
+    public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) throws Exception {
+        CountDownLatch latch = new CountDownLatch(numRecords);
+
+        // initialize topics
+        System.out.println("Initializing kStreamTopic " + kStreamTopic);
+        produce(kStreamTopic, VALUE_SIZE, "simple-benchmark-produce-kstream", numRecords, false, numRecords, false);
+        System.out.println("Initializing kTableTopic " + kTableTopic);
+        produce(kTableTopic, VALUE_SIZE, "simple-benchmark-produce-ktable", numRecords, true, numRecords, false);
+
+        // setup join
+        Properties props = setJoinProperties("simple-benchmark-kstream-ktable-join");
+        final KafkaStreams streams = createKafkaStreamsKStreamKTableJoin(props, kStreamTopic, kTableTopic, latch);
+
+        // run benchmark
+        runJoinBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [MB/s joined]: ", latch);
+    }
+
+    /**
+     * Measure the performance of a KStream-KStream left join. The setup is such that each
+     * KStream record joins to exactly one element in the other KStream
+     */
+    public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) throws Exception {
+        CountDownLatch latch = new CountDownLatch(numRecords);
+
+        // initialize topics
+        System.out.println("Initializing kStreamTopic " + kStreamTopic1);
+        produce(kStreamTopic1, VALUE_SIZE, "simple-benchmark-produce-kstream-topic1", numRecords, true, numRecords, false);
+        System.out.println("Initializing kStreamTopic " + kStreamTopic2);
+        produce(kStreamTopic2, VALUE_SIZE, "simple-benchmark-produce-kstream-topic2", numRecords, true, numRecords, false);
+
+        // setup join
+        Properties props = setJoinProperties("simple-benchmark-kstream-kstream-join");
+        final KafkaStreams streams = createKafkaStreamsKStreamKStreamJoin(props, kStreamTopic1, kStreamTopic2, latch);
+
+        // run benchmark
+        runJoinBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [MB/s joined]: ", latch);
+    }
+
+    /**
+     * Measure the performance of a KTable-KTable left join. The setup is such that each
+     * KTable record joins to exactly one element in the other KTable
+     */
+    public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) throws Exception {
+        CountDownLatch latch = new CountDownLatch(numRecords);
+
+        // initialize topics
+        System.out.println("Initializing kTableTopic " + kTableTopic1);
+        produce(kTableTopic1, VALUE_SIZE, "simple-benchmark-produce-ktable-topic1", numRecords, true, numRecords, false);
+        System.out.println("Initializing kTableTopic " + kTableTopic2);
+        produce(kTableTopic2, VALUE_SIZE, "simple-benchmark-produce-ktable-topic2", numRecords, true, numRecords, false);
+
+        // setup join
+        Properties props = setJoinProperties("simple-benchmark-ktable-ktable-join");
+        final KafkaStreams streams = createKafkaStreamsKTableKTableJoin(props, kTableTopic1, kTableTopic2, latch);
+
+        // run benchmark
+        runJoinBenchmark(streams, "Streams KTableKTable LeftJoin Performance [MB/s joined]: ", latch);
+    }
+
+    private void runJoinBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
+        streams.start();
+
+        long startTime = System.currentTimeMillis();
+
+        while (latch.getCount() > 0) {
+            try {
+                latch.await();
+            } catch (InterruptedException ex) {
+                //ignore
+            }
+        }
+        long endTime = System.currentTimeMillis();
+
+
+        System.out.println(nameOfBenchmark + megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + VALUE_SIZE));
+
+        streams.close();
     }
 
-    public void processStream() {
+
+
+    public void processStream(String topic) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        final KafkaStreams streams = createKafkaStreams(stateDir, kafka, zookeeper, latch);
+        final KafkaStreams streams = createKafkaStreams(topic, stateDir, kafka, zookeeper, latch);
 
         Thread thread = new Thread() {
             public void run() {
@@ -137,10 +268,10 @@ public class SimpleBenchmark {
         }
     }
 
-    public void processStreamWithSink() {
+    public void processStreamWithSink(String topic) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        final KafkaStreams streams = createKafkaStreamsWithSink(stateDir, kafka, zookeeper, latch);
+        final KafkaStreams streams = createKafkaStreamsWithSink(topic, stateDir, kafka, zookeeper, latch);
 
         Thread thread = new Thread() {
             public void run() {
@@ -171,10 +302,10 @@ public class SimpleBenchmark {
         }
     }
 
-    public void processStreamWithStateStore() {
+    public void processStreamWithStateStore(String topic) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        final KafkaStreams streams = createKafkaStreamsWithStateStore(stateDir, kafka, zookeeper, latch);
+        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka, zookeeper, latch);
 
         Thread thread = new Thread() {
             public void run() {
@@ -205,54 +336,76 @@ public class SimpleBenchmark {
         }
     }
 
-    public void produce() {
+    /**
+     * Produce values to a topic
+     * @param topic Topic to produce to
+     * @param valueSizeBytes Size of value in bytes
+     * @param clientId String specifying client ID
+     * @param numRecords Number of records to produce
+     * @param sequential if True, then keys are produced sequentially from 0 to upperRange. In this case upperRange must be >= numRecords.
+     *                   if False, then keys are produced randomly in range [0, upperRange)
+     * @param printStats if True, print stats on how long producing took. If False, don't print stats. False can be used
+     *                   when this produce step is part of another benchmark that produces its own stats
+     */
+    public void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
+                        int upperRange, boolean printStats) throws Exception {
+
+        if (sequential) {
+            if (upperRange < numRecords) throw new Exception("UpperRange must be >= numRecords");
+        }
         Properties props = new Properties();
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "simple-benchmark-produce");
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
 
+        int key = 0;
+        Random rand = new Random();
+        KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(props);
 
-        KafkaProducer<Long, byte[]> producer = new KafkaProducer<>(props);
-
-        byte[] value = new byte[VALUE_SIZE];
+        byte[] value = new byte[valueSizeBytes];
         long startTime = System.currentTimeMillis();
 
-        for (int i = 0; i < NUM_RECORDS; i++) {
-            producer.send(new ProducerRecord<>(SOURCE_TOPIC, (long) i, value));
+        if (sequential) key = 0;
+        else key = rand.nextInt(upperRange);
+        for (int i = 0; i < numRecords; i++) {
+            producer.send(new ProducerRecord<>(topic, key, value));
+            if (sequential) key++;
+            else key = rand.nextInt(upperRange);
         }
         producer.close();
 
         long endTime = System.currentTimeMillis();
 
-        System.out.println("Producer Performance [MB/sec write]: " + megaBytePerSec(endTime - startTime));
+        if (printStats)
+            System.out.println("Producer Performance [MB/sec write]: " + megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + valueSizeBytes));
     }
 
-    public void consume() {
+    public void consume(String topic) {
         Properties props = new Properties();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple-benchmark-consumer");
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
-        KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<>(props);
+        KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props);
 
-        List<TopicPartition> partitions = getAllPartitions(consumer, SOURCE_TOPIC);
+        List<TopicPartition> partitions = getAllPartitions(consumer, topic);
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
-        Long key = null;
+        Integer key = null;
 
         long startTime = System.currentTimeMillis();
 
         while (true) {
-            ConsumerRecords<Long, byte[]> records = consumer.poll(500);
+            ConsumerRecords<Integer, byte[]> records = consumer.poll(500);
             if (records.isEmpty()) {
-                if (END_KEY.equals(key))
+                if (endKey.equals(key))
                     break;
             } else {
-                for (ConsumerRecord<Long, byte[]> record : records) {
-                    Long recKey = record.key();
+                for (ConsumerRecord<Integer, byte[]> record : records) {
+                    Integer recKey = record.key();
                     if (key == null || key < recKey)
                         key = recKey;
@@ -266,7 +419,7 @@ public class SimpleBenchmark {
         System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
     }
 
-    private KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
+    private KafkaStreams createKafkaStreams(String topic, File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@@ -277,20 +430,20 @@ public class SimpleBenchmark {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
+        KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
 
-        source.process(new ProcessorSupplier<Long, byte[]>() {
+        source.process(new ProcessorSupplier<Integer, byte[]>() {
             @Override
-            public Processor<Long, byte[]> get() {
-                return new Processor<Long, byte[]>() {
+            public Processor<Integer, byte[]> get() {
+                return new Processor<Integer, byte[]>() {
 
                     @Override
                     public void init(ProcessorContext context) {
                     }
 
                     @Override
-                    public void process(Long key, byte[] value) {
-                        if (END_KEY.equals(key)) {
+                    public void process(Integer key, byte[] value) {
+                        if (endKey.equals(key)) {
                             latch.countDown();
                         }
                     }
@@ -309,7 +462,7 @@ public class SimpleBenchmark {
         return new KafkaStreams(builder, props);
     }
 
-    private KafkaStreams createKafkaStreamsWithSink(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
+    private KafkaStreams createKafkaStreamsWithSink(String topic, File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-sink");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@@ -320,21 +473,21 @@ public class SimpleBenchmark {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
+        KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
 
-        source.to(LONG_SERDE, BYTE_SERDE, SINK_TOPIC);
-        source.process(new ProcessorSupplier<Long, byte[]>() {
+        source.to(INTEGER_SERDE, BYTE_SERDE, SINK_TOPIC);
+        source.process(new ProcessorSupplier<Integer, byte[]>() {
             @Override
-            public Processor<Long, byte[]> get() {
-                return new Processor<Long, byte[]>() {
+            public Processor<Integer, byte[]> get() {
+                return new Processor<Integer, byte[]>() {
 
                     @Override
                     public void init(ProcessorContext context) {
                     }
 
                     @Override
-                    public void process(Long key, byte[] value) {
-                        if (END_KEY.equals(key)) {
+                    public void process(Integer key, byte[] value) {
+                        if (endKey.equals(key)) {
                             latch.countDown();
                         }
                     }
@@ -353,8 +506,56 @@ public class SimpleBenchmark {
         return new KafkaStreams(builder, props);
     }
 
+    private class CountDownAction<K, V> implements ForeachAction<K, V> {
+        private CountDownLatch latch;
+        CountDownAction(final CountDownLatch latch) {
+            this.latch = latch;
+        }
+        @Override
+        public void apply(K key, V value) {
+            this.latch.countDown();
+        }
+    }
+
+    private KafkaStreams createKafkaStreamsKStreamKTableJoin(Properties streamConfig, String kStreamTopic,
+                                                             String kTableTopic, final CountDownLatch latch) {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<Integer, byte[]> input1 = builder.stream(kStreamTopic);
+        final KTable<Integer, byte[]> input2 = builder.table(kTableTopic, kTableTopic + "-store");
+
+        input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
+
+        return new KafkaStreams(builder, streamConfig);
+    }
+
+    private KafkaStreams createKafkaStreamsKTableKTableJoin(Properties streamConfig, String kTableTopic1,
+                                                            String kTableTopic2, final CountDownLatch latch) {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KTable<Integer, byte[]> input1 = builder.table(kTableTopic1, kTableTopic1 + "-store");
+        final KTable<Integer, byte[]> input2 = builder.table(kTableTopic2, kTableTopic2 + "-store");
+
+        input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
+
+        return new KafkaStreams(builder, streamConfig);
+    }
+
+    private KafkaStreams createKafkaStreamsKStreamKStreamJoin(Properties streamConfig, String kStreamTopic1,
+                                                              String kStreamTopic2, final CountDownLatch latch) {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<Integer, byte[]> input1 = builder.stream(kStreamTopic1);
+        final KStream<Integer, byte[]> input2 = builder.stream(kStreamTopic2);
+        final long timeDifferenceMs = 10000L;
 
-    private KafkaStreams createKafkaStreamsWithStateStore(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
+        input1.leftJoin(input2, VALUE_JOINER, JoinWindows.of(timeDifferenceMs)).foreach(new CountDownAction(latch));
+
+        return new KafkaStreams(builder, streamConfig);
+    }
+
+    private KafkaStreams createKafkaStreamsWithStateStore(String topic, File stateDir, String kafka, String zookeeper,
+                                                          final CountDownLatch latch) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-store");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@@ -365,28 +566,28 @@ public class SimpleBenchmark {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        builder.addStateStore(Stores.create("store").withLongKeys().withByteArrayValues().persistent().build());
+        builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().build());
 
-        KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
+        KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
 
-        source.process(new ProcessorSupplier<Long, byte[]>() {
+        source.process(new ProcessorSupplier<Integer, byte[]>() {
             @Override
-            public Processor<Long, byte[]> get() {
-                return new Processor<Long, byte[]>() {
+            public Processor<Integer, byte[]> get() {
+                return new Processor<Integer, byte[]>() {
 
-                    KeyValueStore<Long, byte[]> store;
+                    KeyValueStore<Integer, byte[]> store;
 
                     @SuppressWarnings("unchecked")
                     @Override
                     public void init(ProcessorContext context) {
-                        store = (KeyValueStore<Long, byte[]>) context.getStateStore("store");
+                        store = (KeyValueStore<Integer, byte[]>) context.getStateStore("store");
                     }
 
                     @Override
-                    public void process(Long key, byte[] value) {
+                    public void process(Integer key, byte[] value) {
                         store.put(key, value);
-                        if (END_KEY.equals(key)) {
+                        if (endKey.equals(key)) {
                             latch.countDown();
                         }
                     }
@@ -406,7 +607,11 @@ public class SimpleBenchmark {
     }
 
     private double megaBytePerSec(long time) {
-        return (double) (RECORD_SIZE * NUM_RECORDS / 1024 / 1024) / ((double) time / 1000);
+        return (double) (RECORD_SIZE * numRecords / 1024 / 1024) / ((double) time / 1000);
+    }
+
+    private double megaBytePerSec(long time, int numRecords, int recordSizeBytes) {
+        return (double) (recordSizeBytes * numRecords / 1024 / 1024) / ((double) time / 1000);
     }
 
     private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String... topics) {
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index 5eb26639782..de687e69b6f 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -27,7 +27,7 @@ class StreamsSimpleBenchmarkTest(KafkaTest):
     def __init__(self, test_context):
         super(StreamsSimpleBenchmarkTest, self).__init__(test_context, num_zk=1, num_brokers=1)
 
-        self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka)
+        self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka, 1000000L)
 
     def test_simple_benchmark(self):
         """
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
index 289bccbefc0..b7d6b892a43 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -47,9 +47,10 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
             "collect_default": True},
     }
 
-    def __init__(self, context, kafka):
+    def __init__(self, context, kafka, numrecs):
         super(StreamsSimpleBenchmarkService, self).__init__(context, 1)
         self.kafka = kafka
+        self.numrecs = numrecs
 
     @property
     def node(self):
@@ -88,6 +89,7 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
         args['kafka'] = self.kafka.bootstrap_servers()
         args['zk'] = self.kafka.zk.connect_setting()
         args['state_dir'] = self.PERSISTENT_ROOT
+        args['numrecs'] = self.numrecs
         args['stdout'] = self.STDOUT_FILE
         args['stderr'] = self.STDERR_FILE
         args['pidfile'] = self.PID_FILE
@@ -96,7 +98,7 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
 
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.perf.SimpleBenchmark " \
-              " %(kafka)s %(zk)s %(state_dir)s " \
+              " %(kafka)s %(zk)s %(state_dir)s %(numrecs)s " \
              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
 
         return cmd
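
For reference, the new optional fourth positional argument can be exercised locally. The sketch below is illustrative only (the SimpleBenchmarkRunner class name and the broker/ZooKeeper addresses are assumptions, not part of the patch); it forwards the same four arguments that main parses above, matching what the ducktape service now passes on the command line:

    // Hypothetical local runner; equivalent to invoking the class via kafka-run-class with
    //   <kafka> <zookeeper> <state_dir> <numRecords>
    public class SimpleBenchmarkRunner {
        public static void main(String[] args) throws Exception {
            org.apache.kafka.streams.perf.SimpleBenchmark.main(new String[]{
                "localhost:9092",                       // args[0]: kafka bootstrap servers (patch default)
                "localhost:2181",                       // args[1]: zookeeper connect string (patch default)
                "/tmp/kafka-streams-simple-benchmark",  // args[2]: state directory (patch default)
                "1000000"                               // args[3]: numRecords, new in this patch (defaults to 10000000)
            });
        }
    }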
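A quick sanity check of the new megaBytePerSec overload, with made-up numbers rather than output from the patch. Note that recordSizeBytes * numRecords / 1024 / 1024 is evaluated entirely in int arithmetic before the cast to double, so the byte count is truncated to whole megabytes (and would overflow int if recordSizeBytes * numRecords exceeded Integer.MAX_VALUE):

    // Worked example of the reported throughput figure (assumed 10-second run).
    public class MegaBytePerSecCheck {
        public static void main(String[] args) {
            int numRecords = 1000000;       // the value the system test now passes
            int recordSizeBytes = 8 + 100;  // KEY_SIZE + VALUE_SIZE from SimpleBenchmark
            long timeMs = 10000L;
            // 108000000 / 1024 / 1024 truncates to 102 (whole MB), so this prints 10.2
            double mbPerSec = (double) (recordSizeBytes * numRecords / 1024 / 1024) / ((double) timeMs / 1000);
            System.out.println(mbPerSec);
        }
    }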