diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index b83097b8ffb..d0434af188c 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -32,7 +32,7 @@ ARG ducker_creator=default LABEL ducker.creator=$ducker_creator # Update Linux and install necessary utilities. -RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev && apt-get -y clean +RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean RUN python -m pip install -U pip==9.0.3; RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.6 diff --git a/tests/kafkatest/services/trogdor/degraded_network_fault_spec.py b/tests/kafkatest/services/trogdor/degraded_network_fault_spec.py index 450a7870cf3..2a3b142f3ec 100644 --- a/tests/kafkatest/services/trogdor/degraded_network_fault_spec.py +++ b/tests/kafkatest/services/trogdor/degraded_network_fault_spec.py @@ -23,15 +23,26 @@ class DegradedNetworkFaultSpec(TaskSpec): Degrades the network so that traffic on a subset of nodes has higher latency """ - def __init__(self, start_ms, duration_ms, node_specs): + def __init__(self, start_ms, duration_ms): """ Create a new NetworkDegradeFaultSpec. :param start_ms: The start time, as described in task_spec.py :param duration_ms: The duration in milliseconds. - :param node_latencies: A dict of node name to desired latency - :param network_device: The name of the network device """ super(DegradedNetworkFaultSpec, self).__init__(start_ms, duration_ms) self.message["class"] = "org.apache.kafka.trogdor.fault.DegradedNetworkFaultSpec" - self.message["nodeSpecs"] = node_specs + self.message["nodeSpecs"] = {} + + def add_node_spec(self, node, networkDevice, latencyMs=0, rateLimitKbit=0): + """ + Add a node spec to this fault spec + :param node: The node name which is to be degraded + :param networkDevice: The network device name (e.g., eth0) to apply the degradation to + :param latencyMs: Optional. How much latency to add to each packet + :param rateLimitKbit: Optional. Maximum throughput in kilobits per second to allow + :return: + """ + self.message["nodeSpecs"][node] = { + "rateLimitKbit": rateLimitKbit, "latencyMs": latencyMs, "networkDevice": networkDevice + } diff --git a/tests/kafkatest/services/trogdor/trogdor.py b/tests/kafkatest/services/trogdor/trogdor.py index 4d514f2a394..6a6e888dc1e 100644 --- a/tests/kafkatest/services/trogdor/trogdor.py +++ b/tests/kafkatest/services/trogdor/trogdor.py @@ -300,6 +300,16 @@ class TrogdorTask(object): self.id = id self.trogdor = trogdor + def task_state_or_error(self): + task_state = self.trogdor.tasks()["tasks"][self.id] + if task_state is None: + raise RuntimeError("Coordinator did not know about %s." % self.id) + error = task_state.get("error") + if error is None or error == "": + return task_state["state"], None + else: + return None, error + def done(self): """ Check if this task is done. @@ -308,13 +318,25 @@ class TrogdorTask(object): :returns: True if the task is in DONE_STATE; False if it is in a different state. """ - task_state = self.trogdor.tasks()["tasks"][self.id] - if task_state is None: - raise RuntimeError("Coordinator did not know about %s." % self.id) - error = task_state.get("error") - if error is None or error == "": - return task_state["state"] == TrogdorTask.DONE_STATE - raise RuntimeError("Failed to gracefully stop %s: got task error: %s" % (self.id, error)) + (task_state, error) = self.task_state_or_error() + if task_state is not None: + return task_state == TrogdorTask.DONE_STATE + else: + raise RuntimeError("Failed to gracefully stop %s: got task error: %s" % (self.id, error)) + + def running(self): + """ + Check if this task is running. + + :raises RuntimeError: If the task encountered an error. + :returns: True if the task is in RUNNING_STATE; + False if it is in a different state. + """ + (task_state, error) = self.task_state_or_error() + if task_state is not None: + return task_state == TrogdorTask.RUNNING_STATE + else: + raise RuntimeError("Failed to start %s: got task error: %s" % (self.id, error)) def stop(self): """ diff --git a/tests/kafkatest/tests/core/network_degrade_test.py b/tests/kafkatest/tests/core/network_degrade_test.py new file mode 100644 index 00000000000..5b77d99c62d --- /dev/null +++ b/tests/kafkatest/tests/core/network_degrade_test.py @@ -0,0 +1,138 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re + +from ducktape.mark import parametrize +from ducktape.mark.resource import cluster +from ducktape.tests.test import Test +from ducktape.utils.util import wait_until + +from kafkatest.services.trogdor.degraded_network_fault_spec import DegradedNetworkFaultSpec +from kafkatest.services.trogdor.trogdor import TrogdorService +from kafkatest.services.zookeeper import ZookeeperService + + +class NetworkDegradeTest(Test): + """ + These tests ensure that the network degrade Trogdor specs (which use "tc") are working as expected in whatever + environment the system tests may be running in. The linux tools "ping" and "iperf" are used for validation + and need to be available along with "tc" in the test environment. + """ + + def __init__(self, test_context): + super(NetworkDegradeTest, self).__init__(test_context) + self.zk = ZookeeperService(test_context, num_nodes=3) + self.trogdor = TrogdorService(context=self.test_context, client_services=[self.zk]) + + def setUp(self): + self.zk.start() + self.trogdor.start() + + def teardown(self): + self.trogdor.stop() + self.zk.stop() + + @cluster(num_nodes=5) + @parametrize(task_name="latency-100", device_name="eth0", latency_ms=50, rate_limit_kbit=0) + @parametrize(task_name="latency-100-rate-1000", device_name="eth0", latency_ms=50, rate_limit_kbit=1000) + def test_latency(self, task_name, device_name, latency_ms, rate_limit_kbit): + spec = DegradedNetworkFaultSpec(0, 10000) + for node in self.zk.nodes: + spec.add_node_spec(node.name, device_name, latency_ms, rate_limit_kbit) + + latency = self.trogdor.create_task(task_name, spec) + + zk0 = self.zk.nodes[0] + zk1 = self.zk.nodes[1] + + # Capture the ping times from the ping stdout + # 64 bytes from ducker01 (172.24.0.2): icmp_seq=1 ttl=64 time=0.325 ms + r = re.compile(r".*time=(?P