Browse Source

KAFKA-6658; Fix RoundTripWorkload and make k/v generation configurable (#4710)

Make PayloadGenerator an interface which can have multiple implementations: constant, uniform random, sequential.

Allow different payload generators to be used for keys and values.

This change fixes RoundTripWorkload.  Previously RoundTripWorkload was unable to get the sequence number of the keys that it produced.
pull/4729/head
Colin Patrick McCabe 7 years ago committed by Jason Gustafson
parent
commit
a70e4f95d7
  1. 54
      tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantPayloadGenerator.java
  2. 149
      tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
  3. 55
      tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadIterator.java
  4. 20
      tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
  5. 16
      tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
  6. 10
      tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
  7. 9
      tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
  8. 65
      tools/src/main/java/org/apache/kafka/trogdor/workload/SequentialPayloadGenerator.java
  9. 89
      tools/src/main/java/org/apache/kafka/trogdor/workload/UniformRandomPayloadGenerator.java
  10. 4
      tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
  11. 180
      tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java

54
tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantPayloadGenerator.java

@ -0,0 +1,54 @@ @@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* A PayloadGenerator which always generates a constant payload.
*/
public class ConstantPayloadGenerator implements PayloadGenerator {
private final int size;
private final byte[] value;
@JsonCreator
public ConstantPayloadGenerator(@JsonProperty("size") int size,
@JsonProperty("value") byte[] value) {
this.size = size;
this.value = (value == null || value.length == 0) ? new byte[size] : value;
}
@JsonProperty
public int size() {
return size;
}
@JsonProperty
public byte[] value() {
return value;
}
@Override
public byte[] generate(long position) {
byte[] next = new byte[size];
for (int i = 0; i < next.length; i += value.length) {
System.arraycopy(value, 0, next, i, Math.min(next.length - i, value.length));
}
return next;
}
}

149
tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java

@ -14,133 +14,34 @@ @@ -14,133 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import org.apache.kafka.clients.producer.ProducerRecord;
package org.apache.kafka.trogdor.workload;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
* Describes the payload for the producer record. Currently, it generates constant size values
* and either null keys or constant size key (depending on requested key type). The generator
* is deterministic -- two generator objects created with the same key type, message size, and
* value divergence ratio (see `valueDivergenceRatio` description) will generate the same sequence
* of key/value pairs.
*/
public class PayloadGenerator {
public static final double DEFAULT_VALUE_DIVERGENCE_RATIO = 0.3;
public static final int DEFAULT_MESSAGE_SIZE = 512;
/**
* This is the ratio of how much each next value is different from the previous value. This
* is directly related to compression rate we will get. Example: 0.3 divergence ratio gets us
* about 0.3 - 0.45 compression rate with lz4.
*/
private final double valueDivergenceRatio;
private final long baseSeed;
private long currentPosition;
private byte[] baseRecordValue;
private PayloadKeyType recordKeyType;
private Random random;
public PayloadGenerator() {
this(DEFAULT_MESSAGE_SIZE, PayloadKeyType.KEY_NULL, DEFAULT_VALUE_DIVERGENCE_RATIO);
}
/**
* Generator will generate null keys and values of size `messageSize`
* @param messageSize number of bytes used for key + value
*/
public PayloadGenerator(int messageSize) {
this(messageSize, PayloadKeyType.KEY_NULL, DEFAULT_VALUE_DIVERGENCE_RATIO);
}
/**
* Generator will generate keys of given type and values of size 'messageSize' - (key size).
* If the given key type requires more bytes than messageSize, then the resulting payload
* will be keys of size required for the given key type and 0-length values.
* @param messageSize number of bytes used for key + value
* @param keyType type of keys generated
*/
public PayloadGenerator(int messageSize, PayloadKeyType keyType) {
this(messageSize, keyType, DEFAULT_VALUE_DIVERGENCE_RATIO);
}
/**
* Generator will generate keys of given type and values of size 'messageSize' - (key size).
* If the given key type requires more bytes than messageSize, then the resulting payload
* will be keys of size required for the given key type and 0-length values.
* @param messageSize key + value size
* @param valueDivergenceRatio ratio of how much each next value is different from the previous
* value. Used to approximately control target compression rate (if
* compression is used).
*/
public PayloadGenerator(int messageSize, PayloadKeyType keyType,
double valueDivergenceRatio) {
this.baseSeed = 856; // some random number, may later let pass seed to constructor
this.currentPosition = 0;
this.valueDivergenceRatio = valueDivergenceRatio;
this.random = new Random(this.baseSeed);
final int valueSize = (messageSize > keyType.maxSizeInBytes())
? messageSize - keyType.maxSizeInBytes() : 0;
this.baseRecordValue = new byte[valueSize];
// initialize value with random bytes
for (int i = 0; i < baseRecordValue.length; ++i) {
baseRecordValue[i] = (byte) (random.nextInt(26) + 65);
}
this.recordKeyType = keyType;
}
/**
* Returns current position of the payload generator.
*/
public long position() {
return currentPosition;
}
/**
* Creates record based on the current position, and increments current position.
*/
public ProducerRecord<byte[], byte[]> nextRecord(String topicName) {
return nextRecord(topicName, currentPosition++);
}
/**
* Creates record based on the given position. Does not change the current position.
*/
public ProducerRecord<byte[], byte[]> nextRecord(String topicName, long position) {
byte[] keyBytes = null;
if (recordKeyType == PayloadKeyType.KEY_MESSAGE_INDEX) {
keyBytes = ByteBuffer.allocate(recordKeyType.maxSizeInBytes()).putLong(position).array();
} else if (recordKeyType != PayloadKeyType.KEY_NULL) {
throw new UnsupportedOperationException(
"PayloadGenerator does not know how to generate key for key type " + recordKeyType);
}
return new ProducerRecord<>(topicName, keyBytes, nextValue(position));
}
@Override
public String toString() {
return "PayloadGenerator(recordKeySize=" + recordKeyType.maxSizeInBytes()
+ ", recordValueSize=" + baseRecordValue.length
+ ", valueDivergenceRatio=" + valueDivergenceRatio + ")";
}
/**
* Returns producer record value
* Generates byte arrays based on a position argument.
*
* The array generated at a given position should be the same no matter how many
* times generate() is invoked. PayloadGenerator instances should be immutable
* and thread-safe.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(value = ConstantPayloadGenerator.class, name = "constant"),
@JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name = "sequential"),
@JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name = "uniformRandom")
})
public interface PayloadGenerator {
/**
* Generate a payload.
*
* @param position The position to use to generate the payload
*
* @return A new array object containing the payload.
*/
private byte[] nextValue(long position) {
// set the seed based on the given position to make sure that the same value is generated
// for the same position.
random.setSeed(baseSeed + 31 * position + 1);
// randomize some of the payload to achieve expected compression rate
byte[] recordValue = Arrays.copyOf(baseRecordValue, baseRecordValue.length);
for (int i = 0; i < recordValue.length * valueDivergenceRatio; ++i)
recordValue[i] = (byte) (random.nextInt(26) + 65);
return recordValue;
}
byte[] generate(long position);
}

55
tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadIterator.java

@ -0,0 +1,55 @@ @@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import java.util.Iterator;
/**
* An iterator which wraps a PayloadGenerator.
*/
public final class PayloadIterator implements Iterator<byte[]> {
private final PayloadGenerator generator;
private long position = 0;
public PayloadIterator(PayloadGenerator generator) {
this.generator = generator;
}
@Override
public boolean hasNext() {
return true;
}
@Override
public synchronized byte[] next() {
return generator.generate(position++);
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
public synchronized void seek(long position) {
this.position = position;
}
public synchronized long position() {
return this.position;
}
}

20
tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java

@ -37,7 +37,8 @@ public class ProduceBenchSpec extends TaskSpec { @@ -37,7 +37,8 @@ public class ProduceBenchSpec extends TaskSpec {
private final String bootstrapServers;
private final int targetMessagesPerSec;
private final int maxMessages;
private final int messageSize;
private final PayloadGenerator keyGenerator;
private final PayloadGenerator valueGenerator;
private final Map<String, String> producerConf;
private final int totalTopics;
private final int activeTopics;
@ -49,7 +50,8 @@ public class ProduceBenchSpec extends TaskSpec { @@ -49,7 +50,8 @@ public class ProduceBenchSpec extends TaskSpec {
@JsonProperty("bootstrapServers") String bootstrapServers,
@JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
@JsonProperty("maxMessages") int maxMessages,
@JsonProperty("messageSize") int messageSize,
@JsonProperty("keyGenerator") PayloadGenerator keyGenerator,
@JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
@JsonProperty("producerConf") Map<String, String> producerConf,
@JsonProperty("totalTopics") int totalTopics,
@JsonProperty("activeTopics") int activeTopics) {
@ -58,7 +60,10 @@ public class ProduceBenchSpec extends TaskSpec { @@ -58,7 +60,10 @@ public class ProduceBenchSpec extends TaskSpec {
this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
this.targetMessagesPerSec = targetMessagesPerSec;
this.maxMessages = maxMessages;
this.messageSize = (messageSize == 0) ? PayloadGenerator.DEFAULT_MESSAGE_SIZE : messageSize;
this.keyGenerator = keyGenerator == null ?
new SequentialPayloadGenerator(4, 0) : keyGenerator;
this.valueGenerator = valueGenerator == null ?
new ConstantPayloadGenerator(512, new byte[0]) : valueGenerator;
this.producerConf = (producerConf == null) ? new TreeMap<String, String>() : producerConf;
this.totalTopics = totalTopics;
this.activeTopics = activeTopics;
@ -85,8 +90,13 @@ public class ProduceBenchSpec extends TaskSpec { @@ -85,8 +90,13 @@ public class ProduceBenchSpec extends TaskSpec {
}
@JsonProperty
public int messageSize() {
return messageSize;
public PayloadGenerator keyGenerator() {
return keyGenerator;
}
@JsonProperty
public PayloadGenerator valueGenerator() {
return valueGenerator;
}
@JsonProperty

16
tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java

@ -91,7 +91,7 @@ public class ProduceBenchWorker implements TaskWorker { @@ -91,7 +91,7 @@ public class ProduceBenchWorker implements TaskWorker {
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("ProducerBenchWorker is already running.");
}
log.info("{}: Activating ProduceBenchWorker.", id);
log.info("{}: Activating ProduceBenchWorker with {}", id, spec);
this.executor = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("ProduceBenchWorkerThread%d", false));
this.status = status;
@ -172,7 +172,9 @@ public class ProduceBenchWorker implements TaskWorker { @@ -172,7 +172,9 @@ public class ProduceBenchWorker implements TaskWorker {
private final KafkaProducer<byte[], byte[]> producer;
private final PayloadGenerator payloadGenerator;
private final PayloadIterator keys;
private final PayloadIterator values;
private final Throttle throttle;
@ -187,7 +189,8 @@ public class ProduceBenchWorker implements TaskWorker { @@ -187,7 +189,8 @@ public class ProduceBenchWorker implements TaskWorker {
props.setProperty(entry.getKey(), entry.getValue());
}
this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
this.payloadGenerator = new PayloadGenerator(spec.messageSize());
this.keys = new PayloadIterator(spec.keyGenerator());
this.values = new PayloadIterator(spec.valueGenerator());
this.throttle = new SendRecordsThrottle(perPeriod, producer);
}
@ -199,8 +202,10 @@ public class ProduceBenchWorker implements TaskWorker { @@ -199,8 +202,10 @@ public class ProduceBenchWorker implements TaskWorker {
try {
for (int m = 0; m < spec.maxMessages(); m++) {
for (int i = 0; i < spec.activeTopics(); i++) {
ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord(topicIndexToName(i));
future = producer.send(record, new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(
topicIndexToName(i), 0, keys.next(), values.next());
future = producer.send(record,
new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
}
throttle.increment();
}
@ -216,7 +221,6 @@ public class ProduceBenchWorker implements TaskWorker { @@ -216,7 +221,6 @@ public class ProduceBenchWorker implements TaskWorker {
statusUpdaterFuture.cancel(false);
new StatusUpdater(histogram).run();
long curTimeMs = Time.SYSTEM.milliseconds();
log.info("Produced {}", payloadGenerator);
log.info("Sent {} total record(s) in {} ms. status: {}",
histogram.summarize().numSamples(), curTimeMs - startTimeMs, status.get());
}

10
tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java

@ -43,6 +43,7 @@ import org.slf4j.Logger; @@ -43,6 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@ -69,6 +70,8 @@ public class RoundTripWorker implements TaskWorker { @@ -69,6 +70,8 @@ public class RoundTripWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(RoundTripWorker.class);
private static final PayloadGenerator KEY_GENERATOR = new SequentialPayloadGenerator(4, 0);
private final ToReceiveTracker toReceiveTracker = new ToReceiveTracker();
private final String id;
@ -183,7 +186,6 @@ public class RoundTripWorker implements TaskWorker { @@ -183,7 +186,6 @@ public class RoundTripWorker implements TaskWorker {
int perPeriod = WorkerUtils.
perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
payloadGenerator = new PayloadGenerator(MESSAGE_SIZE, PayloadKeyType.KEY_MESSAGE_INDEX);
}
@Override
@ -206,7 +208,9 @@ public class RoundTripWorker implements TaskWorker { @@ -206,7 +208,9 @@ public class RoundTripWorker implements TaskWorker {
}
messagesSent++;
// we explicitly specify generator position based on message index
ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord(TOPIC_NAME, messageIndex);
ProducerRecord<byte[], byte[]> record = new ProducerRecord(TOPIC_NAME, 0,
KEY_GENERATOR.generate(messageIndex),
spec.valueGenerator().generate(messageIndex));
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
@ -286,7 +290,7 @@ public class RoundTripWorker implements TaskWorker { @@ -286,7 +290,7 @@ public class RoundTripWorker implements TaskWorker {
pollInvoked++;
ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
for (ConsumerRecord<byte[], byte[]> record : records.records(TOPIC_NAME)) {
int messageIndex = ByteBuffer.wrap(record.key()).getInt();
int messageIndex = ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();
messagesReceived++;
if (toReceiveTracker.removePending(messageIndex)) {
uniqueMessagesReceived++;

9
tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java

@ -39,6 +39,7 @@ public class RoundTripWorkloadSpec extends TaskSpec { @@ -39,6 +39,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
private final String bootstrapServers;
private final int targetMessagesPerSec;
private final NavigableMap<Integer, List<Integer>> partitionAssignments;
private final PayloadGenerator valueGenerator;
private final int maxMessages;
@JsonCreator
@ -48,6 +49,7 @@ public class RoundTripWorkloadSpec extends TaskSpec { @@ -48,6 +49,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
@JsonProperty("bootstrapServers") String bootstrapServers,
@JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
@JsonProperty("partitionAssignments") NavigableMap<Integer, List<Integer>> partitionAssignments,
@JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
@JsonProperty("maxMessages") int maxMessages) {
super(startMs, durationMs);
this.clientNode = clientNode == null ? "" : clientNode;
@ -55,6 +57,8 @@ public class RoundTripWorkloadSpec extends TaskSpec { @@ -55,6 +57,8 @@ public class RoundTripWorkloadSpec extends TaskSpec {
this.targetMessagesPerSec = targetMessagesPerSec;
this.partitionAssignments = partitionAssignments == null ?
new TreeMap<Integer, List<Integer>>() : partitionAssignments;
this.valueGenerator = valueGenerator == null ?
new UniformRandomPayloadGenerator(32, 123, 10) : valueGenerator;
this.maxMessages = maxMessages;
}
@ -78,6 +82,11 @@ public class RoundTripWorkloadSpec extends TaskSpec { @@ -78,6 +82,11 @@ public class RoundTripWorkloadSpec extends TaskSpec {
return partitionAssignments;
}
@JsonProperty
public PayloadGenerator valueGenerator() {
return valueGenerator;
}
@JsonProperty
public int maxMessages() {
return maxMessages;

65
tools/src/main/java/org/apache/kafka/trogdor/workload/SequentialPayloadGenerator.java

@ -0,0 +1,65 @@ @@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
/**
* A PayloadGenerator which generates a sequentially increasing payload.
*
* The generated number will wrap around to 0 after the maximum value is reached.
* Payloads bigger than 8 bytes will always just be padded with zeros after byte 8.
*/
public class SequentialPayloadGenerator implements PayloadGenerator {
private final int size;
private final long startOffset;
private final ByteBuffer buf;
@JsonCreator
public SequentialPayloadGenerator(@JsonProperty("size") int size,
@JsonProperty("offset") long startOffset) {
this.size = size;
this.startOffset = startOffset;
this.buf = ByteBuffer.allocate(8);
// Little-endian byte order allows us to support arbitrary lengths more easily,
// since the first byte is always the lowest-order byte.
this.buf.order(ByteOrder.LITTLE_ENDIAN);
}
@JsonProperty
public int size() {
return size;
}
@JsonProperty
public long startOffset() {
return startOffset;
}
@Override
public synchronized byte[] generate(long position) {
buf.clear();
buf.putLong(position + startOffset);
byte[] result = new byte[size];
System.arraycopy(buf.array(), 0, result, 0, Math.min(buf.array().length, result.length));
return result;
}
}

89
tools/src/main/java/org/apache/kafka/trogdor/workload/UniformRandomPayloadGenerator.java

@ -0,0 +1,89 @@ @@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Random;
/**
* A PayloadGenerator which generates a uniform random payload.
*
* This generator generates pseudo-random payloads that can be reproduced from run to run.
* The guarantees are the same as those of java.util.Random.
*
* This payload generator also has the option to append padding bytes at the end of the payload.
* The padding bytes are always the same, no matter what the position is. This is useful when
* simulating a partly-compressible stream of user data.
*/
public class UniformRandomPayloadGenerator implements PayloadGenerator {
private final int size;
private final long seed;
private final int padding;
private final Random random = new Random();
private final byte[] padBytes;
private final byte[] randomBytes;
@JsonCreator
public UniformRandomPayloadGenerator(@JsonProperty("size") int size,
@JsonProperty("seed") long seed,
@JsonProperty("padding") int padding) {
this.size = size;
this.seed = seed;
this.padding = padding;
if (padding < 0 || padding > size) {
throw new RuntimeException("Invalid value " + padding + " for " +
"padding: the number of padding bytes must not be smaller than " +
"0 or greater than the total payload size.");
}
this.padBytes = new byte[padding];
random.setSeed(seed);
random.nextBytes(padBytes);
this.randomBytes = new byte[size - padding];
}
@JsonProperty
public int size() {
return size;
}
@JsonProperty
public long seed() {
return seed;
}
@JsonProperty
public int padding() {
return padding;
}
@Override
public synchronized byte[] generate(long position) {
byte[] result = new byte[size];
if (randomBytes.length > 0) {
random.setSeed(seed + position);
random.nextBytes(randomBytes);
System.arraycopy(randomBytes, 0, result, 0, Math.min(randomBytes.length, result.length));
}
if (padBytes.length > 0) {
System.arraycopy(padBytes, 0, result, randomBytes.length, result.length - randomBytes.length);
}
return result;
}
}

4
tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java

@ -49,9 +49,9 @@ public class JsonSerializationTest { @@ -49,9 +49,9 @@ public class JsonSerializationTest {
verify(new WorkerRunning(null, 0, null));
verify(new WorkerStopping(null, 0, null));
verify(new ProduceBenchSpec(0, 0, null, null,
0, 0, 0, null, 0, 0));
0, 0, null, null, null, 0, 0));
verify(new RoundTripWorkloadSpec(0, 0, null, null,
0, null, 0));
0, null, null, 0));
verify(new SampleTaskSpec(0, 0, 0, null));
}

180
tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java

@ -17,128 +17,126 @@ @@ -17,128 +17,126 @@
package org.apache.kafka.trogdor.workload;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class PayloadGeneratorTest {
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);
@Test
public void testGeneratorStartsAtPositionZero() {
PayloadGenerator payloadGenerator = new PayloadGenerator();
assertEquals(0, payloadGenerator.position());
public void testConstantPayloadGenerator() {
byte[] alphabet = new byte[26];
for (int i = 0; i < alphabet.length; i++) {
alphabet[i] = (byte) ('a' + i);
}
@Test
public void testDefaultPayload() {
final long numRecords = 262;
PayloadGenerator payloadGenerator = new PayloadGenerator();
// make sure that each time we produce a different value (except if compression rate is 0)
byte[] prevValue = null;
long expectedPosition = 0;
for (int i = 0; i < numRecords; i++) {
ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic");
assertNull(record.key());
assertEquals(PayloadGenerator.DEFAULT_MESSAGE_SIZE, record.value().length);
assertEquals(++expectedPosition, payloadGenerator.position());
assertFalse("Position " + payloadGenerator.position(),
Arrays.equals(prevValue, record.value()));
prevValue = record.value().clone();
byte[] expectedSuperset = new byte[512];
for (int i = 0; i < expectedSuperset.length; i++) {
expectedSuperset[i] = (byte) ('a' + (i % 26));
}
for (int i : new int[] {1, 5, 10, 100, 511, 512}) {
ConstantPayloadGenerator generator = new ConstantPayloadGenerator(i, alphabet);
assertArrayContains(expectedSuperset, generator.generate(0));
assertArrayContains(expectedSuperset, generator.generate(10));
assertArrayContains(expectedSuperset, generator.generate(100));
}
}
@Test
public void testNullKeyTypeValueSizeIsMessageSize() {
final int size = 200;
PayloadGenerator payloadGenerator = new PayloadGenerator(size);
ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic");
assertNull(record.key());
assertEquals(size, record.value().length);
private static void assertArrayContains(byte[] expectedSuperset, byte[] actual) {
byte[] expected = new byte[actual.length];
System.arraycopy(expectedSuperset, 0, expected, 0, expected.length);
assertArrayEquals(expected, actual);
}
@Test
public void testKeyContainsGeneratorPosition() {
final long numRecords = 10;
final int size = 200;
PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
for (int i = 0; i < numRecords; i++) {
assertEquals(i, generator.position());
ProducerRecord<byte[], byte[]> record = generator.nextRecord("test-topic");
assertEquals(8, record.key().length);
assertEquals(size - 8, record.value().length);
assertEquals("i=" + i, i, ByteBuffer.wrap(record.key()).getLong());
public void testSequentialPayloadGenerator() {
SequentialPayloadGenerator g4 = new SequentialPayloadGenerator(4, 1);
assertLittleEndianArrayEquals(1, g4.generate(0));
assertLittleEndianArrayEquals(2, g4.generate(1));
SequentialPayloadGenerator g8 = new SequentialPayloadGenerator(8, 0);
assertLittleEndianArrayEquals(0, g8.generate(0));
assertLittleEndianArrayEquals(1, g8.generate(1));
assertLittleEndianArrayEquals(123123123123L, g8.generate(123123123123L));
SequentialPayloadGenerator g2 = new SequentialPayloadGenerator(2, 0);
assertLittleEndianArrayEquals(0, g2.generate(0));
assertLittleEndianArrayEquals(1, g2.generate(1));
assertLittleEndianArrayEquals(1, g2.generate(1));
assertLittleEndianArrayEquals(1, g2.generate(131073));
}
private static void assertLittleEndianArrayEquals(long expected, byte[] actual) {
byte[] longActual = new byte[8];
System.arraycopy(actual, 0, longActual, 0, Math.min(actual.length, longActual.length));
ByteBuffer buf = ByteBuffer.wrap(longActual).order(ByteOrder.LITTLE_ENDIAN);
assertEquals(expected, buf.getLong());
}
@Test
public void testGeneratePayloadWithExplicitPosition() {
final int size = 200;
PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
int position = 2;
while (position < 5000000) {
ProducerRecord<byte[], byte[]> record = generator.nextRecord("test-topic", position);
assertEquals(8, record.key().length);
assertEquals(size - 8, record.value().length);
assertEquals(position, ByteBuffer.wrap(record.key()).getLong());
position = position * 64;
public void testUniformRandomPayloadGenerator() {
PayloadIterator iter = new PayloadIterator(
new UniformRandomPayloadGenerator(1234, 456, 0));
byte[] prev = iter.next();
for (int uniques = 0; uniques < 1000; ) {
byte[] cur = iter.next();
assertEquals(prev.length, cur.length);
if (!Arrays.equals(prev, cur)) {
uniques++;
}
}
public void testSamePositionGeneratesSameKeyAndValue() {
final int size = 100;
PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
ProducerRecord<byte[], byte[]> record1 = generator.nextRecord("test-topic");
assertEquals(1, generator.position());
ProducerRecord<byte[], byte[]> record2 = generator.nextRecord("test-topic");
assertEquals(2, generator.position());
ProducerRecord<byte[], byte[]> record3 = generator.nextRecord("test-topic", 0);
// position should not change if we generated record with specific position
assertEquals(2, generator.position());
assertFalse("Values at different positions should not match.",
Arrays.equals(record1.value(), record2.value()));
assertFalse("Values at different positions should not match.",
Arrays.equals(record3.value(), record2.value()));
assertTrue("Values at the same position should match.",
Arrays.equals(record1.value(), record3.value()));
testReproducible(new UniformRandomPayloadGenerator(1234, 456, 0));
testReproducible(new UniformRandomPayloadGenerator(1, 0, 0));
testReproducible(new UniformRandomPayloadGenerator(10, 6, 5));
testReproducible(new UniformRandomPayloadGenerator(512, 123, 100));
}
@Test
public void testGeneratesDeterministicKeyValues() {
final long numRecords = 194;
final int size = 100;
PayloadGenerator generator1 = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
PayloadGenerator generator2 = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
for (int i = 0; i < numRecords; ++i) {
ProducerRecord<byte[], byte[]> record1 = generator1.nextRecord("test-topic");
ProducerRecord<byte[], byte[]> record2 = generator2.nextRecord("test-topic");
assertTrue(Arrays.equals(record1.value(), record2.value()));
assertTrue(Arrays.equals(record1.key(), record2.key()));
}
private static void testReproducible(PayloadGenerator generator) {
byte[] val = generator.generate(123);
generator.generate(456);
byte[] val2 = generator.generate(123);
assertArrayEquals(val, val2);
}
@Test
public void testTooSmallMessageSizeCreatesPayloadWithOneByteValues() {
PayloadGenerator payloadGenerator = new PayloadGenerator(2, PayloadKeyType.KEY_MESSAGE_INDEX);
ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic", 877);
assertEquals(8, record.key().length);
assertEquals(0, record.value().length);
public void testUniformRandomPayloadGeneratorPaddingBytes() {
UniformRandomPayloadGenerator generator =
new UniformRandomPayloadGenerator(1000, 456, 100);
byte[] val1 = generator.generate(0);
byte[] val1End = new byte[100];
System.arraycopy(val1, 900, val1End, 0, 100);
byte[] val2 = generator.generate(100);
byte[] val2End = new byte[100];
System.arraycopy(val2, 900, val2End, 0, 100);
byte[] val3 = generator.generate(200);
byte[] val3End = new byte[100];
System.arraycopy(val3, 900, val3End, 0, 100);
assertArrayEquals(val1End, val2End);
assertArrayEquals(val1End, val3End);
}
@Test
public void testNextRecordGeneratesNewByteArrayForValue() {
PayloadGenerator payloadGenerator = new PayloadGenerator(2, PayloadKeyType.KEY_MESSAGE_INDEX);
ProducerRecord<byte[], byte[]> record1 = payloadGenerator.nextRecord("test-topic", 877);
ProducerRecord<byte[], byte[]> record2 = payloadGenerator.nextRecord("test-topic", 877);
assertNotEquals(record1.value(), record2.value());
public void testPayloadIterator() {
final int expectedSize = 50;
PayloadIterator iter = new PayloadIterator(
new ConstantPayloadGenerator(expectedSize, new byte[0]));
final byte[] expected = new byte[expectedSize];
assertEquals(0, iter.position());
assertArrayEquals(expected, iter.next());
assertEquals(1, iter.position());
assertArrayEquals(expected, iter.next());
assertArrayEquals(expected, iter.next());
assertEquals(3, iter.position());
iter.seek(0);
assertEquals(0, iter.position());
}
}

Loading…
Cancel
Save