# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.mark import matrix
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from ducktape.services.service import Service
from ducktape.tests.test import Test

from kafkatest.services.kafka import KafkaService
from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService, throughput, latency, compute_aggregate_throughput
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import DEV_BRANCH, KafkaVersion

TOPIC_REP_ONE = "topic-replication-factor-one"
TOPIC_REP_THREE = "topic-replication-factor-three"
DEFAULT_RECORD_SIZE = 100  # bytes
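
# These benchmarks are driven by the ducktape test runner. A hypothetical
# invocation (the path below assumes the standard Kafka system-test layout
# and may differ in your checkout):
#
#   ducktape tests/kafkatest/benchmarks/core/benchmark_test.py::Benchmark.test_producer_throughput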


class Benchmark(Test):
    """A benchmark of Kafka producer/consumer performance. This replicates the test
    run here:
    https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
    """
    def __init__(self, test_context):
        super(Benchmark, self).__init__(test_context)
        self.num_zk = 1
        self.num_brokers = 3
        self.topics = {
            TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1},
            TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3}
        }

        self.zk = ZookeeperService(test_context, self.num_zk)

        self.msgs_large = 10000000
        self.batch_size = 8*1024
        self.buffer_memory = 64*1024*1024
        self.msg_sizes = [10, 100, 1000, 10000, 100000]
        self.target_data_size = 128*1024*1024
        self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
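        # Sanity check on the target size: target_data_size is 128 MiB, so
        # target_data_size_gb = 128*1024*1024 / float(1024**3) = 0.125.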

    def setUp(self):
        self.zk.start()

    def start_kafka(self, security_protocol, interbroker_security_protocol, version):
        self.kafka = KafkaService(
            self.test_context, self.num_brokers,
            self.zk, security_protocol=security_protocol,
            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics,
            version=version)
        self.kafka.log_level = "INFO"  # We don't need DEBUG logging here
        self.kafka.start()
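
    # Note on annotation stacking: as used throughout this file, each @cluster
    # annotation appears to scope the @parametrize/@matrix cases listed after
    # it until the next @cluster (e.g. below, the num_producers=3 case runs on
    # 7 nodes while the rest run on 5). Verify against your ducktape version.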
    @cluster(num_nodes=5)
    @parametrize(acks=1, topic=TOPIC_REP_ONE)
    @parametrize(acks=1, topic=TOPIC_REP_THREE)
    @parametrize(acks=-1, topic=TOPIC_REP_THREE)
    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000],
            compression_type=["none", "snappy"], security_protocol=['PLAINTEXT', 'SSL'])
    @cluster(num_nodes=7)
    @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
    def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE,
                                 compression_type="none", security_protocol='PLAINTEXT', client_version=str(DEV_BRANCH),
                                 broker_version=str(DEV_BRANCH)):
""" |
|
Setup: 1 node zk + 3 node kafka cluster |
|
Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor, |
|
security protocol and message size are varied depending on arguments injected into this test. |
|
|
|
Collect and return aggregate throughput statistics after all messages have been acknowledged. |
|
(This runs ProducerPerformance.java under the hood) |
|
""" |
|
        client_version = KafkaVersion(client_version)
        broker_version = KafkaVersion(broker_version)
        self.validate_versions(client_version, broker_version)
        self.start_kafka(security_protocol, security_protocol, broker_version)
        # Always generate the same total amount of data
        nrecords = int(self.target_data_size / message_size)

        self.producer = ProducerPerformanceService(
            self.test_context, num_producers, self.kafka, topic=topic,
            num_records=nrecords, record_size=message_size, throughput=-1, version=client_version,
            settings={
                'acks': acks,
                'compression.type': compression_type,
                'batch.size': self.batch_size,
                'buffer.memory': self.buffer_memory})
        self.producer.run()
        return compute_aggregate_throughput(self.producer)
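
    # Note: compute_aggregate_throughput() sums throughput across the
    # producer's worker nodes and returns it as a dict; the exact key names
    # ('records_per_sec', 'mb_per_sec') are an assumption based on the
    # kafkatest throughput() helper and may differ across versions.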

    @cluster(num_nodes=5)
    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
    def test_long_term_producer_throughput(self, compression_type="none", security_protocol='PLAINTEXT',
                                           interbroker_security_protocol=None, client_version=str(DEV_BRANCH),
                                           broker_version=str(DEV_BRANCH)):
""" |
|
Setup: 1 node zk + 3 node kafka cluster |
|
Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1. |
|
|
|
Collect and return aggregate throughput statistics after all messages have been acknowledged. |
|
|
|
(This runs ProducerPerformance.java under the hood) |
|
""" |
|
        client_version = KafkaVersion(client_version)
        broker_version = KafkaVersion(broker_version)
        self.validate_versions(client_version, broker_version)
        if interbroker_security_protocol is None:
            interbroker_security_protocol = security_protocol
        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
        self.producer = ProducerPerformanceService(
            self.test_context, 1, self.kafka,
            topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
            throughput=-1, version=client_version, settings={
                'acks': 1,
                'compression.type': compression_type,
                'batch.size': self.batch_size,
                'buffer.memory': self.buffer_memory
            },
            intermediate_stats=True
        )
        self.producer.run()

        summary = ["Throughput over long run, data > memory:"]
        data = {}
        # FIXME we should be generating a graph too
        # Try to break it into 5 blocks, but fall back to a smaller number if
        # there aren't even 5 elements
        block_size = max(len(self.producer.stats[0]) // 5, 1)
        nblocks = len(self.producer.stats[0]) // block_size

        for i in range(nblocks):
            subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))]
            if len(subset) == 0:
                summary.append(" Time block %d: (empty)" % i)
                data[i] = None
            else:
                records_per_sec = sum([stat['records_per_sec'] for stat in subset])/float(len(subset))
                mb_per_sec = sum([stat['mbps'] for stat in subset])/float(len(subset))

                summary.append(" Time block %d: %f rec/sec (%f MB/s)" % (i, records_per_sec, mb_per_sec))
                data[i] = throughput(records_per_sec, mb_per_sec)

        self.logger.info("\n".join(summary))
        return data
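
    # Worked example for the block arithmetic above: with 12 interval stats,
    # block_size = max(12 // 5, 1) = 2 and nblocks = 12 // 2 = 6, so the run
    # is summarized as six averaged time blocks.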

    @cluster(num_nodes=5)
    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
    @cluster(num_nodes=6)
    @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
    def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT",
                                interbroker_security_protocol=None, client_version=str(DEV_BRANCH),
                                broker_version=str(DEV_BRANCH)):
""" |
|
Setup: 1 node zk + 3 node kafka cluster |
|
Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3, |
|
measuring the latency between production and consumption of each message. |
|
|
|
Return aggregate latency statistics. |
|
|
|
(Under the hood, this simply runs EndToEndLatency.scala) |
|
""" |
|
        client_version = KafkaVersion(client_version)
        broker_version = KafkaVersion(broker_version)
        self.validate_versions(client_version, broker_version)
        if interbroker_security_protocol is None:
            interbroker_security_protocol = security_protocol
        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
        self.logger.info("BENCHMARK: End to end latency")
        self.perf = EndToEndLatencyService(
            self.test_context, 1, self.kafka,
            topic=TOPIC_REP_THREE, num_records=10000,
            compression_type=compression_type, version=client_version
        )
        self.perf.run()
        return latency(self.perf.results[0]['latency_50th_ms'],
                       self.perf.results[0]['latency_99th_ms'],
                       self.perf.results[0]['latency_999th_ms'])
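
    # latency() packages the 50th/99th/99.9th percentile measurements read
    # from self.perf.results[0] above into a summary dict; the exact shape of
    # the returned value is left to the kafkatest helper.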

    @cluster(num_nodes=6)
    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
    def test_producer_and_consumer(self, compression_type="none", security_protocol="PLAINTEXT",
                                   interbroker_security_protocol=None, new_consumer=True,
                                   client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)):
""" |
|
Setup: 1 node zk + 3 node kafka cluster |
|
Concurrently produce and consume 10e6 messages with a single producer and a single consumer, |
|
using new consumer if new_consumer == True |
|
|
|
Return aggregate throughput statistics for both producer and consumer. |
|
|
|
(Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala) |
|
""" |
|
        client_version = KafkaVersion(client_version)
        broker_version = KafkaVersion(broker_version)
        self.validate_versions(client_version, broker_version)
        if interbroker_security_protocol is None:
            interbroker_security_protocol = security_protocol
        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
        num_records = 10 * 1000 * 1000  # 10e6

        self.producer = ProducerPerformanceService(
            self.test_context, 1, self.kafka,
            topic=TOPIC_REP_THREE,
            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version,
            settings={
                'acks': 1,
                'compression.type': compression_type,
                'batch.size': self.batch_size,
                'buffer.memory': self.buffer_memory
            }
        )
        self.consumer = ConsumerPerformanceService(
            self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
        Service.run_parallel(self.producer, self.consumer)

        data = {
            "producer": compute_aggregate_throughput(self.producer),
            "consumer": compute_aggregate_throughput(self.consumer)
        }
        summary = [
            "Producer + consumer:",
            str(data)]
        self.logger.info("\n".join(summary))
        return data
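
    # Service.run_parallel(...) runs the producer and consumer services
    # concurrently and returns once both have finished, so consumption
    # overlaps with production as the docstring describes.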

    @cluster(num_nodes=6)
    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
    def test_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT",
                                 interbroker_security_protocol=None, new_consumer=True, num_consumers=1,
                                 client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)):
""" |
|
Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions |
|
(using new consumer iff new_consumer == True), and report throughput. |
|
""" |
|
        client_version = KafkaVersion(client_version)
        broker_version = KafkaVersion(broker_version)
        self.validate_versions(client_version, broker_version)
        if interbroker_security_protocol is None:
            interbroker_security_protocol = security_protocol
        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
        num_records = 10 * 1000 * 1000  # 10e6

        # seed kafka w/messages
        self.producer = ProducerPerformanceService(
            self.test_context, 1, self.kafka,
            topic=TOPIC_REP_THREE,
            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version,
            settings={
                'acks': 1,
                'compression.type': compression_type,
                'batch.size': self.batch_size,
                'buffer.memory': self.buffer_memory
            }
        )
        self.producer.run()

        # consume
        self.consumer = ConsumerPerformanceService(
            self.test_context, num_consumers, self.kafka,
            topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
        self.consumer.group = "test-consumer-group"
        self.consumer.run()
        return compute_aggregate_throughput(self.consumer)

    def validate_versions(self, client_version, broker_version):
        assert client_version <= broker_version, \
            "Client version %s should be <= broker version %s" % (client_version, broker_version)
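
    # A minimal sketch of the ordering assumption behind validate_versions,
    # using KafkaVersion comparison semantics (DEV_BRANCH is expected to
    # compare >= any released version; verify for your kafkatest version):
    #
    #   assert KafkaVersion("0.10.2") <= KafkaVersion(str(DEV_BRANCH))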