# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.services.service import Service
from ducktape.tests.test import Test
from ducktape.mark import parametrize
from ducktape.mark import matrix

from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService


TOPIC_REP_ONE = "topic-replication-factor-one"
TOPIC_REP_THREE = "topic-replication-factor-three"
DEFAULT_RECORD_SIZE = 100  # bytes

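# These benchmarks are ducktape tests and are normally launched via the
# ducktape runner rather than executed directly. A typical invocation might
# look like the following (the path is illustrative, not authoritative):
#
#   ducktape tests/kafkatest/benchmarks/core/benchmark_test.py
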
class Benchmark(Test):
    """A benchmark of Kafka producer/consumer performance. This replicates the test
    run here:
    https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
    """
    def __init__(self, test_context):
        super(Benchmark, self).__init__(test_context)
        self.num_zk = 1
        self.num_brokers = 3
        self.topics = {
            TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1},
            TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3}
        }

        self.zk = ZookeeperService(test_context, self.num_zk)

        self.msgs_large = 10000000
        self.batch_size = 8*1024
        self.buffer_memory = 64*1024*1024
        self.msg_sizes = [10, 100, 1000, 10000, 100000]
        self.target_data_size = 128*1024*1024
        self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)

    def setUp(self):
        self.zk.start()

    def start_kafka(self, security_protocol, interbroker_security_protocol):
        self.kafka = KafkaService(
            self.test_context, self.num_brokers,
            self.zk, security_protocol=security_protocol,
            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
        self.kafka.log_level = "INFO"  # We don't need DEBUG logging here
        self.kafka.start()

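    # Each @parametrize below runs this test once with the given arguments,
    # while @matrix runs it once for every combination in the cross product
    # of its argument lists (here, 5 message sizes x 2 security protocols).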
    @parametrize(acks=1, topic=TOPIC_REP_ONE)
    @parametrize(acks=1, topic=TOPIC_REP_THREE)
    @parametrize(acks=-1, topic=TOPIC_REP_THREE)
    @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL'])
    def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT'):
        """
        Setup: 1 node zk + 3 node kafka cluster
        Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
        security protocol and message size are varied depending on arguments injected into this test.

        Collect and return aggregate throughput statistics after all messages have been acknowledged.
        (This runs ProducerPerformance.java under the hood)
        """
        self.start_kafka(security_protocol, security_protocol)
        # Always generate the same total amount of data
        nrecords = int(self.target_data_size / message_size)
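        # For example, with the default 128 MB target and message_size=100,
        # nrecords = int(128*1024*1024 / 100), i.e. ~1.34 million records.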

        self.producer = ProducerPerformanceService(
            self.test_context, num_producers, self.kafka, topic=topic,
            num_records=nrecords, record_size=message_size, throughput=-1,
            settings={
                'acks': acks,
                'batch.size': self.batch_size,
                'buffer.memory': self.buffer_memory})
        self.producer.run()
        return compute_aggregate_throughput(self.producer)

    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
    def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None):
        """
        Setup: 1 node zk + 3 node kafka cluster
        Produce 10e6 100-byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.

        Collect and return aggregate throughput statistics after all messages have been acknowledged.

        (This runs ProducerPerformance.java under the hood)
        """
        if interbroker_security_protocol is None:
            interbroker_security_protocol = security_protocol
        self.start_kafka(security_protocol, interbroker_security_protocol)
        self.producer = ProducerPerformanceService(
            self.test_context, 1, self.kafka,
            topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
            throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
            intermediate_stats=True
        )
        self.producer.run()

        summary = ["Throughput over long run, data > memory:"]
        data = {}
        # FIXME we should be generating a graph too
        # Try to break it into 5 blocks, but fall back to a smaller number if
        # there aren't even 5 elements
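        # For example, with 12 intermediate stat entries: block_size = max(12 // 5, 1) = 2
        # and nblocks = 12 // 2 = 6, so throughput is averaged over 6 blocks of 2 samples.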
        block_size = max(len(self.producer.stats[0]) // 5, 1)  # // keeps this an int on Python 3
        nblocks = len(self.producer.stats[0]) // block_size

        for i in range(nblocks):
            subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))]
            if len(subset) == 0:
                summary.append(" Time block %d: (empty)" % i)
                data[i] = None
            else:
                records_per_sec = sum([stat['records_per_sec'] for stat in subset])/float(len(subset))
                mb_per_sec = sum([stat['mbps'] for stat in subset])/float(len(subset))

                summary.append(" Time block %d: %f rec/sec (%f MB/s)" % (i, records_per_sec, mb_per_sec))
                data[i] = throughput(records_per_sec, mb_per_sec)

        self.logger.info("\n".join(summary))
        return data

    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
    def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None):
        """
        Setup: 1 node zk + 3 node kafka cluster
        Produce (acks = 1) and consume 10e3 messages on a topic with 6 partitions and replication-factor 3,
        measuring the latency between production and consumption of each message.

        Return aggregate latency statistics.

        (Under the hood, this simply runs EndToEndLatency.scala)
        """
        if interbroker_security_protocol is None:
            interbroker_security_protocol = security_protocol
        self.start_kafka(security_protocol, interbroker_security_protocol)
        self.logger.info("BENCHMARK: End to end latency")
        self.perf = EndToEndLatencyService(
            self.test_context, 1, self.kafka,
            topic=TOPIC_REP_THREE, num_records=10000
        )
        self.perf.run()
        return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])

    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
    def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True):
        """
        Setup: 1 node zk + 3 node kafka cluster
        Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
        using the new consumer if new_consumer == True.

        Return aggregate throughput statistics for both producer and consumer.

        (Under the hood, this runs ProducerPerformance.java and ConsumerPerformance.scala)
        """
        if interbroker_security_protocol is None:
            interbroker_security_protocol = security_protocol
        self.start_kafka(security_protocol, interbroker_security_protocol)
        num_records = 10 * 1000 * 1000  # 10e6

        self.producer = ProducerPerformanceService(
            self.test_context, 1, self.kafka,
            topic=TOPIC_REP_THREE,
            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
        )
        self.consumer = ConsumerPerformanceService(
            self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
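        # Run producer and consumer together so consumption overlaps production;
        # ducktape's Service.run_parallel starts the services concurrently and
        # waits for all of them to finish.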
        Service.run_parallel(self.producer, self.consumer)

        data = {
            "producer": compute_aggregate_throughput(self.producer),
            "consumer": compute_aggregate_throughput(self.consumer)
        }
        summary = [
            "Producer + consumer:",
            str(data)]
        self.logger.info("\n".join(summary))
        return data

    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
    def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1):
        """
        Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
        (using the new consumer iff new_consumer == True), and report the throughput.
        """
        if interbroker_security_protocol is None:
            interbroker_security_protocol = security_protocol
        self.start_kafka(security_protocol, interbroker_security_protocol)
        num_records = 10 * 1000 * 1000  # 10e6

        # Seed Kafka with messages
        self.producer = ProducerPerformanceService(
            self.test_context, 1, self.kafka,
            topic=TOPIC_REP_THREE,
            num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
        )
        self.producer.run()

        # Consume
        self.consumer = ConsumerPerformanceService(
            self.test_context, num_consumers, self.kafka,
            topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
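        # All consumers share one consumer group, so with num_consumers > 1 the
        # topic's 6 partitions are divided among them rather than each consumer
        # reading the full topic.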
        self.consumer.group = "test-consumer-group"
        self.consumer.run()
        return compute_aggregate_throughput(self.consumer)


def throughput(records_per_sec, mb_per_sec):
    """Helper method to ensure uniform representation of throughput data"""
    return {
        "records_per_sec": records_per_sec,
        "mb_per_sec": mb_per_sec
    }


def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
    """Helper method to ensure uniform representation of latency data"""
    return {
        "latency_50th_ms": latency_50th_ms,
        "latency_99th_ms": latency_99th_ms,
        "latency_999th_ms": latency_999th_ms
    }


def compute_aggregate_throughput(perf):
    """Helper method for computing throughput after running a performance service."""
    aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
    aggregate_mbps = sum([r['mbps'] for r in perf.results])

    return throughput(aggregate_rate, aggregate_mbps)
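# Illustrative only (made-up numbers): with two worker results such as
#   perf.results = [{'records_per_sec': 500000.0, 'mbps': 47.7},
#                   {'records_per_sec': 450000.0, 'mbps': 42.9}]
# compute_aggregate_throughput(perf) would return
#   {'records_per_sec': 950000.0, 'mb_per_sec': 90.6}
# i.e. per-worker rates are summed, then wrapped by throughput().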