diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantPayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantPayloadGenerator.java new file mode 100644 index 00000000000..d0c1c4862de --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantPayloadGenerator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A PayloadGenerator which always generates a constant payload. + */ +public class ConstantPayloadGenerator implements PayloadGenerator { + private final int size; + private final byte[] value; + + @JsonCreator + public ConstantPayloadGenerator(@JsonProperty("size") int size, + @JsonProperty("value") byte[] value) { + this.size = size; + this.value = (value == null || value.length == 0) ? new byte[size] : value; + } + + @JsonProperty + public int size() { + return size; + } + + @JsonProperty + public byte[] value() { + return value; + } + + @Override + public byte[] generate(long position) { + byte[] next = new byte[size]; + for (int i = 0; i < next.length; i += value.length) { + System.arraycopy(value, 0, next, i, Math.min(next.length - i, value.length)); + } + return next; + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java index 9acd5fad2d7..4895f217672 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java @@ -14,133 +14,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.trogdor.workload; -import org.apache.kafka.clients.producer.ProducerRecord; +package org.apache.kafka.trogdor.workload; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Random; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; /** - * Describes the payload for the producer record. Currently, it generates constant size values - * and either null keys or constant size key (depending on requested key type). The generator - * is deterministic -- two generator objects created with the same key type, message size, and - * value divergence ratio (see `valueDivergenceRatio` description) will generate the same sequence - * of key/value pairs. + * Generates byte arrays based on a position argument. + * + * The array generated at a given position should be the same no matter how many + * times generate() is invoked. PayloadGenerator instances should be immutable + * and thread-safe. */ -public class PayloadGenerator { - - public static final double DEFAULT_VALUE_DIVERGENCE_RATIO = 0.3; - public static final int DEFAULT_MESSAGE_SIZE = 512; - - /** - * This is the ratio of how much each next value is different from the previous value. This - * is directly related to compression rate we will get. Example: 0.3 divergence ratio gets us - * about 0.3 - 0.45 compression rate with lz4. - */ - private final double valueDivergenceRatio; - private final long baseSeed; - private long currentPosition; - private byte[] baseRecordValue; - private PayloadKeyType recordKeyType; - private Random random; - - public PayloadGenerator() { - this(DEFAULT_MESSAGE_SIZE, PayloadKeyType.KEY_NULL, DEFAULT_VALUE_DIVERGENCE_RATIO); - } - - /** - * Generator will generate null keys and values of size `messageSize` - * @param messageSize number of bytes used for key + value - */ - public PayloadGenerator(int messageSize) { - this(messageSize, PayloadKeyType.KEY_NULL, DEFAULT_VALUE_DIVERGENCE_RATIO); - } - - /** - * Generator will generate keys of given type and values of size 'messageSize' - (key size). - * If the given key type requires more bytes than messageSize, then the resulting payload - * will be keys of size required for the given key type and 0-length values. - * @param messageSize number of bytes used for key + value - * @param keyType type of keys generated - */ - public PayloadGenerator(int messageSize, PayloadKeyType keyType) { - this(messageSize, keyType, DEFAULT_VALUE_DIVERGENCE_RATIO); - } - - /** - * Generator will generate keys of given type and values of size 'messageSize' - (key size). - * If the given key type requires more bytes than messageSize, then the resulting payload - * will be keys of size required for the given key type and 0-length values. - * @param messageSize key + value size - * @param valueDivergenceRatio ratio of how much each next value is different from the previous - * value. Used to approximately control target compression rate (if - * compression is used). - */ - public PayloadGenerator(int messageSize, PayloadKeyType keyType, - double valueDivergenceRatio) { - this.baseSeed = 856; // some random number, may later let pass seed to constructor - this.currentPosition = 0; - this.valueDivergenceRatio = valueDivergenceRatio; - this.random = new Random(this.baseSeed); - - final int valueSize = (messageSize > keyType.maxSizeInBytes()) - ? messageSize - keyType.maxSizeInBytes() : 0; - this.baseRecordValue = new byte[valueSize]; - // initialize value with random bytes - for (int i = 0; i < baseRecordValue.length; ++i) { - baseRecordValue[i] = (byte) (random.nextInt(26) + 65); - } - this.recordKeyType = keyType; - } - - /** - * Returns current position of the payload generator. - */ - public long position() { - return currentPosition; - } - - /** - * Creates record based on the current position, and increments current position. - */ - public ProducerRecord nextRecord(String topicName) { - return nextRecord(topicName, currentPosition++); - } - - /** - * Creates record based on the given position. Does not change the current position. - */ - public ProducerRecord nextRecord(String topicName, long position) { - byte[] keyBytes = null; - if (recordKeyType == PayloadKeyType.KEY_MESSAGE_INDEX) { - keyBytes = ByteBuffer.allocate(recordKeyType.maxSizeInBytes()).putLong(position).array(); - } else if (recordKeyType != PayloadKeyType.KEY_NULL) { - throw new UnsupportedOperationException( - "PayloadGenerator does not know how to generate key for key type " + recordKeyType); - } - return new ProducerRecord<>(topicName, keyBytes, nextValue(position)); - } - - @Override - public String toString() { - return "PayloadGenerator(recordKeySize=" + recordKeyType.maxSizeInBytes() - + ", recordValueSize=" + baseRecordValue.length - + ", valueDivergenceRatio=" + valueDivergenceRatio + ")"; - } - - /** - * Returns producer record value - */ - private byte[] nextValue(long position) { - // set the seed based on the given position to make sure that the same value is generated - // for the same position. - random.setSeed(baseSeed + 31 * position + 1); - // randomize some of the payload to achieve expected compression rate - byte[] recordValue = Arrays.copyOf(baseRecordValue, baseRecordValue.length); - for (int i = 0; i < recordValue.length * valueDivergenceRatio; ++i) - recordValue[i] = (byte) (random.nextInt(26) + 65); - return recordValue; - } +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(value = ConstantPayloadGenerator.class, name = "constant"), + @JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name = "sequential"), + @JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name = "uniformRandom") + }) +public interface PayloadGenerator { + /** + * Generate a payload. + * + * @param position The position to use to generate the payload + * + * @return A new array object containing the payload. + */ + byte[] generate(long position); } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadIterator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadIterator.java new file mode 100644 index 00000000000..a5f3baebcfc --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadIterator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import java.util.Iterator; + +/** + * An iterator which wraps a PayloadGenerator. + */ +public final class PayloadIterator implements Iterator { + private final PayloadGenerator generator; + private long position = 0; + + public PayloadIterator(PayloadGenerator generator) { + this.generator = generator; + } + + @Override + public boolean hasNext() { + return true; + } + + @Override + public synchronized byte[] next() { + return generator.generate(position++); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + public synchronized void seek(long position) { + this.position = position; + } + + public synchronized long position() { + return this.position; + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java index 3e05a53b764..a798e73a754 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java @@ -37,7 +37,8 @@ public class ProduceBenchSpec extends TaskSpec { private final String bootstrapServers; private final int targetMessagesPerSec; private final int maxMessages; - private final int messageSize; + private final PayloadGenerator keyGenerator; + private final PayloadGenerator valueGenerator; private final Map producerConf; private final int totalTopics; private final int activeTopics; @@ -49,7 +50,8 @@ public class ProduceBenchSpec extends TaskSpec { @JsonProperty("bootstrapServers") String bootstrapServers, @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec, @JsonProperty("maxMessages") int maxMessages, - @JsonProperty("messageSize") int messageSize, + @JsonProperty("keyGenerator") PayloadGenerator keyGenerator, + @JsonProperty("valueGenerator") PayloadGenerator valueGenerator, @JsonProperty("producerConf") Map producerConf, @JsonProperty("totalTopics") int totalTopics, @JsonProperty("activeTopics") int activeTopics) { @@ -58,7 +60,10 @@ public class ProduceBenchSpec extends TaskSpec { this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers; this.targetMessagesPerSec = targetMessagesPerSec; this.maxMessages = maxMessages; - this.messageSize = (messageSize == 0) ? PayloadGenerator.DEFAULT_MESSAGE_SIZE : messageSize; + this.keyGenerator = keyGenerator == null ? + new SequentialPayloadGenerator(4, 0) : keyGenerator; + this.valueGenerator = valueGenerator == null ? + new ConstantPayloadGenerator(512, new byte[0]) : valueGenerator; this.producerConf = (producerConf == null) ? new TreeMap() : producerConf; this.totalTopics = totalTopics; this.activeTopics = activeTopics; @@ -85,8 +90,13 @@ public class ProduceBenchSpec extends TaskSpec { } @JsonProperty - public int messageSize() { - return messageSize; + public PayloadGenerator keyGenerator() { + return keyGenerator; + } + + @JsonProperty + public PayloadGenerator valueGenerator() { + return valueGenerator; } @JsonProperty diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java index 1bd386d1e3f..51f52d30aae 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java @@ -91,7 +91,7 @@ public class ProduceBenchWorker implements TaskWorker { if (!running.compareAndSet(false, true)) { throw new IllegalStateException("ProducerBenchWorker is already running."); } - log.info("{}: Activating ProduceBenchWorker.", id); + log.info("{}: Activating ProduceBenchWorker with {}", id, spec); this.executor = Executors.newScheduledThreadPool(1, ThreadUtils.createThreadFactory("ProduceBenchWorkerThread%d", false)); this.status = status; @@ -172,7 +172,9 @@ public class ProduceBenchWorker implements TaskWorker { private final KafkaProducer producer; - private final PayloadGenerator payloadGenerator; + private final PayloadIterator keys; + + private final PayloadIterator values; private final Throttle throttle; @@ -187,7 +189,8 @@ public class ProduceBenchWorker implements TaskWorker { props.setProperty(entry.getKey(), entry.getValue()); } this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer()); - this.payloadGenerator = new PayloadGenerator(spec.messageSize()); + this.keys = new PayloadIterator(spec.keyGenerator()); + this.values = new PayloadIterator(spec.valueGenerator()); this.throttle = new SendRecordsThrottle(perPeriod, producer); } @@ -199,8 +202,10 @@ public class ProduceBenchWorker implements TaskWorker { try { for (int m = 0; m < spec.maxMessages(); m++) { for (int i = 0; i < spec.activeTopics(); i++) { - ProducerRecord record = payloadGenerator.nextRecord(topicIndexToName(i)); - future = producer.send(record, new SendRecordsCallback(this, Time.SYSTEM.milliseconds())); + ProducerRecord record = new ProducerRecord( + topicIndexToName(i), 0, keys.next(), values.next()); + future = producer.send(record, + new SendRecordsCallback(this, Time.SYSTEM.milliseconds())); } throttle.increment(); } @@ -216,7 +221,6 @@ public class ProduceBenchWorker implements TaskWorker { statusUpdaterFuture.cancel(false); new StatusUpdater(histogram).run(); long curTimeMs = Time.SYSTEM.milliseconds(); - log.info("Produced {}", payloadGenerator); log.info("Sent {} total record(s) in {} ms. status: {}", histogram.summarize().numSamples(), curTimeMs - startTimeMs, status.get()); } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java index 5dfac1f6c72..1b9cb8f1af0 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java @@ -43,6 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -69,6 +70,8 @@ public class RoundTripWorker implements TaskWorker { private static final Logger log = LoggerFactory.getLogger(RoundTripWorker.class); + private static final PayloadGenerator KEY_GENERATOR = new SequentialPayloadGenerator(4, 0); + private final ToReceiveTracker toReceiveTracker = new ToReceiveTracker(); private final String id; @@ -183,7 +186,6 @@ public class RoundTripWorker implements TaskWorker { int perPeriod = WorkerUtils. perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS); this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS); - payloadGenerator = new PayloadGenerator(MESSAGE_SIZE, PayloadKeyType.KEY_MESSAGE_INDEX); } @Override @@ -206,7 +208,9 @@ public class RoundTripWorker implements TaskWorker { } messagesSent++; // we explicitly specify generator position based on message index - ProducerRecord record = payloadGenerator.nextRecord(TOPIC_NAME, messageIndex); + ProducerRecord record = new ProducerRecord(TOPIC_NAME, 0, + KEY_GENERATOR.generate(messageIndex), + spec.valueGenerator().generate(messageIndex)); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { @@ -286,7 +290,7 @@ public class RoundTripWorker implements TaskWorker { pollInvoked++; ConsumerRecords records = consumer.poll(50); for (ConsumerRecord record : records.records(TOPIC_NAME)) { - int messageIndex = ByteBuffer.wrap(record.key()).getInt(); + int messageIndex = ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt(); messagesReceived++; if (toReceiveTracker.removePending(messageIndex)) { uniqueMessagesReceived++; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java index 618c709dde0..00bd833c052 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java @@ -39,6 +39,7 @@ public class RoundTripWorkloadSpec extends TaskSpec { private final String bootstrapServers; private final int targetMessagesPerSec; private final NavigableMap> partitionAssignments; + private final PayloadGenerator valueGenerator; private final int maxMessages; @JsonCreator @@ -48,6 +49,7 @@ public class RoundTripWorkloadSpec extends TaskSpec { @JsonProperty("bootstrapServers") String bootstrapServers, @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec, @JsonProperty("partitionAssignments") NavigableMap> partitionAssignments, + @JsonProperty("valueGenerator") PayloadGenerator valueGenerator, @JsonProperty("maxMessages") int maxMessages) { super(startMs, durationMs); this.clientNode = clientNode == null ? "" : clientNode; @@ -55,6 +57,8 @@ public class RoundTripWorkloadSpec extends TaskSpec { this.targetMessagesPerSec = targetMessagesPerSec; this.partitionAssignments = partitionAssignments == null ? new TreeMap>() : partitionAssignments; + this.valueGenerator = valueGenerator == null ? + new UniformRandomPayloadGenerator(32, 123, 10) : valueGenerator; this.maxMessages = maxMessages; } @@ -78,6 +82,11 @@ public class RoundTripWorkloadSpec extends TaskSpec { return partitionAssignments; } + @JsonProperty + public PayloadGenerator valueGenerator() { + return valueGenerator; + } + @JsonProperty public int maxMessages() { return maxMessages; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/SequentialPayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/SequentialPayloadGenerator.java new file mode 100644 index 00000000000..e0b785ade36 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/SequentialPayloadGenerator.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * A PayloadGenerator which generates a sequentially increasing payload. + * + * The generated number will wrap around to 0 after the maximum value is reached. + * Payloads bigger than 8 bytes will always just be padded with zeros after byte 8. + */ +public class SequentialPayloadGenerator implements PayloadGenerator { + private final int size; + private final long startOffset; + private final ByteBuffer buf; + + @JsonCreator + public SequentialPayloadGenerator(@JsonProperty("size") int size, + @JsonProperty("offset") long startOffset) { + this.size = size; + this.startOffset = startOffset; + this.buf = ByteBuffer.allocate(8); + // Little-endian byte order allows us to support arbitrary lengths more easily, + // since the first byte is always the lowest-order byte. + this.buf.order(ByteOrder.LITTLE_ENDIAN); + } + + @JsonProperty + public int size() { + return size; + } + + @JsonProperty + public long startOffset() { + return startOffset; + } + + @Override + public synchronized byte[] generate(long position) { + buf.clear(); + buf.putLong(position + startOffset); + byte[] result = new byte[size]; + System.arraycopy(buf.array(), 0, result, 0, Math.min(buf.array().length, result.length)); + return result; + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformRandomPayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformRandomPayloadGenerator.java new file mode 100644 index 00000000000..4642dcf18ae --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformRandomPayloadGenerator.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Random; + +/** + * A PayloadGenerator which generates a uniform random payload. + * + * This generator generates pseudo-random payloads that can be reproduced from run to run. + * The guarantees are the same as those of java.util.Random. + * + * This payload generator also has the option to append padding bytes at the end of the payload. + * The padding bytes are always the same, no matter what the position is. This is useful when + * simulating a partly-compressible stream of user data. + */ +public class UniformRandomPayloadGenerator implements PayloadGenerator { + private final int size; + private final long seed; + private final int padding; + private final Random random = new Random(); + private final byte[] padBytes; + private final byte[] randomBytes; + + @JsonCreator + public UniformRandomPayloadGenerator(@JsonProperty("size") int size, + @JsonProperty("seed") long seed, + @JsonProperty("padding") int padding) { + this.size = size; + this.seed = seed; + this.padding = padding; + if (padding < 0 || padding > size) { + throw new RuntimeException("Invalid value " + padding + " for " + + "padding: the number of padding bytes must not be smaller than " + + "0 or greater than the total payload size."); + } + this.padBytes = new byte[padding]; + random.setSeed(seed); + random.nextBytes(padBytes); + this.randomBytes = new byte[size - padding]; + } + + @JsonProperty + public int size() { + return size; + } + + @JsonProperty + public long seed() { + return seed; + } + + @JsonProperty + public int padding() { + return padding; + } + + @Override + public synchronized byte[] generate(long position) { + byte[] result = new byte[size]; + if (randomBytes.length > 0) { + random.setSeed(seed + position); + random.nextBytes(randomBytes); + System.arraycopy(randomBytes, 0, result, 0, Math.min(randomBytes.length, result.length)); + } + if (padBytes.length > 0) { + System.arraycopy(padBytes, 0, result, randomBytes.length, result.length - randomBytes.length); + } + return result; + } +} diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java index 4e65d994da7..77a793236ea 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java @@ -49,9 +49,9 @@ public class JsonSerializationTest { verify(new WorkerRunning(null, 0, null)); verify(new WorkerStopping(null, 0, null)); verify(new ProduceBenchSpec(0, 0, null, null, - 0, 0, 0, null, 0, 0)); + 0, 0, null, null, null, 0, 0)); verify(new RoundTripWorkloadSpec(0, 0, null, null, - 0, null, 0)); + 0, null, null, 0)); verify(new SampleTaskSpec(0, 0, 0, null)); } diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java index d2954a5d4cc..25ef2e326b6 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java @@ -17,128 +17,126 @@ package org.apache.kafka.trogdor.workload; -import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Arrays; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; public class PayloadGeneratorTest { + @Rule + final public Timeout globalTimeout = Timeout.millis(120000); @Test - public void testGeneratorStartsAtPositionZero() { - PayloadGenerator payloadGenerator = new PayloadGenerator(); - assertEquals(0, payloadGenerator.position()); + public void testConstantPayloadGenerator() { + byte[] alphabet = new byte[26]; + for (int i = 0; i < alphabet.length; i++) { + alphabet[i] = (byte) ('a' + i); + } + byte[] expectedSuperset = new byte[512]; + for (int i = 0; i < expectedSuperset.length; i++) { + expectedSuperset[i] = (byte) ('a' + (i % 26)); + } + for (int i : new int[] {1, 5, 10, 100, 511, 512}) { + ConstantPayloadGenerator generator = new ConstantPayloadGenerator(i, alphabet); + assertArrayContains(expectedSuperset, generator.generate(0)); + assertArrayContains(expectedSuperset, generator.generate(10)); + assertArrayContains(expectedSuperset, generator.generate(100)); + } } - @Test - public void testDefaultPayload() { - final long numRecords = 262; - PayloadGenerator payloadGenerator = new PayloadGenerator(); - - // make sure that each time we produce a different value (except if compression rate is 0) - byte[] prevValue = null; - long expectedPosition = 0; - for (int i = 0; i < numRecords; i++) { - ProducerRecord record = payloadGenerator.nextRecord("test-topic"); - assertNull(record.key()); - assertEquals(PayloadGenerator.DEFAULT_MESSAGE_SIZE, record.value().length); - assertEquals(++expectedPosition, payloadGenerator.position()); - assertFalse("Position " + payloadGenerator.position(), - Arrays.equals(prevValue, record.value())); - prevValue = record.value().clone(); - } + private static void assertArrayContains(byte[] expectedSuperset, byte[] actual) { + byte[] expected = new byte[actual.length]; + System.arraycopy(expectedSuperset, 0, expected, 0, expected.length); + assertArrayEquals(expected, actual); } @Test - public void testNullKeyTypeValueSizeIsMessageSize() { - final int size = 200; - PayloadGenerator payloadGenerator = new PayloadGenerator(size); - ProducerRecord record = payloadGenerator.nextRecord("test-topic"); - assertNull(record.key()); - assertEquals(size, record.value().length); + public void testSequentialPayloadGenerator() { + SequentialPayloadGenerator g4 = new SequentialPayloadGenerator(4, 1); + assertLittleEndianArrayEquals(1, g4.generate(0)); + assertLittleEndianArrayEquals(2, g4.generate(1)); + + SequentialPayloadGenerator g8 = new SequentialPayloadGenerator(8, 0); + assertLittleEndianArrayEquals(0, g8.generate(0)); + assertLittleEndianArrayEquals(1, g8.generate(1)); + assertLittleEndianArrayEquals(123123123123L, g8.generate(123123123123L)); + + SequentialPayloadGenerator g2 = new SequentialPayloadGenerator(2, 0); + assertLittleEndianArrayEquals(0, g2.generate(0)); + assertLittleEndianArrayEquals(1, g2.generate(1)); + assertLittleEndianArrayEquals(1, g2.generate(1)); + assertLittleEndianArrayEquals(1, g2.generate(131073)); } - @Test - public void testKeyContainsGeneratorPosition() { - final long numRecords = 10; - final int size = 200; - PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX); - for (int i = 0; i < numRecords; i++) { - assertEquals(i, generator.position()); - ProducerRecord record = generator.nextRecord("test-topic"); - assertEquals(8, record.key().length); - assertEquals(size - 8, record.value().length); - assertEquals("i=" + i, i, ByteBuffer.wrap(record.key()).getLong()); - } + private static void assertLittleEndianArrayEquals(long expected, byte[] actual) { + byte[] longActual = new byte[8]; + System.arraycopy(actual, 0, longActual, 0, Math.min(actual.length, longActual.length)); + ByteBuffer buf = ByteBuffer.wrap(longActual).order(ByteOrder.LITTLE_ENDIAN); + assertEquals(expected, buf.getLong()); } @Test - public void testGeneratePayloadWithExplicitPosition() { - final int size = 200; - PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX); - int position = 2; - while (position < 5000000) { - ProducerRecord record = generator.nextRecord("test-topic", position); - assertEquals(8, record.key().length); - assertEquals(size - 8, record.value().length); - assertEquals(position, ByteBuffer.wrap(record.key()).getLong()); - position = position * 64; + public void testUniformRandomPayloadGenerator() { + PayloadIterator iter = new PayloadIterator( + new UniformRandomPayloadGenerator(1234, 456, 0)); + byte[] prev = iter.next(); + for (int uniques = 0; uniques < 1000; ) { + byte[] cur = iter.next(); + assertEquals(prev.length, cur.length); + if (!Arrays.equals(prev, cur)) { + uniques++; + } } + testReproducible(new UniformRandomPayloadGenerator(1234, 456, 0)); + testReproducible(new UniformRandomPayloadGenerator(1, 0, 0)); + testReproducible(new UniformRandomPayloadGenerator(10, 6, 5)); + testReproducible(new UniformRandomPayloadGenerator(512, 123, 100)); } - public void testSamePositionGeneratesSameKeyAndValue() { - final int size = 100; - PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX); - ProducerRecord record1 = generator.nextRecord("test-topic"); - assertEquals(1, generator.position()); - ProducerRecord record2 = generator.nextRecord("test-topic"); - assertEquals(2, generator.position()); - ProducerRecord record3 = generator.nextRecord("test-topic", 0); - // position should not change if we generated record with specific position - assertEquals(2, generator.position()); - assertFalse("Values at different positions should not match.", - Arrays.equals(record1.value(), record2.value())); - assertFalse("Values at different positions should not match.", - Arrays.equals(record3.value(), record2.value())); - assertTrue("Values at the same position should match.", - Arrays.equals(record1.value(), record3.value())); - } - - @Test - public void testGeneratesDeterministicKeyValues() { - final long numRecords = 194; - final int size = 100; - PayloadGenerator generator1 = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX); - PayloadGenerator generator2 = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX); - for (int i = 0; i < numRecords; ++i) { - ProducerRecord record1 = generator1.nextRecord("test-topic"); - ProducerRecord record2 = generator2.nextRecord("test-topic"); - assertTrue(Arrays.equals(record1.value(), record2.value())); - assertTrue(Arrays.equals(record1.key(), record2.key())); - } + private static void testReproducible(PayloadGenerator generator) { + byte[] val = generator.generate(123); + generator.generate(456); + byte[] val2 = generator.generate(123); + assertArrayEquals(val, val2); } @Test - public void testTooSmallMessageSizeCreatesPayloadWithOneByteValues() { - PayloadGenerator payloadGenerator = new PayloadGenerator(2, PayloadKeyType.KEY_MESSAGE_INDEX); - ProducerRecord record = payloadGenerator.nextRecord("test-topic", 877); - assertEquals(8, record.key().length); - assertEquals(0, record.value().length); + public void testUniformRandomPayloadGeneratorPaddingBytes() { + UniformRandomPayloadGenerator generator = + new UniformRandomPayloadGenerator(1000, 456, 100); + byte[] val1 = generator.generate(0); + byte[] val1End = new byte[100]; + System.arraycopy(val1, 900, val1End, 0, 100); + byte[] val2 = generator.generate(100); + byte[] val2End = new byte[100]; + System.arraycopy(val2, 900, val2End, 0, 100); + byte[] val3 = generator.generate(200); + byte[] val3End = new byte[100]; + System.arraycopy(val3, 900, val3End, 0, 100); + assertArrayEquals(val1End, val2End); + assertArrayEquals(val1End, val3End); } @Test - public void testNextRecordGeneratesNewByteArrayForValue() { - PayloadGenerator payloadGenerator = new PayloadGenerator(2, PayloadKeyType.KEY_MESSAGE_INDEX); - ProducerRecord record1 = payloadGenerator.nextRecord("test-topic", 877); - ProducerRecord record2 = payloadGenerator.nextRecord("test-topic", 877); - assertNotEquals(record1.value(), record2.value()); + public void testPayloadIterator() { + final int expectedSize = 50; + PayloadIterator iter = new PayloadIterator( + new ConstantPayloadGenerator(expectedSize, new byte[0])); + final byte[] expected = new byte[expectedSize]; + assertEquals(0, iter.position()); + assertArrayEquals(expected, iter.next()); + assertEquals(1, iter.position()); + assertArrayEquals(expected, iter.next()); + assertArrayEquals(expected, iter.next()); + assertEquals(3, iter.position()); + iter.seek(0); + assertEquals(0, iter.position()); } }