
KAFKA-9123 Test a large number of replicas (#7621)

Two tests using 50k replicas on 8 brokers:
* Do a rolling restart with clean shutdown, delete topics
* Run produce bench and consumer bench on a subset of topics
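For reference, the parametrization used in the new test below (500 topics, 34 partitions each, replication factor 3) works out to 500 * 34 * 3 = 51,000 partition replicas, roughly 6,400 per broker on the 8-broker cluster.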

Reviewed-By: David Jacot <djacot@confluent.io>, Vikas Singh <vikas@confluent.io>, Jason Gustafson <jason@confluent.io>
David Arthur authored 5 years ago, committed via GitHub
commit b15e05d925
  1. tests/kafkatest/services/kafka/kafka.py (31 lines changed)
  2. tests/kafkatest/services/trogdor/trogdor.py (2 lines changed)
  3. tests/kafkatest/tests/core/replica_scale_test.py (139 lines added, new file)
  4. vagrant/base.sh (8 lines added)

tests/kafkatest/services/kafka/kafka.py (31 lines changed)

@@ -337,7 +337,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                            KafkaService.STDOUT_STDERR_CAPTURE)
         return cmd
 
-    def start_node(self, node):
+    def start_node(self, node, timeout_sec=60):
         node.account.mkdirs(KafkaService.PERSISTENT_ROOT)
         prop_file = self.prop_file(node)
         self.logger.info("kafka.properties:")
@@ -353,7 +353,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
             node.account.ssh(cmd)
             # Kafka 1.0.0 and higher don't have a space between "Kafka" and "Server"
-            monitor.wait_until("Kafka\s*Server.*started", timeout_sec=60, backoff_sec=.25, err_msg="Kafka server didn't finish startup")
+            monitor.wait_until("Kafka\s*Server.*started", timeout_sec=timeout_sec, backoff_sec=.25,
+                               err_msg="Kafka server didn't finish startup in %d seconds" % timeout_sec)
 
         # Credentials for inter-broker communication are created before starting Kafka.
         # Client credentials are created after starting Kafka so that both loading of
@@ -382,7 +383,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         leader = self.leader(topic, partition)
         self.signal_node(leader, sig)
 
-    def stop_node(self, node, clean_shutdown=True):
+    def stop_node(self, node, clean_shutdown=True, timeout_sec=60):
         pids = self.pids(node)
         sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
@@ -390,7 +391,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             node.account.signal(pid, sig, allow_fail=False)
 
         try:
-            wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60, err_msg="Kafka node failed to stop")
+            wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=timeout_sec,
+                       err_msg="Kafka node failed to stop in %d seconds" % timeout_sec)
         except Exception:
             self.thread_dump(node)
             raise
@@ -447,10 +449,23 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.info("Running topic creation command...\n%s" % cmd)
         node.account.ssh(cmd)
 
-        time.sleep(1)
-        self.logger.info("Checking to see if topic was properly created...\n%s" % cmd)
-        for line in self.describe_topic(topic_cfg["topic"]).split("\n"):
-            self.logger.info(line)
+    def delete_topic(self, topic, node=None):
+        """
+        Delete a topic with the topics command
+        :param topic:
+        :param node:
+        :return:
+        """
+        if node is None:
+            node = self.nodes[0]
+        self.logger.info("Deleting topic %s" % topic)
+
+        kafka_topic_script = self.path.script("kafka-topics.sh", node)
+        cmd = kafka_topic_script + " "
+        cmd += "--bootstrap-server %(bootstrap_servers)s --delete --topic %(topic)s " % {
+            'bootstrap_servers': self.bootstrap_servers(self.security_protocol),
+            'topic': topic
+        }
 
     def describe_topic(self, topic, node=None):
         if node is None:
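The wider timeouts and the new delete_topic helper are consumed by the new replica_scale_test.py below; a minimal sketch of the intended call pattern (assuming a KafkaService instance named kafka, wired up as in that test):

    # Sketch only, mirroring the loops in replica_scale_test.py below.
    for node in kafka.nodes:
        # With tens of thousands of replicas, controlled shutdown and log recovery
        # can take minutes, so the default 60s timeouts are raised.
        kafka.stop_node(node, clean_shutdown=True, timeout_sec=600)
        kafka.start_node(node, timeout_sec=600)

    # Bulk cleanup goes through the new helper rather than a hand-built CLI call.
    kafka.delete_topic("topic-0000")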

tests/kafkatest/services/trogdor/trogdor.py (2 lines changed)

@@ -202,7 +202,7 @@ class TrogdorService(KafkaPathResolverMixin, Service):
         """
         session = requests.Session()
         session.mount('http://',
-                      HTTPAdapter(max_retries=Retry(total=4, backoff_factor=0.3)))
+                      HTTPAdapter(max_retries=Retry(total=5, backoff_factor=0.3)))
         return session
 
     def _coordinator_post(self, path, message):
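The extra retry buys noticeably more patience because urllib3 backs off exponentially. A rough sketch of the schedule (assuming urllib3's documented formula, sleep = backoff_factor * 2 ** (retry - 1), with no sleep before the first retry; exact behaviour can vary by urllib3 version):

    backoff_factor = 0.3
    sleeps = [0.0] + [backoff_factor * 2 ** (n - 1) for n in range(2, 6)]
    print(sleeps)       # [0.0, 0.6, 1.2, 2.4, 4.8]
    print(sum(sleeps))  # ~9s of cushion with total=5, vs ~4.2s with total=4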

tests/kafkatest/tests/core/replica_scale_test.py (139 lines added, new file)

@@ -0,0 +1,139 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.mark.resource import cluster
from ducktape.mark import parametrize
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until

from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
from kafkatest.services.trogdor.task_spec import TaskSpec
from kafkatest.services.kafka import KafkaService
from kafkatest.services.trogdor.trogdor import TrogdorService
from kafkatest.services.zookeeper import ZookeeperService

import json
import time


class ReplicaScaleTest(Test):
    def __init__(self, test_context):
        super(ReplicaScaleTest, self).__init__(test_context=test_context)
        self.test_context = test_context
        self.zk = ZookeeperService(test_context, num_nodes=1)
        self.kafka = KafkaService(self.test_context, num_nodes=8, zk=self.zk)

    def setUp(self):
        self.zk.start()
        self.kafka.start()

    def teardown(self):
        # Need to increase the timeout due to partition count
        for node in self.kafka.nodes:
            self.kafka.stop_node(node, clean_shutdown=False, timeout_sec=60)
        self.kafka.stop()
        self.zk.stop()

    @cluster(num_nodes=12)
    @parametrize(topic_count=500, partition_count=34, replication_factor=3)
    def test_produce_consume(self, topic_count, partition_count, replication_factor):
        topics_create_start_time = time.time()
        for i in range(topic_count):
            topic = "replicas_produce_consume_%d" % i
            print("Creating topic %s" % topic)  # Force some stdout for Jenkins
            topic_cfg = {
                "topic": topic,
                "partitions": partition_count,
                "replication-factor": replication_factor,
                "configs": {"min.insync.replicas": 2}
            }
            self.kafka.create_topic(topic_cfg)

        topics_create_end_time = time.time()
        self.logger.info("Time to create topics: %d" % (topics_create_end_time - topics_create_start_time))

        producer_workload_service = ProduceBenchWorkloadService(self.test_context, self.kafka)
        consumer_workload_service = ConsumeBenchWorkloadService(self.test_context, self.kafka)
        trogdor = TrogdorService(context=self.test_context,
                                 client_services=[self.kafka, producer_workload_service, consumer_workload_service])
        trogdor.start()

        produce_spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
                                                producer_workload_service.producer_node,
                                                producer_workload_service.bootstrap_servers,
                                                target_messages_per_sec=10000,
                                                max_messages=3400000,
                                                producer_conf={},
                                                admin_client_conf={},
                                                common_client_conf={},
                                                inactive_topics={},
                                                active_topics={"replicas_produce_consume_[0-2]": {
                                                    "numPartitions": partition_count, "replicationFactor": replication_factor
                                                }})
        produce_workload = trogdor.create_task("replicas-produce-workload", produce_spec)
        produce_workload.wait_for_done(timeout_sec=600)
        self.logger.info("Completed produce bench")

        consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
                                                consumer_workload_service.consumer_node,
                                                consumer_workload_service.bootstrap_servers,
                                                target_messages_per_sec=10000,
                                                max_messages=3400000,
                                                consumer_conf={},
                                                admin_client_conf={},
                                                common_client_conf={},
                                                active_topics=["replicas_produce_consume_[0-2]"])
        consume_workload = trogdor.create_task("replicas-consume-workload", consume_spec)
        consume_workload.wait_for_done(timeout_sec=600)
        self.logger.info("Completed consume bench")

        trogdor.stop()

    @cluster(num_nodes=12)
    @parametrize(topic_count=500, partition_count=34, replication_factor=3)
    def test_clean_bounce(self, topic_count, partition_count, replication_factor):
        topics_create_start_time = time.time()
        for i in range(topic_count):
            topic = "topic-%04d" % i
            print("Creating topic %s" % topic)  # Force some stdout for Jenkins
            topic_cfg = {
                "topic": topic,
                "partitions": partition_count,
                "replication-factor": replication_factor,
                "configs": {"min.insync.replicas": 2}
            }
            self.kafka.create_topic(topic_cfg)

        topics_create_end_time = time.time()
        self.logger.info("Time to create topics: %d" % (topics_create_end_time - topics_create_start_time))

        restart_times = []
        for node in self.kafka.nodes:
            broker_bounce_start_time = time.time()
            self.kafka.stop_node(node, clean_shutdown=True, timeout_sec=600)
            self.kafka.start_node(node, timeout_sec=600)
            broker_bounce_end_time = time.time()
            restart_times.append(broker_bounce_end_time - broker_bounce_start_time)
            self.logger.info("Time to restart %s: %d" % (node.name, broker_bounce_end_time - broker_bounce_start_time))
        self.logger.info("Restart times: %s" % restart_times)

        delete_start_time = time.time()
        for i in range(topic_count):
            topic = "topic-%04d" % i
            self.logger.info("Deleting topic %s" % topic)
            self.kafka.delete_topic(topic)
        delete_end_time = time.time()
        self.logger.info("Time to delete topics: %d" % (delete_end_time - delete_start_time))

vagrant/base.sh (8 lines added)

@@ -153,3 +153,11 @@ chmod a+rwx /mnt
 ntpdate -u pool.ntp.org
 # Install ntp daemon - it will automatically start on boot
 apt-get -y install ntp
+
+# Increase the ulimit
+mkdir -p /etc/security/limits.d
+echo "* soft nofile 128000" >> /etc/security/limits.d/nofile.conf
+echo "* hard nofile 128000" >> /etc/security/limits.d/nofile.conf
+
+ulimit -Hn 128000
+ulimit -Sn 128000
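A quick way to confirm the new descriptor limit on a worker, as a minimal sketch using Python's standard resource module (the 128000 values come from the lines above; the limits.d entries only apply to sessions opened after provisioning):

    import resource

    # Per-process open file descriptor limits (soft, hard).
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    print(soft, hard)  # expect 128000 128000 in a fresh login session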
