
KAFKA-8981 Add rate limiting to NetworkDegradeSpec (#7446)

* Add rate limiting to tc

* Feedback from PR

* Add a sanity test for tc

* Add iperf to vagrant scripts

* Dynamically determine the network interface

* Add some temp code for testing on AWS

* Temp: use hostname instead of external IP

* Temp: more AWS debugging

* More AWS WIP

* More AWS temp

* Lower latency some

* AWS wip

* Trying this again now that ping should work

* Add cluster decorator to tests

* Fix broken import

* Fix device name

* Fix decorator arg

* Remove errant import

* Increase timeouts

* Fix tbf command, relax assertion on latency test

* Fix log line

* Final bit of cleanup

* Newline

* Revert Trogdor retry count

* PR feedback

* More PR feedback

* Feedback from PR

* Remove unused argument
David Arthur, 5 years ago, committed by GitHub (commit d04699486d)
Changed files:
  1. tests/docker/Dockerfile (2 lines changed)
  2. tests/kafkatest/services/trogdor/degraded_network_fault_spec.py (19 lines changed)
  3. tests/kafkatest/services/trogdor/trogdor.py (36 lines changed)
  4. tests/kafkatest/tests/core/network_degrade_test.py (138 lines changed)
  5. tests/kafkatest/tests/core/round_trip_fault_test.py (5 lines changed)
  6. tests/kafkatest/utils/remote_account.py (2 lines changed)
  7. tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultSpec.java (21 lines changed)
  8. tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultWorker.java (81 lines changed)
  9. vagrant/aws/aws-init.sh (4 lines changed)
  10. vagrant/base.sh (3 lines changed)

tests/docker/Dockerfile

@@ -32,7 +32,7 @@ ARG ducker_creator=default
LABEL ducker.creator=$ducker_creator
# Update Linux and install necessary utilities.
RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev && apt-get -y clean
RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
RUN python -m pip install -U pip==9.0.3;
RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.6

tests/kafkatest/services/trogdor/degraded_network_fault_spec.py

@@ -23,15 +23,26 @@ class DegradedNetworkFaultSpec(TaskSpec):
Degrades the network so that traffic on a subset of nodes has higher latency and/or reduced throughput
"""
def __init__(self, start_ms, duration_ms, node_specs):
def __init__(self, start_ms, duration_ms):
"""
Create a new DegradedNetworkFaultSpec.
:param start_ms: The start time, as described in task_spec.py
:param duration_ms: The duration in milliseconds.
:param node_latencies: A dict of node name to desired latency
:param network_device: The name of the network device
"""
super(DegradedNetworkFaultSpec, self).__init__(start_ms, duration_ms)
self.message["class"] = "org.apache.kafka.trogdor.fault.DegradedNetworkFaultSpec"
self.message["nodeSpecs"] = node_specs
self.message["nodeSpecs"] = {}
def add_node_spec(self, node, networkDevice, latencyMs=0, rateLimitKbit=0):
"""
Add a node spec to this fault spec
:param node: The node name which is to be degraded
:param networkDevice: The network device name (e.g., eth0) to apply the degradation to
:param latencyMs: Optional. How much latency to add to each packet
:param rateLimitKbit: Optional. Maximum throughput in kilobits per second to allow
:return:
"""
self.message["nodeSpecs"][node] = {
"rateLimitKbit": rateLimitKbit, "latencyMs": latencyMs, "networkDevice": networkDevice
}
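For context, here is a minimal sketch of how a test can use the new builder-style API. It assumes an existing TrogdorService instance named trogdor and a ducktape service named service whose nodes should be degraded (the same pattern appears in the tests further down):

    from kafkatest.services.trogdor.degraded_network_fault_spec import DegradedNetworkFaultSpec

    # Degrade eth0 on every node of `service` for 60 seconds: add 100 ms of latency
    # and cap throughput at 3000 kbit/s.
    spec = DegradedNetworkFaultSpec(0, 60000)
    for node in service.nodes:
        spec.add_node_spec(node.name, "eth0", latencyMs=100, rateLimitKbit=3000)
    task = trogdor.create_task("degrade-network", spec)
    task.wait_for_done(timeout_sec=120)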

tests/kafkatest/services/trogdor/trogdor.py

@@ -300,6 +300,16 @@ class TrogdorTask(object):
self.id = id
self.trogdor = trogdor
def task_state_or_error(self):
task_state = self.trogdor.tasks()["tasks"][self.id]
if task_state is None:
raise RuntimeError("Coordinator did not know about %s." % self.id)
error = task_state.get("error")
if error is None or error == "":
return task_state["state"], None
else:
return None, error
def done(self):
"""
Check if this task is done.
@@ -308,13 +318,25 @@ class TrogdorTask(object):
:returns: True if the task is in DONE_STATE;
False if it is in a different state.
"""
task_state = self.trogdor.tasks()["tasks"][self.id]
if task_state is None:
raise RuntimeError("Coordinator did not know about %s." % self.id)
error = task_state.get("error")
if error is None or error == "":
return task_state["state"] == TrogdorTask.DONE_STATE
raise RuntimeError("Failed to gracefully stop %s: got task error: %s" % (self.id, error))
(task_state, error) = self.task_state_or_error()
if task_state is not None:
return task_state == TrogdorTask.DONE_STATE
else:
raise RuntimeError("Failed to gracefully stop %s: got task error: %s" % (self.id, error))
def running(self):
"""
Check if this task is running.
:raises RuntimeError: If the task encountered an error.
:returns: True if the task is in RUNNING_STATE;
False if it is in a different state.
"""
(task_state, error) = self.task_state_or_error()
if task_state is not None:
return task_state == TrogdorTask.RUNNING_STATE
else:
raise RuntimeError("Failed to start %s: got task error: %s" % (self.id, error))
def stop(self):
"""

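The new running() helper mirrors done(): both delegate to task_state_or_error() and raise if the coordinator reported a task error. A short sketch of the polling pattern, essentially what network_degrade_test.py below does (trogdor and spec are assumed to already exist):

    from ducktape.utils.util import wait_until

    task = trogdor.create_task("my-task", spec)
    wait_until(lambda: task.running(),
               timeout_sec=10,
               err_msg="%s failed to start within 10 seconds." % task)
    # ... exercise the system while the fault is active ...
    task.stop()
    task.wait_for_done()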
tests/kafkatest/tests/core/network_degrade_test.py

@@ -0,0 +1,138 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.trogdor.degraded_network_fault_spec import DegradedNetworkFaultSpec
from kafkatest.services.trogdor.trogdor import TrogdorService
from kafkatest.services.zookeeper import ZookeeperService
class NetworkDegradeTest(Test):
"""
These tests ensure that the network degrade Trogdor specs (which use "tc") are working as expected in whatever
environment the system tests may be running in. The linux tools "ping" and "iperf" are used for validation
and need to be available along with "tc" in the test environment.
"""
def __init__(self, test_context):
super(NetworkDegradeTest, self).__init__(test_context)
self.zk = ZookeeperService(test_context, num_nodes=3)
self.trogdor = TrogdorService(context=self.test_context, client_services=[self.zk])
def setUp(self):
self.zk.start()
self.trogdor.start()
def teardown(self):
self.trogdor.stop()
self.zk.stop()
@cluster(num_nodes=5)
@parametrize(task_name="latency-100", device_name="eth0", latency_ms=50, rate_limit_kbit=0)
@parametrize(task_name="latency-100-rate-1000", device_name="eth0", latency_ms=50, rate_limit_kbit=1000)
def test_latency(self, task_name, device_name, latency_ms, rate_limit_kbit):
spec = DegradedNetworkFaultSpec(0, 10000)
for node in self.zk.nodes:
spec.add_node_spec(node.name, device_name, latency_ms, rate_limit_kbit)
latency = self.trogdor.create_task(task_name, spec)
zk0 = self.zk.nodes[0]
zk1 = self.zk.nodes[1]
# Capture the ping times from the ping stdout
# 64 bytes from ducker01 (172.24.0.2): icmp_seq=1 ttl=64 time=0.325 ms
r = re.compile(r".*time=(?P<time>[\d.]+)\sms.*")
times = []
for line in zk0.account.ssh_capture("ping -i 1 -c 20 %s" % zk1.account.hostname):
self.logger.debug("Ping output: %s" % line)
m = r.match(line)
if m is not None and m.group("time"):
times.append(float(m.group("time")))
self.logger.info("Parsed ping time of %d" % float(m.group("time")))
self.logger.debug("Captured ping times: %s" % times)
# We expect to see some low ping times (before and after the task runs) as well as high ping times
# (while the task is running). The "high" threshold is twice the configured latency, since both
# endpoints apply the netem rule, scaled by 80% to leave a little headroom for variance.
high_time_ms = 0.8 * 2 * latency_ms
low_time_ms = 10
slow_times = [t for t in times if t > high_time_ms]
fast_times = [t for t in times if t < low_time_ms]
latency.stop()
latency.wait_for_done()
# We captured 20 ping times. Assert that more than 5 were "fast" and more than 5 were "slow"
assert len(slow_times) > 5, "Expected to see more slow ping times (higher than %d)" % high_time_ms
assert len(fast_times) > 5, "Expected to see more fast ping times (lower than %d)" % low_time_ms
@cluster(num_nodes=5)
@parametrize(task_name="rate-1000", device_name="eth0", latency_ms=0, rate_limit_kbit=1000000)
@parametrize(task_name="rate-1000-latency-50", device_name="eth0", latency_ms=50, rate_limit_kbit=1000000)
def test_rate(self, task_name, device_name, latency_ms, rate_limit_kbit):
zk0 = self.zk.nodes[0]
zk1 = self.zk.nodes[1]
spec = DegradedNetworkFaultSpec(0, 60000)
spec.add_node_spec(zk0.name, device_name, latency_ms, rate_limit_kbit)
# start the task and wait
rate_limit = self.trogdor.create_task(task_name, spec)
wait_until(lambda: rate_limit.running(),
timeout_sec=10,
err_msg="%s failed to start within 10 seconds." % rate_limit)
# Run iperf server on zk1, iperf client on zk0
iperf_server = zk1.account.ssh_capture("iperf -s")
# Capture the measured kbps between the two nodes.
# [ 3] 0.0- 1.0 sec 2952576 KBytes 24187503 Kbits/sec
r = re.compile(r"^.*\s(?P<rate>[\d.]+)\sKbits/sec$")
measured_rates = []
for line in zk0.account.ssh_capture("iperf -i 1 -t 20 -f k -c %s" % zk1.account.hostname):
self.logger.info("iperf output %s" % line)
m = r.match(line)
if m is not None:
measured_rate = float(m.group("rate"))
measured_rates.append(measured_rate)
self.logger.info("Parsed rate of %d kbit/s from iperf" % measured_rate)
# kill iperf server and consume the stdout to ensure clean exit
zk1.account.kill_process("iperf")
for _ in iperf_server:
continue
rate_limit.stop()
rate_limit.wait_for_done()
self.logger.info("Measured rates: %s" % measured_rates)
# We expect to see measured rates within an order of magnitude of our target rate
low_kbps = rate_limit_kbit / 10
high_kbps = rate_limit_kbit * 10
acceptable_rates = [r for r in measured_rates if low_kbps < r < high_kbps]
msg = "Expected most of the measured rates to be within an order of magnitude of target %d." % rate_limit_kbit
msg += " This means `tc` did not limit the bandwidth as expected."
assert len(acceptable_rates) > 5, msg

tests/kafkatest/tests/core/round_trip_fault_test.py

@@ -97,10 +97,9 @@ class RoundTripFaultTest(Test):
def test_produce_consume_with_latency(self):
workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
time.sleep(2)
node_specs = {}
spec = DegradedNetworkFaultSpec(0, 60000)
for node in self.kafka.nodes + self.zk.nodes:
node_specs[node.name] = {"latencyMs": 500, "networkDevice": "eth0"}
spec = DegradedNetworkFaultSpec(0, 60000, node_specs)
spec.add_node_spec(node.name, "eth0", latencyMs=100, rateLimitKbit=3000)
slow1 = self.trogdor.create_task("slow1", spec)
workload1.wait_for_done(timeout_sec=600)
slow1.stop()

tests/kafkatest/utils/remote_account.py

@@ -36,4 +36,4 @@ def line_count(node, file):
if len(out) != 1:
raise Exception("Expected single line of output from wc -l")
return int(out[0].strip().split(" ")[0])
return int(out[0].strip().split(" ")[0])

tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultSpec.java

@@ -31,12 +31,15 @@ public class DegradedNetworkFaultSpec extends TaskSpec {
public static class NodeDegradeSpec {
private final String networkDevice;
private final int latencyMs;
private final int rateLimitKbit;
public NodeDegradeSpec(
@JsonProperty("networkDevice") String networkDevice,
@JsonProperty("latencyMs") int latencyMs) {
@JsonProperty("latencyMs") Integer latencyMs,
@JsonProperty("rateLimitKbit") Integer rateLimitKbit) {
this.networkDevice = networkDevice == null ? "" : networkDevice;
this.latencyMs = latencyMs;
this.latencyMs = latencyMs == null ? 0 : latencyMs;
this.rateLimitKbit = rateLimitKbit == null ? 0 : rateLimitKbit;
}
@JsonProperty("networkDevice")
@@ -48,6 +51,20 @@ public class DegradedNetworkFaultSpec extends TaskSpec {
public int latencyMs() {
return latencyMs;
}
@JsonProperty("rateLimitKbit")
public int rateLimitKbit() {
return rateLimitKbit;
}
@Override
public String toString() {
return "NodeDegradeSpec{" +
"networkDevice='" + networkDevice + '\'' +
", latencyMs=" + latencyMs +
", rateLimitKbit=" + rateLimitKbit +
'}';
}
}
private final Map<String, NodeDegradeSpec> nodeSpecs;

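For reference, the JSON that the coordinator deserializes into this class mirrors the message dict built by the Python client above. A spec that degrades a single node might look roughly like the following (the startMs/durationMs field names come from the base TaskSpec, and the node name node01 is just a placeholder):

    {
        "class": "org.apache.kafka.trogdor.fault.DegradedNetworkFaultSpec",
        "startMs": 0,
        "durationMs": 60000,
        "nodeSpecs": {
            "node01": {"networkDevice": "eth0", "latencyMs": 50, "rateLimitKbit": 1000}
        }
    }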
tools/src/main/java/org/apache/kafka/trogdor/fault/DegradedNetworkFaultWorker.java

@@ -28,13 +28,17 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.NetworkInterface;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Stream;
/**
* Uses the linux utility <pre>tc</pre> (traffic controller) to simulate latency on a specified network device
* Uses the linux utility <pre>tc</pre> (traffic controller) to degrade performance on a specified network device
*/
public class DegradedNetworkFaultWorker implements TaskWorker {
@@ -58,10 +62,10 @@ public class DegradedNetworkFaultWorker implements TaskWorker {
DegradedNetworkFaultSpec.NodeDegradeSpec nodeSpec = nodeSpecs.get(curNode.name());
if (nodeSpec != null) {
for (String device : devicesForSpec(nodeSpec)) {
if (nodeSpec.latencyMs() < 0) {
throw new RuntimeException("Expected a positive value for latencyMs, but got " + nodeSpec.latencyMs());
if (nodeSpec.latencyMs() < 0 || nodeSpec.rateLimitKbit() < 0) {
throw new RuntimeException("Expected non-negative values for latencyMs and rateLimitKbit, but got " + nodeSpec);
} else {
enableTrafficControl(platform, device, nodeSpec.latencyMs());
enableTrafficControl(platform, device, nodeSpec.latencyMs(), nodeSpec.rateLimitKbit());
}
}
}
@@ -96,14 +100,71 @@ public class DegradedNetworkFaultWorker implements TaskWorker {
return devices;
}
private void enableTrafficControl(Platform platform, String networkDevice, int delayMs) throws IOException {
int deviationMs = Math.max(1, (int) Math.sqrt(delayMs));
platform.runCommand(new String[] {
"sudo", "tc", "qdisc", "add", "dev", networkDevice, "root", "netem",
"delay", String.format("%dms", delayMs), String.format("%dms", deviationMs), "distribution", "normal"
});
/**
* Constructs the appropriate "tc" commands to apply latency and rate limiting, if they are non-zero.
*/
private void enableTrafficControl(Platform platform, String networkDevice, int delayMs, int rateLimitKbps) throws IOException {
if (delayMs > 0) {
int deviationMs = Math.max(1, (int) Math.sqrt(delayMs));
List<String> delay = new ArrayList<>();
rootHandler(networkDevice, delay::add);
netemDelay(delayMs, deviationMs, delay::add);
platform.runCommand(delay.toArray(new String[]{}));
if (rateLimitKbps > 0) {
List<String> rate = new ArrayList<>();
childHandler(networkDevice, rate::add);
tbfRate(rateLimitKbps, rate::add);
platform.runCommand(rate.toArray(new String[]{}));
}
} else if (rateLimitKbps > 0) {
List<String> rate = new ArrayList<>();
rootHandler(networkDevice, rate::add);
tbfRate(rateLimitKbps, rate::add);
platform.runCommand(rate.toArray(new String[]{}));
} else {
log.warn("Not applying any rate limiting or latency");
}
}
/**
* Construct the first part of a "tc" command to define a qdisc root handler for the given network interface
*/
private void rootHandler(String networkDevice, Consumer<String> consumer) {
Stream.of("sudo", "tc", "qdisc", "add", "dev", networkDevice, "root", "handle", "1:0").forEach(consumer);
}
/**
* Construct the first part of a "tc" command to define a qdisc child handler for the given interface. This can
* only be used if a root handler has been appropriately defined first (as in {@link #rootHandler}).
*/
private void childHandler(String networkDevice, Consumer<String> consumer) {
Stream.of("sudo", "tc", "qdisc", "add", "dev", networkDevice, "parent", "1:1", "handle", "10:").forEach(consumer);
}
/**
* Construct the second part of a "tc" command that defines a netem (Network Emulator) filter that will apply some
* amount of latency with a small amount of deviation. The distribution of the latency deviation follows a so-called
* Pareto-normal distribution. The Pareto distribution is the formal counterpart of the 80/20 rule, so this may better
* reflect real-world latency patterns.
*/
private void netemDelay(int delayMs, int deviationMs, Consumer<String> consumer) {
Stream.of("netem", "delay", String.format("%dms", delayMs), String.format("%dms", deviationMs),
"distribution", "paretonormal").forEach(consumer);
}
/**
* Construct the second part of a "tc" command that defines a tbf (token bucket filter) that will rate limit the
* packets going through a qdisc.
*/
private void tbfRate(int rateLimitKbit, Consumer<String> consumer) {
Stream.of("tbf", "rate", String.format("%dkbit", rateLimitKbit), "burst", "1mbit", "latency", "500ms").forEach(consumer);
}
/**
* Delete any previously defined qdisc for the given network interface.
* @throws IOException
*/
private void disableTrafficControl(Platform platform, String networkDevice) throws IOException {
platform.runCommand(new String[] {
"sudo", "tc", "qdisc", "del", "dev", networkDevice, "root"

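Putting the helpers above together, here is a rough Python sketch of how the worker composes its commands; each inner list corresponds to one platform.runCommand invocation, and the function name tc_commands is only for illustration:

    import math

    def tc_commands(device, delay_ms, rate_kbit):
        """Sketch of how DegradedNetworkFaultWorker composes its tc invocations."""
        root = ["sudo", "tc", "qdisc", "add", "dev", device, "root", "handle", "1:0"]
        child = ["sudo", "tc", "qdisc", "add", "dev", device, "parent", "1:1", "handle", "10:"]
        netem = ["netem", "delay", "%dms" % delay_ms,
                 "%dms" % max(1, int(math.sqrt(delay_ms))), "distribution", "paretonormal"]
        tbf = ["tbf", "rate", "%dkbit" % rate_kbit, "burst", "1mbit", "latency", "500ms"]
        cmds = []
        if delay_ms > 0:
            cmds.append(root + netem)        # latency on the root qdisc
            if rate_kbit > 0:
                cmds.append(child + tbf)     # rate limit chained beneath it
        elif rate_kbit > 0:
            cmds.append(root + tbf)          # rate limit only, directly on the root
        return cmds

    # tc_commands("eth0", 50, 1000) yields, joined into shell commands:
    #   sudo tc qdisc add dev eth0 root handle 1:0 netem delay 50ms 7ms distribution paretonormal
    #   sudo tc qdisc add dev eth0 parent 1:1 handle 10: tbf rate 1000kbit burst 1mbit latency 500ms
    # Teardown removes the whole hierarchy with: sudo tc qdisc del dev eth0 root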
vagrant/aws/aws-init.sh

@@ -25,7 +25,9 @@ sudo apt-get install -y \
ruby-dev \
zlib1g-dev \
realpath \
python-setuptools
python-setuptools \
iperf \
traceroute
base_dir=`dirname $0`/../..

vagrant/base.sh

@@ -107,6 +107,9 @@ popd
popd
popd
# Install iperf and traceroute
apt-get install -y iperf traceroute
# Test multiple Kafka versions
# We want to use the latest Scala version per Kafka version
# Previously we could not pull in Scala 2.12 builds, because Scala 2.12 requires Java 8 and we were running the system
