|
|
|
@ -261,24 +261,24 @@ public class SimpleBenchmark {
@@ -261,24 +261,24 @@ public class SimpleBenchmark {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private Properties setProduceConsumeProperties(final String clientId) { |
|
|
|
|
Properties props = new Properties(); |
|
|
|
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)); |
|
|
|
|
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); |
|
|
|
|
Properties clientProps = new Properties(); |
|
|
|
|
clientProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)); |
|
|
|
|
clientProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); |
|
|
|
|
// the socket buffer needs to be large, especially when running in AWS with
|
|
|
|
|
// high latency. if running locally the default is fine.
|
|
|
|
|
props.put(ProducerConfig.LINGER_MS_CONFIG, 5000); |
|
|
|
|
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024); |
|
|
|
|
props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES); |
|
|
|
|
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); |
|
|
|
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); |
|
|
|
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); |
|
|
|
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); |
|
|
|
|
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); |
|
|
|
|
clientProps.put(ProducerConfig.LINGER_MS_CONFIG, 5000); |
|
|
|
|
clientProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024); |
|
|
|
|
clientProps.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES); |
|
|
|
|
clientProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); |
|
|
|
|
clientProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); |
|
|
|
|
clientProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); |
|
|
|
|
clientProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); |
|
|
|
|
clientProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); |
|
|
|
|
// the socket buffer needs to be large, especially when running in AWS with
|
|
|
|
|
// high latency. if running locally the default is fine.
|
|
|
|
|
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES); |
|
|
|
|
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS); |
|
|
|
|
return props; |
|
|
|
|
clientProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES); |
|
|
|
|
clientProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS); |
|
|
|
|
return clientProps; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void resetStats() { |
|
|
|
|