Browse Source

MINOR: Update Trogdor ConnectionStressWorker status at the end of execution (#6445)

Reviewers: Colin P. McCabe <cmccabe@apache.org>
pull/6451/head
Stanislav Kozlovski 6 years ago committed by Colin Patrick McCabe
parent
commit
f20f3c1a97
  1. 29
      tests/spec/connection_stress_test.json
  2. 38
      tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java

29
tests/spec/connection_stress_test.json

@ -0,0 +1,29 @@ @@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// An example task specification for running a connection stress test in Trogdor.
// See TROGDOR.md for details.
//
{
"class": "org.apache.kafka.trogdor.workload.ConnectionStressSpec",
"durationMs": 60000,
"clientNode": "node0",
"bootstrapServers": "localhost:9092",
"targetConnectionsPerSec": 100,
"numThreads": 10,
"action": "CONNECT"
}

38
tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java

@ -99,12 +99,14 @@ public class ConnectionStressWorker implements TaskWorker { @@ -99,12 +99,14 @@ public class ConnectionStressWorker implements TaskWorker {
log.info("{}: Activating ConnectionStressWorker with {}", id, spec);
this.doneFuture = doneFuture;
this.status = status;
this.totalConnections = 0;
this.totalFailedConnections = 0;
this.startTimeMs = TIME.milliseconds();
synchronized (ConnectionStressWorker.this) {
this.totalConnections = 0;
this.totalFailedConnections = 0;
this.nextReportTime = 0;
this.startTimeMs = TIME.milliseconds();
}
this.throttle = new ConnectStressThrottle(WorkerUtils.
perSecToPerPeriod(spec.targetConnectionsPerSec(), THROTTLE_PERIOD_MS));
this.nextReportTime = 0;
this.workerExecutor = Executors.newFixedThreadPool(spec.numThreads(),
ThreadUtils.createThreadFactory("ConnectionStressWorkerThread%d", false));
for (int i = 0; i < spec.numThreads(); i++) {
@ -112,6 +114,17 @@ public class ConnectionStressWorker implements TaskWorker { @@ -112,6 +114,17 @@ public class ConnectionStressWorker implements TaskWorker {
}
}
/**
* Update the worker's status and next status report time.
*/
private synchronized void updateStatus(long lastTimeMs) {
status.update(JsonUtil.JSON_SERDE.valueToTree(
new StatusData(totalConnections,
totalFailedConnections,
(totalConnections * 1000.0) / (lastTimeMs - startTimeMs))));
nextReportTime = lastTimeMs + REPORT_INTERVAL_MS;
}
private static class ConnectStressThrottle extends Throttle {
ConnectStressThrottle(int maxPerPeriod) {
super(maxPerPeriod, THROTTLE_PERIOD_MS);
@ -130,10 +143,7 @@ public class ConnectionStressWorker implements TaskWorker { @@ -130,10 +143,7 @@ public class ConnectionStressWorker implements TaskWorker {
conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
ManualMetadataUpdater updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes());
while (true) {
if (doneFuture.isDone()) {
break;
}
while (!doneFuture.isDone()) {
throttle.increment();
long lastTimeMs = throttle.lastTimeMs();
boolean success = false;
@ -150,13 +160,8 @@ public class ConnectionStressWorker implements TaskWorker { @@ -150,13 +160,8 @@ public class ConnectionStressWorker implements TaskWorker {
if (!success) {
totalFailedConnections++;
}
if (lastTimeMs > nextReportTime) {
status.update(JsonUtil.JSON_SERDE.valueToTree(
new StatusData(totalConnections,
totalFailedConnections,
(totalConnections * 1000.0) / (lastTimeMs - startTimeMs))));
nextReportTime = lastTimeMs + REPORT_INTERVAL_MS;
}
if (lastTimeMs > nextReportTime)
updateStatus(lastTimeMs);
}
}
} catch (Exception e) {
@ -165,7 +170,7 @@ public class ConnectionStressWorker implements TaskWorker { @@ -165,7 +170,7 @@ public class ConnectionStressWorker implements TaskWorker {
}
private boolean attemptConnection(AdminClientConfig conf,
ManualMetadataUpdater updater) throws Exception {
ManualMetadataUpdater updater) {
try {
List<Node> nodes = updater.fetchNodes();
Node targetNode = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
@ -250,6 +255,7 @@ public class ConnectionStressWorker implements TaskWorker { @@ -250,6 +255,7 @@ public class ConnectionStressWorker implements TaskWorker {
doneFuture.complete("");
workerExecutor.shutdownNow();
workerExecutor.awaitTermination(1, TimeUnit.DAYS);
updateStatus(throttle.lastTimeMs());
this.workerExecutor = null;
this.status = null;
}

Loading…
Cancel
Save