
KAFKA-7793: Improve the Trogdor command line. (#6133)

* Allow the Trogdor agent to be started in "exec mode", where it simply
runs a single task and exits after it is complete.

* For AgentClient and CoordinatorClient, allow the user to pass the path
to a file containing JSON, instead of specifying the JSON object in the
command-line text itself.  This means that we can get rid of the bash
scripts whose only function was to load task specs into a bash string
and run a Trogdor command.

* Print dates and times in a human-readable way, rather than as numbers
of milliseconds.

* When listing tasks or workers, output human-readable tables of
information.

* Allow the user to filter on task ID name, task ID pattern, or task
state.

* Support a --json flag to provide raw JSON output if desired.
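
Taken together, the reworked clients use subcommands instead of long option flags. A sketch of the new usage, drawn from the examples elsewhere in this change (the exact flags are defined in AgentClient.java and CoordinatorClient.java below):

> ./bin/trogdor.sh client createTask -t localhost:8889 -i produce0 --spec ./tests/spec/simple_produce_bench.json
> ./bin/trogdor.sh client showTask -t localhost:8889 -i produce0 --show-status
> ./bin/trogdor.sh client showTasks -t localhost:8889 --state DONE --json
> ./bin/trogdor.sh agent -n node0 -c ./config/trogdor.conf --exec ./tests/spec/simple_produce_bench.json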

Reviewed-by: David Arthur <mumrah@gmail.com>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
Colin Patrick McCabe
commit a79d6dcdb6
Changed files (changed line counts in parentheses):

  1. TROGDOR.md (85)
  2. checkstyle/suppressions.xml (3)
  3. tests/.gitignore (5)
  4. tests/bin/trogdor-run-consume-bench.sh (39)
  5. tests/bin/trogdor-run-produce-bench.sh (47)
  6. tests/bin/trogdor-run-round-trip.sh (42)
  7. tests/bin/trogdor-run-transactional-produce-bench.sh (51)
  8. tests/spec/round_trip.json (34)
  9. tests/spec/simple_consume_bench_spec.json (31)
  10. tests/spec/simple_produce_bench.json (40)
  11. tests/spec/transactional-produce-bench.json (44)
  12. tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java (105)
  13. tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java (204)
  14. tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java (18)
  15. tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java (41)
  16. tools/src/main/java/org/apache/kafka/trogdor/common/StringFormatter.java (114)
  17. tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java (371)
  18. tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java (5)
  19. tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java (5)
  20. tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java (5)
  21. tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java (2)
  22. tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java (5)
  23. tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java (45)
  24. tools/src/test/java/org/apache/kafka/trogdor/common/JsonUtilTest.java (79)
  25. tools/src/test/java/org/apache/kafka/trogdor/common/StringFormatterTest.java (68)
  26. tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorClientTest.java (83)

TROGDOR.md (85)

@@ -35,61 +35,26 @@ Let's confirm that all of the daemons are running:
115420 Kafka
115694 Agent
Now, we can submit a test job to Trogdor. Here's an example of a short bash script which makes it easier.
> ./tests/bin/trogdor-run-produce-bench.sh
Sent CreateTaskRequest for task produce_bench_21634.
$TASK_ID = produce_bench_21634
To get the test results, we run --show-tasks:
./bin/trogdor.sh client --show-tasks localhost:8889
Got coordinator tasks: {
"tasks" : {
"produce_bench_21634" : {
"state" : "DONE",
"spec" : {
"class" : "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
"startMs" : 0,
"durationMs" : 10000000,
"producerNode" : "node0",
"bootstrapServers" : "localhost:9092",
"targetMessagesPerSec" : 10000,
"maxMessages" : 50000,
"keyGenerator" : {
"type" : "sequential",
"size" : 4,
"startOffset" : 0
},
"valueGenerator" : {
"type" : "constant",
"size" : 512,
"value" : "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
},
"activeTopics" : {
"foo[1-3]" : {
"numPartitions" : 10,
"replicationFactor" : 1
}
},
"inactiveTopics" : {
"foo[4-5]" : {
"numPartitions" : 10,
"replicationFactor" : 1
}
}
},
"startedMs" : 1541435949784,
"doneMs" : 1541435955803,
"cancelled" : false,
"status" : {
Now, we can submit a test job to Trogdor.
> ./bin/trogdor.sh client createTask -t localhost:8889 -i produce0 --spec ./tests/spec/simple_produce_bench.json
Sent CreateTaskRequest for task produce0.
We can run showTask to see what the task's status is:
> ./bin/trogdor.sh client showTask -t localhost:8889 -i produce0
Task produce0 of type org.apache.kafka.trogdor.workload.ProduceBenchSpec is DONE. FINISHED at 2019-01-09T20:38:22.039-08:00 after 6s
To see the results, we use showTask with --show-status:
> ./bin/trogdor.sh client showTask -t localhost:8889 -i produce0 --show-status
Task produce0 of type org.apache.kafka.trogdor.workload.ProduceBenchSpec is DONE. FINISHED at 2019-01-09T20:38:22.039-08:00 after 6s
Status: {
"totalSent" : 50000,
"averageLatencyMs" : 11.0293,
"p50LatencyMs" : 9,
"p95LatencyMs" : 27,
"p99LatencyMs" : 39
}
}
}
"averageLatencyMs" : 17.83388,
"p50LatencyMs" : 12,
"p95LatencyMs" : 75,
"p99LatencyMs" : 96,
"transactionsCommitted" : 0
}
Trogdor Architecture
@@ -157,3 +122,15 @@ ProcessStopFault stops a process by sending it a SIGSTOP signal. When the fault
### NetworkPartitionFault
NetworkPartitionFault sets up an artificial network partition between one or more sets of nodes. Currently, this is implemented using iptables. The iptables rules are set up on the outbound traffic from the affected nodes. Therefore, the affected nodes should still be reachable from outside the cluster.
Exec Mode
========================================
Sometimes, you just want to run a test quickly on a single node. In this case, you can use "exec mode." This mode allows you to run a single Trogdor Agent without a Coordinator.
When using exec mode, you must pass in a Task specification to use. The Agent will try to start this task.
For example:
> ./bin/trogdor.sh agent -n node0 -c ./config/trogdor.conf --exec ./tests/spec/simple_produce_bench.json
When using exec mode, the Agent will exit once the task is complete.
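
Because the Agent exits with status 0 when the task succeeds and 1 when it fails or the spec cannot be parsed (see the Agent.java change below), exec mode is easy to script against. A hypothetical check:

> ./bin/trogdor.sh agent -n node0 -c ./config/trogdor.conf --exec ./tests/spec/simple_produce_bench.json
> echo $?
0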

checkstyle/suppressions.xml (3)

@@ -59,6 +59,9 @@
<suppress checks="NPathComplexity"
files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest).java"/>
<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
files="CoordinatorClient.java"/>
<!-- clients tests -->
<suppress checks="ClassDataAbstractionCoupling"
files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse|TransactionManager|KafkaAdminClient)Test.java"/>

tests/.gitignore (5)

@@ -1,12 +1,7 @@
Vagrantfile.local
.idea/
*.pyc
*.ipynb
.DS_Store
.ducktape
results/
*.json

tests/bin/trogdor-run-consume-bench.sh (39)

@@ -1,39 +0,0 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
COORDINATOR_ENDPOINT="localhost:8889"
TASK_ID="consume_bench_$RANDOM"
TASK_SPEC=$(
cat <<EOF
{
"id": "$TASK_ID",
"spec": {
"class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
"durationMs": 10000000,
"consumerNode": "node0",
"bootstrapServers": "localhost:9092",
"targetMessagesPerSec": 1000,
"threadsPerWorker": 5,
"consumerGroup": "cg",
"maxMessages": 10000,
"activeTopics": ["foo[1-3]"]
}
}
EOF
)
./bin/trogdor.sh client --create-task "${TASK_SPEC}" "${COORDINATOR_ENDPOINT}"
echo "\$TASK_ID = $TASK_ID"

tests/bin/trogdor-run-produce-bench.sh (47)

@@ -1,47 +0,0 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
COORDINATOR_ENDPOINT="localhost:8889"
TASK_ID="produce_bench_$RANDOM"
TASK_SPEC=$(
cat <<EOF
{
"id": "$TASK_ID",
"spec": {
"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
"durationMs": 10000000,
"producerNode": "node0",
"bootstrapServers": "localhost:9092",
"targetMessagesPerSec": 10000,
"maxMessages": 50000,
"activeTopics": {
"foo[1-3]": {
"numPartitions": 10,
"replicationFactor": 1
}
},
"inactiveTopics": {
"foo[4-5]": {
"numPartitions": 10,
"replicationFactor": 1
}
}
}
}
EOF
)
./bin/trogdor.sh client --create-task "${TASK_SPEC}" "${COORDINATOR_ENDPOINT}"
echo "\$TASK_ID = $TASK_ID"

tests/bin/trogdor-run-round-trip.sh (42)

@@ -1,42 +0,0 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
COORDINATOR_ENDPOINT="localhost:8889"
TASK_ID="round_trip_$RANDOM"
TASK_SPEC=$(
cat <<EOF
{
"id": "$TASK_ID",
"spec": {
"class": "org.apache.kafka.trogdor.workload.RoundTripWorkloadSpec",
"durationMs": 10000000,
"clientNode": "node0",
"bootstrapServers": "localhost:9092",
"targetMessagesPerSec": 1000,
"maxMessages": 100,
"activeTopics": {
"${TASK_ID}_topic[0-1]": {
"numPartitions": 2,
"replicationFactor": 1
}
}
}
}
EOF
)
./bin/trogdor.sh client --create-task "${TASK_SPEC}" "${COORDINATOR_ENDPOINT}"
echo "\$TASK_ID = $TASK_ID"

tests/bin/trogdor-run-transactional-produce-bench.sh (51)

@@ -1,51 +0,0 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
COORDINATOR_ENDPOINT="localhost:8889"
TASK_ID="produce_bench_$RANDOM"
TASK_SPEC=$(
cat <<EOF
{
"id": "$TASK_ID",
"spec": {
"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
"durationMs": 10000000,
"producerNode": "node0",
"bootstrapServers": "localhost:9092",
"targetMessagesPerSec": 100,
"maxMessages": 500,
"transactionGenerator" : {
"type" : "uniform",
"messagesPerTransaction" : 50
},
"activeTopics": {
"foo[1-3]": {
"numPartitions": 3,
"replicationFactor": 1
}
},
"inactiveTopics": {
"foo[4-5]": {
"numPartitions": 3,
"replicationFactor": 1
}
}
}
}
EOF
)
./bin/trogdor.sh client --create-task "${TASK_SPEC}" "${COORDINATOR_ENDPOINT}"
echo "\$TASK_ID = $TASK_ID"

tests/spec/round_trip.json (34)

@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// An example task specification for running a round trip test in Trogdor.
// See TROGDOR.md for details.
//
{
"class": "org.apache.kafka.trogdor.workload.RoundTripWorkloadSpec",
"durationMs": 10000000,
"clientNode": "node0",
"bootstrapServers": "localhost:9092",
"targetMessagesPerSec": 1000,
"maxMessages": 100,
"activeTopics": {
"round_trip_topic[0-1]": {
"numPartitions": 2,
"replicationFactor": 1
}
}
}

tests/spec/simple_consume_bench_spec.json (31)

@@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// An example task specification for running a consumer benchmark in Trogdor.
// See TROGDOR.md for details.
//
{
"class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
"durationMs": 10000000,
"consumerNode": "node0",
"bootstrapServers": "localhost:9092",
"targetMessagesPerSec": 1000,
"threadsPerWorker": 5,
"consumerGroup": "cg",
"maxMessages": 10000,
"activeTopics": [ "foo[1-3]" ]
}

tests/spec/simple_produce_bench.json (40)

@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// An example task specification for running a producer benchmark in Trogdor.
// See TROGDOR.md for details.
//
{
"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
"durationMs": 10000000,
"producerNode": "node0",
"bootstrapServers": "localhost:9092",
"targetMessagesPerSec": 10000,
"maxMessages": 50000,
"activeTopics": {
"foo[1-3]": {
"numPartitions": 10,
"replicationFactor": 1
}
},
"inactiveTopics": {
"foo[4-5]": {
"numPartitions": 10,
"replicationFactor": 1
}
}
}

tests/spec/transactional-produce-bench.json (44)

@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// An example task specification for running a transactional producer benchmark
// in Trogdor. See TROGDOR.md for details.
//
{
"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
"durationMs": 10000000,
"producerNode": "node0",
"bootstrapServers": "localhost:9092",
"targetMessagesPerSec": 100,
"maxMessages": 500,
"transactionGenerator" : {
"type" : "uniform",
"messagesPerTransaction" : 50
},
"activeTopics": {
"foo[1-3]": {
"numPartitions": 3,
"replicationFactor": 1
}
},
"inactiveTopics": {
"foo[4-5]": {
"numPartitions": 3,
"replicationFactor": 1
}
}
}

tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java (105)

@@ -17,13 +17,18 @@
package org.apache.kafka.trogdor.agent;
import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
@@ -31,10 +36,15 @@ import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintStream;
import java.util.Set;
import static net.sourceforge.argparse4j.impl.Arguments.store;
/**
@@ -45,8 +55,26 @@ import static net.sourceforge.argparse4j.impl.Arguments.store;
public final class Agent {
private static final Logger log = LoggerFactory.getLogger(Agent.class);
/**
* The default Agent port.
*/
public static final int DEFAULT_PORT = 8888;
/**
* The workerId to use in exec mode.
*/
private static final long EXEC_WORKER_ID = 1;
/**
* The taskId to use in exec mode.
*/
private static final String EXEC_TASK_ID = "task0";
/**
* The platform object to use for this agent.
*/
private final Platform platform;
/**
* The time at which this server was started.
*/
@@ -74,6 +102,7 @@ public final class Agent {
*/
public Agent(Platform platform, Scheduler scheduler,
JsonRestServer restServer, AgentRestResource resource) {
this.platform = platform;
this.time = scheduler.time();
this.serverStartMs = time.milliseconds();
this.workerManager = new WorkerManager(platform, scheduler);
@@ -115,6 +144,63 @@ public final class Agent {
workerManager.stopWorker(req.workerId(), true);
}
/**
* Rebase the task spec time so that it is not earlier than the current time.
* This is only needed for tasks passed in with --exec. Normally, the
* controller rebases the task spec time.
*/
TaskSpec rebaseTaskSpecTime(TaskSpec spec) throws Exception {
ObjectNode node = JsonUtil.JSON_SERDE.valueToTree(spec);
node.set("startMs", new LongNode(Math.max(time.milliseconds(), spec.startMs())));
return JsonUtil.JSON_SERDE.treeToValue(node, TaskSpec.class);
}
/**
* Start a task on the agent, and block until it completes.
*
* @param spec The task specification.
* @param out The output stream to print to.
*
* @return True if the task ran successfully; false otherwise.
*/
boolean exec(TaskSpec spec, PrintStream out) throws Exception {
TaskController controller = null;
try {
controller = spec.newController(EXEC_TASK_ID);
} catch (Exception e) {
out.println("Unable to create the task controller.");
e.printStackTrace(out);
return false;
}
Set<String> nodes = controller.targetNodes(platform.topology());
if (!nodes.contains(platform.curNode().name())) {
out.println("This task is not configured to run on this node. It runs on node(s): " +
Utils.join(nodes, ", ") + ", whereas this node is " +
platform.curNode().name());
return false;
}
KafkaFuture<String> future = null;
try {
future = workerManager.createWorker(EXEC_WORKER_ID, EXEC_TASK_ID, spec);
} catch (Throwable e) {
out.println("createWorker failed");
e.printStackTrace(out);
return false;
}
out.println("Waiting for completion of task:" + JsonUtil.toPrettyJsonString(spec));
String error = future.get();
if (error == null || error.isEmpty()) {
out.println("Task succeeded with status " +
JsonUtil.toPrettyJsonString(workerManager.workerStates().get(EXEC_WORKER_ID).status()));
return true;
} else {
out.println("Task failed with status " +
JsonUtil.toPrettyJsonString(workerManager.workerStates().get(EXEC_WORKER_ID).status()) +
" and error " + error);
return false;
}
}
public static void main(String[] args) throws Exception {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("trogdor-agent")
@@ -134,6 +220,12 @@ public final class Agent {
.dest("node_name")
.metavar("NODE_NAME")
.help("The name of this node.");
parser.addArgument("--exec", "-e")
.action(store())
.type(String.class)
.dest("task_spec")
.metavar("TASK_SPEC")
.help("Execute a single task spec and then exit. The argument is the task spec to load when starting up, or a path to it.");
Namespace res = null;
try {
res = parser.parseArgs(args);
@@ -148,6 +240,7 @@ public final class Agent {
}
String configPath = res.getString("config");
String nodeName = res.getString("node_name");
String taskSpec = res.getString("task_spec");
Platform platform = Platform.Config.parse(nodeName, configPath);
JsonRestServer restServer =
@@ -165,6 +258,18 @@ public final class Agent {
log.error("Got exception while running agent shutdown hook.", e);
}
}));
if (taskSpec != null) {
TaskSpec spec = null;
try {
spec = JsonUtil.objectFromCommandLineArgument(taskSpec, TaskSpec.class);
} catch (Exception e) {
System.out.println("Unable to parse the supplied task spec.");
e.printStackTrace();
Exit.exit(1);
}
TaskSpec effectiveSpec = agent.rebaseTaskSpecTime(spec);
Exit.exit(agent.exec(effectiveSpec, System.out) ? 0 : 1);
}
agent.waitForShutdown();
}
}

tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java (204)

@@ -20,11 +20,12 @@ package org.apache.kafka.trogdor.agent;
import com.fasterxml.jackson.core.type.TypeReference;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.StringFormatter;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
@@ -32,14 +33,25 @@ import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.rest.WorkerState;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.UriBuilder;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
import static org.apache.kafka.trogdor.common.StringFormatter.dateString;
import static org.apache.kafka.trogdor.common.StringFormatter.durationString;
/**
* A client for the Trogdor agent.
@@ -158,95 +170,157 @@ public class AgentClient {
resp.body();
}
public static void main(String[] args) throws Exception {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("trogdor-agent-client")
.defaultHelp(true)
.description("The Trogdor fault injection agent client.");
parser.addArgument("target")
private static void addTargetArgument(ArgumentParser parser) {
parser.addArgument("--target", "-t")
.action(store())
.required(true)
.type(String.class)
.dest("target")
.metavar("TARGET")
.help("A colon-separated host and port pair. For example, example.com:8888");
MutuallyExclusiveGroup actions = parser.addMutuallyExclusiveGroup();
actions.addArgument("--status")
}
private static void addJsonArgument(ArgumentParser parser) {
parser.addArgument("--json")
.action(storeTrue())
.type(Boolean.class)
.dest("status")
.help("Get agent status.");
actions.addArgument("--uptime")
.dest("json")
.metavar("JSON")
.help("Show the full response as JSON.");
}
private static void addWorkerIdArgument(ArgumentParser parser, String help) {
parser.addArgument("--workerId")
.action(storeTrue())
.type(Boolean.class)
.dest("uptime")
.help("Get agent uptime.");
actions.addArgument("--create-worker")
.action(store())
.type(String.class)
.dest("create_worker")
.metavar("SPEC_JSON")
.help("Create a new fault.");
actions.addArgument("--stop-worker")
.action(store())
.type(Long.class)
.dest("stop_worker")
.dest("workerId")
.metavar("WORKER_ID")
.help("Stop a worker ID.");
actions.addArgument("--destroy-worker")
.action(store())
.type(Long.class)
.dest("destroy_worker")
.metavar("WORKER_ID")
.help("Destroy a worker ID.");
actions.addArgument("--shutdown")
.action(storeTrue())
.type(Boolean.class)
.dest("shutdown")
.help("Trigger agent shutdown");
Namespace res = null;
try {
res = parser.parseArgs(args);
} catch (ArgumentParserException e) {
if (args.length == 0) {
parser.printHelp();
Exit.exit(0);
} else {
parser.handleError(e);
Exit.exit(1);
}
.help(help);
}
public static void main(String[] args) throws Exception {
ArgumentParser rootParser = ArgumentParsers
.newArgumentParser("trogdor-agent-client")
.defaultHelp(true)
.description("The Trogdor agent client.");
Subparsers subParsers = rootParser.addSubparsers().
dest("command");
Subparser uptimeParser = subParsers.addParser("uptime")
.help("Get the agent uptime.");
addTargetArgument(uptimeParser);
addJsonArgument(uptimeParser);
Subparser statusParser = subParsers.addParser("status")
.help("Get the agent status.");
addTargetArgument(statusParser);
addJsonArgument(statusParser);
Subparser createWorkerParser = subParsers.addParser("createWorker")
.help("Create a new worker.");
addTargetArgument(createWorkerParser);
addWorkerIdArgument(createWorkerParser, "The worker ID to create.");
createWorkerParser.addArgument("--taskId")
.action(store())
.required(true)
.type(String.class)
.dest("taskId")
.metavar("TASK_ID")
.help("The task ID to create.");
createWorkerParser.addArgument("--spec", "-s")
.action(store())
.required(true)
.type(String.class)
.dest("taskSpec")
.metavar("TASK_SPEC")
.help("The task spec to create, or a path to a file containing the task spec.");
Subparser stopWorkerParser = subParsers.addParser("stopWorker")
.help("Stop a worker.");
addTargetArgument(stopWorkerParser);
addWorkerIdArgument(stopWorkerParser, "The worker ID to stop.");
Subparser destroyWorkerParser = subParsers.addParser("destroyWorker")
.help("Destroy a worker.");
addTargetArgument(destroyWorkerParser);
addWorkerIdArgument(destroyWorkerParser, "The worker ID to destroy.");
Subparser shutdownParser = subParsers.addParser("shutdown")
.help("Shut down the agent.");
addTargetArgument(shutdownParser);
Namespace res = rootParser.parseArgsOrFail(args);
String target = res.getString("target");
AgentClient client = new Builder().
maxTries(3).
target(target).
build();
if (res.getBoolean("status")) {
System.out.println("Got agent status: " +
JsonUtil.toPrettyJsonString(client.status()));
} else if (res.getBoolean("uptime")) {
System.out.println("Got agent uptime: " +
JsonUtil.toPrettyJsonString(client.uptime()));
} else if (res.getString("create_worker") != null) {
CreateWorkerRequest req = JsonUtil.JSON_SERDE.
readValue(res.getString("create_worker"), CreateWorkerRequest.class);
ZoneOffset localOffset = OffsetDateTime.now().getOffset();
switch (res.getString("command")) {
case "uptime": {
UptimeResponse uptime = client.uptime();
if (res.getBoolean("json")) {
System.out.println(JsonUtil.toJsonString(uptime));
} else {
System.out.printf("Agent is running at %s.%n", target);
System.out.printf("\tStart time: %s%n",
dateString(uptime.serverStartMs(), localOffset));
System.out.printf("\tCurrent server time: %s%n",
dateString(uptime.nowMs(), localOffset));
System.out.printf("\tUptime: %s%n",
durationString(uptime.nowMs() - uptime.serverStartMs()));
}
break;
}
case "status": {
AgentStatusResponse status = client.status();
if (res.getBoolean("json")) {
System.out.println(JsonUtil.toJsonString(status));
} else {
System.out.printf("Agent is running at %s.%n", target);
System.out.printf("\tStart time: %s%n",
dateString(status.serverStartMs(), localOffset));
List<List<String>> lines = new ArrayList<>();
List<String> header = new ArrayList<>(
Arrays.asList("WORKER_ID", "TASK_ID", "STATE", "TASK_TYPE"));
lines.add(header);
for (Map.Entry<Long, WorkerState> entry : status.workers().entrySet()) {
List<String> cols = new ArrayList<>();
cols.add(Long.toString(entry.getKey()));
cols.add(entry.getValue().taskId());
cols.add(entry.getValue().getClass().getSimpleName());
cols.add(entry.getValue().spec().getClass().getCanonicalName());
lines.add(cols);
}
System.out.print(StringFormatter.prettyPrintGrid(lines));
}
break;
}
case "createWorker": {
long workerId = res.getLong("workerId");
String taskId = res.getString("taskId");
TaskSpec taskSpec = JsonUtil.
objectFromCommandLineArgument(res.getString("taskSpec"), TaskSpec.class);
CreateWorkerRequest req =
new CreateWorkerRequest(workerId, taskId, taskSpec);
client.createWorker(req);
System.out.printf("Sent CreateWorkerRequest for worker %d%n.", req.workerId());
} else if (res.getString("stop_worker") != null) {
long workerId = res.getLong("stop_worker");
break;
}
case "stopWorker": {
long workerId = res.getLong("workerId");
client.stopWorker(new StopWorkerRequest(workerId));
System.out.printf("Sent StopWorkerRequest for worker %d%n.", workerId);
} else if (res.getString("destroy_worker") != null) {
long workerId = res.getLong("stop_worker");
break;
}
case "destroyWorker": {
long workerId = res.getLong("workerId");
client.destroyWorker(new DestroyWorkerRequest(workerId));
System.out.printf("Sent DestroyWorkerRequest for worker %d%n.", workerId);
} else if (res.getBoolean("shutdown")) {
break;
}
case "shutdown": {
client.invokeShutdown();
System.out.println("Sent ShutdownRequest.");
} else {
break;
}
default: {
System.out.println("You must choose an action. Type --help for help.");
Exit.exit(1);
}
}
}
}

tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java (18)

@@ -230,6 +230,11 @@ public final class WorkerManager {
*/
private Future<Void> timeoutFuture = null;
/**
* A future which is completed when the task transitions to DONE state.
*/
private KafkaFutureImpl<String> doneFuture = null;
/**
* A shutdown manager reference which will keep the WorkerManager
* alive for as long as this worker is alive.
@@ -300,6 +305,7 @@ public final class WorkerManager {
reference.close();
reference = null;
}
doneFuture.complete(error);
}
@Override
@@ -308,20 +314,21 @@ public final class WorkerManager {
}
}
public void createWorker(long workerId, String taskId, TaskSpec spec) throws Throwable {
public KafkaFuture<String> createWorker(long workerId, String taskId, TaskSpec spec) throws Throwable {
try (ShutdownManager.Reference ref = shutdownManager.takeReference()) {
final Worker worker = stateChangeExecutor.
submit(new CreateWorker(workerId, taskId, spec, time.milliseconds())).get();
if (worker == null) {
if (worker.doneFuture != null) {
log.info("{}: Ignoring request to create worker {}, because there is already " +
"a worker with that id.", nodeName, workerId);
return;
return worker.doneFuture;
}
worker.doneFuture = new KafkaFutureImpl<>();
if (worker.spec.endMs() <= time.milliseconds()) {
log.info("{}: Will not run worker {} as it has expired.", nodeName, worker);
stateChangeExecutor.submit(new HandleWorkerHalting(worker,
"worker expired", true));
return;
return worker.doneFuture;
}
KafkaFutureImpl<String> haltFuture = new KafkaFutureImpl<>();
haltFuture.thenApply((KafkaFuture.BaseFunction<String, Void>) errorString -> {
@@ -345,6 +352,7 @@ public final class WorkerManager {
"worker.start() exception: " + Utils.stackTrace(e), true));
}
stateChangeExecutor.submit(new FinishCreatingWorker(worker));
return worker.doneFuture;
} catch (ExecutionException e) {
if (e.getCause() instanceof RequestConflictException) {
log.info("{}: request conflict while creating worker {} for task {} with spec {}.",
@@ -385,7 +393,7 @@ public final class WorkerManager {
throw new RequestConflictException("There is already a worker ID " + workerId +
" with a different task spec.");
} else {
return null;
return worker;
}
}
worker = new Worker(workerId, taskId, spec, now);
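
The returned doneFuture is what makes the Agent's blocking wait in exec mode possible. A minimal sketch of the calling pattern, mirroring Agent.exec() above (workerId 1 and taskId "task0" are the constants exec mode uses):

    KafkaFuture<String> done = workerManager.createWorker(1, "task0", spec);
    String error = done.get();  // blocks until the worker reaches the DONE state
    boolean succeeded = (error == null) || error.isEmpty();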

tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java (41)

@@ -18,12 +18,15 @@
package org.apache.kafka.trogdor.common;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import java.io.File;
/**
* Utilities for working with JSON.
*/
@@ -34,6 +37,7 @@ public class JsonUtil {
JSON_SERDE = new ObjectMapper();
JSON_SERDE.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
JSON_SERDE.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
JSON_SERDE.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
JSON_SERDE.registerModule(new Jdk8Module());
JSON_SERDE.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
}
@@ -53,4 +57,41 @@ public class JsonUtil {
throw new RuntimeException(e);
}
}
/**
* Determine if a string is a JSON object literal.
* Object literals must begin with an open brace.
*
* @param input The input string.
* @return True if the string is a JSON literal.
*/
static boolean openBraceComesFirst(String input) {
for (int i = 0; i < input.length(); i++) {
char c = input.charAt(i);
if (!Character.isWhitespace(c)) {
return c == '{';
}
}
return false;
}
/**
* Read a JSON object from a command-line argument. This can take the form of a path to
* a file containing the JSON object, or simply the raw JSON object itself. We will assume
* that if the string is a valid JSON object, the latter is true. If you want to specify a
file name containing an open brace, you can force it to be interpreted as a file name by
prefixing it with ./ or a full path.
*
* @param argument The command-line argument.
* @param clazz The class of the object to be read.
* @param <T> The object type.
* @return The object which we read.
*/
public static <T> T objectFromCommandLineArgument(String argument, Class<T> clazz) throws Exception {
if (openBraceComesFirst(argument)) {
return JSON_SERDE.readValue(argument, clazz);
} else {
return JSON_SERDE.readValue(new File(argument), clazz);
}
}
}
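
A minimal usage sketch (hypothetical call sites; TaskSpec is what Agent.java and the client classes actually pass here). The same argument string works as either inline JSON or a path:

    // Inline JSON: the leading '{' makes objectFromCommandLineArgument parse it directly.
    TaskSpec inline = JsonUtil.objectFromCommandLineArgument(
        "{\"class\": \"org.apache.kafka.trogdor.workload.ProduceBenchSpec\", " +
        "\"durationMs\": 10000000, \"producerNode\": \"node0\", " +
        "\"bootstrapServers\": \"localhost:9092\"}", TaskSpec.class);
    // Anything that does not start with '{' is treated as a path to a JSON file.
    TaskSpec fromFile = JsonUtil.objectFromCommandLineArgument(
        "./tests/spec/simple_produce_bench.json", TaskSpec.class);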

tools/src/main/java/org/apache/kafka/trogdor/common/StringFormatter.java (114)

@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.common;
import java.time.Duration;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* Utilities for formatting strings.
*/
public class StringFormatter {
/**
* Pretty-print a date string.
*
* @param timeMs The time since the epoch in milliseconds.
* @param zoneOffset The time zone offset.
* @return The date string in ISO format.
*/
public static String dateString(long timeMs, ZoneOffset zoneOffset) {
return new Date(timeMs).toInstant().
atOffset(zoneOffset).
format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
}
/**
* Pretty-print a duration.
*
* @param periodMs The duration in milliseconds.
* @return A human-readable duration string.
*/
public static String durationString(long periodMs) {
StringBuilder bld = new StringBuilder();
Duration duration = Duration.ofMillis(periodMs);
long hours = duration.toHours();
if (hours > 0) {
bld.append(hours).append("h");
duration = duration.minusHours(hours);
}
long minutes = duration.toMinutes();
if (minutes > 0) {
bld.append(minutes).append("m");
duration = duration.minusMinutes(minutes);
}
long seconds = duration.getSeconds();
if ((seconds != 0) || bld.toString().isEmpty()) {
bld.append(seconds).append("s");
}
return bld.toString();
}
/**
* Formats strings in a grid pattern.
*
* All entries in the same column will have the same width.
*
* @param lines A list of lines. Each line contains a list of columns.
* Each line must contain the same number of columns.
* @return The string.
*/
public static String prettyPrintGrid(List<List<String>> lines) {
int numColumns = -1;
int rowIndex = 0;
for (List<String> col : lines) {
if (numColumns == -1) {
numColumns = col.size();
} else if (numColumns != col.size()) {
throw new RuntimeException("Expected " + numColumns + " columns in row " +
rowIndex + ", but got " + col.size());
}
rowIndex++;
}
List<Integer> widths = new ArrayList<>(numColumns);
for (int x = 0; x < numColumns; x++) {
int w = 0;
for (List<String> cols : lines) {
w = Math.max(w, cols.get(x).length() + 1);
}
widths.add(w);
}
StringBuilder bld = new StringBuilder();
for (int y = 0; y < lines.size(); y++) {
List<String> cols = lines.get(y);
for (int x = 0; x < cols.size(); x++) {
String val = cols.get(x);
int minWidth = widths.get(x);
bld.append(val);
for (int i = 0; i < minWidth - val.length(); i++) {
bld.append(" ");
}
}
bld.append(String.format("%n"));
}
return bld.toString();
}
}
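
For intuition, a sketch of what these helpers produce (worked out from the code above, not from a test run):

    durationString(6019);     // "6s"      (the "after 6s" in the TROGDOR.md example)
    durationString(3670000);  // "1h1m10s"
    durationString(0);        // "0s"
    // prettyPrintGrid pads every column to its widest entry plus one space:
    // ID        TYPE                                                 STATE
    // produce0  org.apache.kafka.trogdor.workload.ProduceBenchSpec  DONE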

tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java (371)

@@ -20,11 +20,13 @@ package org.apache.kafka.trogdor.coordinator;
import com.fasterxml.jackson.core.type.TypeReference;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.StringFormatter;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateTaskRequest;
import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
@@ -32,10 +34,16 @@ import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending;
import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TaskRunning;
import org.apache.kafka.trogdor.rest.TaskStateType;
import org.apache.kafka.trogdor.rest.TaskStopping;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,10 +51,22 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.UriBuilder;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import static net.sourceforge.argparse4j.impl.Arguments.append;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
import static org.apache.kafka.trogdor.common.StringFormatter.dateString;
import static org.apache.kafka.trogdor.common.StringFormatter.durationString;
/**
* A client for the Trogdor coordinator.
@@ -158,6 +178,9 @@ public class CoordinatorClient {
uriBuilder.queryParam("lastStartMs", request.lastStartMs());
uriBuilder.queryParam("firstEndMs", request.firstEndMs());
uriBuilder.queryParam("lastEndMs", request.lastEndMs());
if (request.state().isPresent()) {
uriBuilder.queryParam("state", request.state().get().toString());
}
HttpResponse<TasksResponse> resp =
JsonRestServer.httpRequest(log, uriBuilder.build().toString(), "GET",
null, new TypeReference<TasksResponse>() { }, maxTries);
@@ -178,119 +201,309 @@ public class CoordinatorClient {
resp.body();
}
public static void main(String[] args) throws Exception {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("trogdor-coordinator-client")
.defaultHelp(true)
.description("The Trogdor fault injection coordinator client.");
parser.addArgument("target")
private static void addTargetArgument(ArgumentParser parser) {
parser.addArgument("--target", "-t")
.action(store())
.required(true)
.type(String.class)
.dest("target")
.metavar("TARGET")
.help("A colon-separated host and port pair. For example, example.com:8889");
MutuallyExclusiveGroup actions = parser.addMutuallyExclusiveGroup();
actions.addArgument("--status")
.action(storeTrue())
.type(Boolean.class)
.dest("status")
.help("Get coordinator status.");
actions.addArgument("--uptime")
.action(storeTrue())
.type(Boolean.class)
.dest("uptime")
.help("Get coordinator uptime.");
actions.addArgument("--show-tasks")
}
private static void addJsonArgument(ArgumentParser parser) {
parser.addArgument("--json")
.action(storeTrue())
.type(Boolean.class)
.dest("show_tasks")
.help("Show coordinator tasks.");
actions.addArgument("--show-task")
.dest("json")
.metavar("JSON")
.help("Show the full response as JSON.");
}
public static void main(String[] args) throws Exception {
ArgumentParser rootParser = ArgumentParsers
.newArgumentParser("trogdor-coordinator-client")
.description("The Trogdor coordinator client.");
Subparsers subParsers = rootParser.addSubparsers().
dest("command");
Subparser uptimeParser = subParsers.addParser("uptime")
.help("Get the coordinator uptime.");
addTargetArgument(uptimeParser);
addJsonArgument(uptimeParser);
Subparser statusParser = subParsers.addParser("status")
.help("Get the coordinator status.");
addTargetArgument(statusParser);
addJsonArgument(statusParser);
Subparser showTaskParser = subParsers.addParser("showTask")
.help("Show a coordinator task.");
addTargetArgument(showTaskParser);
addJsonArgument(showTaskParser);
showTaskParser.addArgument("--id", "-i")
.action(store())
.required(true)
.type(String.class)
.dest("show_task")
.dest("taskId")
.metavar("TASK_ID")
.help("Show a specific coordinator task.");
actions.addArgument("--create-task")
.help("The task ID to show.");
showTaskParser.addArgument("--verbose", "-v")
.action(storeTrue())
.dest("verbose")
.metavar("VERBOSE")
.help("Print out everything.");
showTaskParser.addArgument("--show-status", "-S")
.action(storeTrue())
.dest("showStatus")
.metavar("SHOW_STATUS")
.help("Show the task status.");
Subparser showTasksParser = subParsers.addParser("showTasks")
.help("Show many coordinator tasks. By default, all tasks are shown, but " +
"command-line options can be specified as filters.");
addTargetArgument(showTasksParser);
addJsonArgument(showTasksParser);
MutuallyExclusiveGroup idGroup = showTasksParser.addMutuallyExclusiveGroup();
idGroup.addArgument("--id", "-i")
.action(append())
.type(String.class)
.dest("taskIds")
.metavar("TASK_IDS")
.help("Show only this task ID. This option may be specified multiple times.");
idGroup.addArgument("--id-pattern")
.action(store())
.type(String.class)
.dest("create_task")
.metavar("TASK_SPEC_JSON")
.help("Create a new task from a task spec.");
actions.addArgument("--stop-task")
.dest("taskIdPattern")
.metavar("TASK_ID_PATTERN")
.help("Only display tasks which match the given ID pattern.");
showTasksParser.addArgument("--state", "-s")
.type(TaskStateType.class)
.dest("taskStateType")
.metavar("TASK_STATE_TYPE")
.help("Show only tasks in this state.");
Subparser createTaskParser = subParsers.addParser("createTask")
.help("Create a new task.");
addTargetArgument(createTaskParser);
createTaskParser.addArgument("--id", "-i")
.action(store())
.required(true)
.type(String.class)
.dest("stop_task")
.dest("taskId")
.metavar("TASK_ID")
.help("The task ID to create.");
createTaskParser.addArgument("--spec", "-s")
.action(store())
.required(true)
.type(String.class)
.dest("taskSpec")
.metavar("TASK_SPEC")
.help("The task spec to create, or a path to a file containing the task spec.");
Subparser stopTaskParser = subParsers.addParser("stopTask")
.help("Stop a task.");
actions.addArgument("--destroy-task")
addTargetArgument(stopTaskParser);
stopTaskParser.addArgument("--id", "-i")
.action(store())
.required(true)
.type(String.class)
.dest("destroy_task")
.dest("taskId")
.metavar("TASK_ID")
.help("The task ID to create.");
Subparser destroyTaskParser = subParsers.addParser("destroyTask")
.help("Destroy a task.");
actions.addArgument("--shutdown")
.action(storeTrue())
.type(Boolean.class)
.dest("shutdown")
.help("Trigger coordinator shutdown");
addTargetArgument(destroyTaskParser);
destroyTaskParser.addArgument("--id", "-i")
.action(store())
.required(true)
.type(String.class)
.dest("taskId")
.metavar("TASK_ID")
.help("The task ID to destroy.");
Subparser shutdownParser = subParsers.addParser("shutdown")
.help("Shut down the coordinator.");
addTargetArgument(shutdownParser);
Namespace res = null;
try {
res = parser.parseArgs(args);
} catch (ArgumentParserException e) {
if (args.length == 0) {
parser.printHelp();
Exit.exit(0);
} else {
parser.handleError(e);
Exit.exit(1);
}
}
Namespace res = rootParser.parseArgsOrFail(args);
String target = res.getString("target");
CoordinatorClient client = new Builder().
maxTries(3).
target(target).
build();
if (res.getBoolean("status")) {
System.out.println("Got coordinator status: " +
JsonUtil.toPrettyJsonString(client.status()));
} else if (res.getBoolean("uptime")) {
System.out.println("Got coordinator uptime: " +
JsonUtil.toPrettyJsonString(client.uptime()));
} else if (res.getBoolean("show_tasks")) {
System.out.println("Got coordinator tasks: " +
JsonUtil.toPrettyJsonString(client.tasks(
new TasksRequest(null, 0, 0, 0, 0, Optional.empty()))));
} else if (res.getString("show_task") != null) {
String taskId = res.getString("show_task");
TaskRequest req = new TaskRequest(res.getString("show_task"));
ZoneOffset localOffset = OffsetDateTime.now().getOffset();
switch (res.getString("command")) {
case "uptime": {
UptimeResponse uptime = client.uptime();
if (res.getBoolean("json")) {
System.out.println(JsonUtil.toJsonString(uptime));
} else {
System.out.printf("Coordinator is running at %s.%n", target);
System.out.printf("\tStart time: %s%n",
dateString(uptime.serverStartMs(), localOffset));
System.out.printf("\tCurrent server time: %s%n",
dateString(uptime.nowMs(), localOffset));
System.out.printf("\tUptime: %s%n",
durationString(uptime.nowMs() - uptime.serverStartMs()));
}
break;
}
case "status": {
CoordinatorStatusResponse response = client.status();
if (res.getBoolean("json")) {
System.out.println(JsonUtil.toJsonString(response));
} else {
System.out.printf("Coordinator is running at %s.%n", target);
System.out.printf("\tStart time: %s%n", dateString(response.serverStartMs(), localOffset));
}
break;
}
case "showTask": {
String taskId = res.getString("taskId");
TaskRequest req = new TaskRequest(taskId);
TaskState taskState = null;
try {
String taskOutput = String.format("Got coordinator task \"%s\": %s", taskId, JsonUtil.toPrettyJsonString(client.task(req)));
System.out.println(taskOutput);
taskState = client.task(req);
} catch (NotFoundException e) {
System.out.println(e.getMessage());
System.out.printf("Task %s was not found.%n", taskId);
Exit.exit(1);
}
} else if (res.getString("create_task") != null) {
CreateTaskRequest req = JsonUtil.JSON_SERDE.
readValue(res.getString("create_task"), CreateTaskRequest.class);
if (res.getBoolean("json")) {
System.out.println(JsonUtil.toJsonString(taskState));
} else {
System.out.printf("Task %s of type %s is %s. %s%n", taskId,
taskState.spec().getClass().getCanonicalName(),
taskState.stateType(), prettyPrintTaskInfo(taskState, localOffset));
if (taskState instanceof TaskDone) {
TaskDone taskDone = (TaskDone) taskState;
if ((taskDone.error() != null) && (!taskDone.error().isEmpty())) {
System.out.printf("Error: %s%n", taskDone.error());
}
}
if (res.getBoolean("verbose")) {
System.out.printf("Spec: %s%n%n", JsonUtil.toPrettyJsonString(taskState.spec()));
}
if (res.getBoolean("verbose") || res.getBoolean("showStatus")) {
System.out.printf("Status: %s%n%n", JsonUtil.toPrettyJsonString(taskState.status()));
}
}
break;
}
case "showTasks": {
TaskStateType taskStateType = res.<TaskStateType>get("taskStateType");
List<String> taskIds = new ArrayList<>();
Pattern taskIdPattern = null;
if (res.getList("taskIds") != null) {
for (Object taskId : res.getList("taskIds")) {
taskIds.add((String) taskId);
}
} else if (res.getString("taskIdPattern") != null) {
try {
taskIdPattern = Pattern.compile(res.getString("taskIdPattern"));
} catch (PatternSyntaxException e) {
System.out.println("Invalid task ID regular expression " + res.getString("taskIdPattern"));
e.printStackTrace();
Exit.exit(1);
}
}
TasksRequest req = new TasksRequest(taskIds, 0, 0, 0, 0,
Optional.ofNullable(taskStateType));
TasksResponse response = client.tasks(req);
if (taskIdPattern != null) {
TreeMap<String, TaskState> filteredTasks = new TreeMap<>();
for (Map.Entry<String, TaskState> entry : response.tasks().entrySet()) {
if (taskIdPattern.matcher(entry.getKey()).matches()) {
filteredTasks.put(entry.getKey(), entry.getValue());
}
}
response = new TasksResponse(filteredTasks);
}
if (res.getBoolean("json")) {
System.out.println(JsonUtil.toJsonString(response));
} else {
System.out.println(prettyPrintTasksResponse(response, localOffset));
}
if (response.tasks().isEmpty()) {
Exit.exit(1);
}
break;
}
case "createTask": {
String taskId = res.getString("taskId");
TaskSpec taskSpec = JsonUtil.
objectFromCommandLineArgument(res.getString("taskSpec"), TaskSpec.class);
CreateTaskRequest req = new CreateTaskRequest(taskId, taskSpec);
client.createTask(req);
System.out.printf("Sent CreateTaskRequest for task %s.", req.id());
} else if (res.getString("stop_task") != null) {
String taskId = res.getString("stop_task");
client.stopTask(new StopTaskRequest(taskId));
System.out.printf("Sent CreateTaskRequest for task %s.%n", req.id());
break;
}
case "stopTask": {
String taskId = res.getString("taskId");
StopTaskRequest req = new StopTaskRequest(taskId);
client.stopTask(req);
System.out.printf("Sent StopTaskRequest for task %s.%n", taskId);
} else if (res.getString("destroy_task") != null) {
String taskId = res.getString("destroy_task");
client.destroyTask(new DestroyTaskRequest(taskId));
break;
}
case "destroyTask": {
String taskId = res.getString("taskId");
DestroyTaskRequest req = new DestroyTaskRequest(taskId);
client.destroyTask(req);
System.out.printf("Sent DestroyTaskRequest for task %s.%n", taskId);
} else if (res.getBoolean("shutdown")) {
break;
}
case "shutdown": {
client.shutdown();
System.out.println("Sent ShutdownRequest.");
} else {
break;
}
default: {
System.out.println("You must choose an action. Type --help for help.");
Exit.exit(1);
}
}
}
}
static String prettyPrintTasksResponse(TasksResponse response, ZoneOffset zoneOffset) {
if (response.tasks().isEmpty()) {
return "No matching tasks found.";
}
List<List<String>> lines = new ArrayList<>();
List<String> header = new ArrayList<>(
Arrays.asList("ID", "TYPE", "STATE", "INFO"));
lines.add(header);
for (Map.Entry<String, TaskState> entry : response.tasks().entrySet()) {
String taskId = entry.getKey();
TaskState taskState = entry.getValue();
List<String> cols = new ArrayList<>();
cols.add(taskId);
cols.add(taskState.spec().getClass().getCanonicalName());
cols.add(taskState.stateType().toString());
cols.add(prettyPrintTaskInfo(taskState, zoneOffset));
lines.add(cols);
}
return StringFormatter.prettyPrintGrid(lines);
}
static String prettyPrintTaskInfo(TaskState taskState, ZoneOffset zoneOffset) {
if (taskState instanceof TaskPending) {
return "Will start at " + dateString(taskState.spec().startMs(), zoneOffset);
} else if (taskState instanceof TaskRunning) {
TaskRunning runState = (TaskRunning) taskState;
return "Started " + dateString(runState.startedMs(), zoneOffset) +
"; will stop after " + durationString(taskState.spec().durationMs());
} else if (taskState instanceof TaskStopping) {
TaskStopping stoppingState = (TaskStopping) taskState;
return "Started " + dateString(stoppingState.startedMs(), zoneOffset);
} else if (taskState instanceof TaskDone) {
TaskDone doneState = (TaskDone) taskState;
String status = null;
if (doneState.error() == null || doneState.error().isEmpty()) {
if (doneState.cancelled()) {
status = "CANCELLED";
} else {
status = "FINISHED";
}
} else {
status = "FAILED";
}
return String.format("%s at %s after %s", status,
dateString(doneState.doneMs(), zoneOffset),
durationString(doneState.doneMs() - doneState.startedMs()));
} else {
throw new RuntimeException("Unknown task state type " + taskState.stateType());
}
}
}
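Taken together, the methods above let the Coordinator be driven programmatically as well as from the command line. Below is a minimal sketch of listing all DONE tasks as a human-readable grid; the exact Builder setter names are an assumption here, though the TasksRequest constructor and prettyPrintTasksResponse match the code above.

    // Sketch: fetch all DONE tasks and pretty-print them (Builder API assumed).
    CoordinatorClient client = new CoordinatorClient.Builder().
        maxTries(3).
        target("localhost", 8889).
        build();
    TasksRequest req = new TasksRequest(new ArrayList<String>(), 0, 0, 0, 0,
        Optional.of(TaskStateType.DONE));
    TasksResponse response = client.tasks(req);
    System.out.println(CoordinatorClient.prettyPrintTasksResponse(response, ZoneOffset.UTC));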

5
tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java

@@ -78,4 +78,9 @@ public class TaskDone extends TaskState {
public boolean cancelled() {
return cancelled;
}
@Override
public TaskStateType stateType() {
return TaskStateType.DONE;
}
}

5
tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java

@@ -30,4 +30,9 @@ public class TaskPending extends TaskState {
public TaskPending(@JsonProperty("spec") TaskSpec spec) {
super(spec, NullNode.instance);
}
@Override
public TaskStateType stateType() {
return TaskStateType.PENDING;
}
}

5
tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java

@@ -43,4 +43,9 @@ public class TaskRunning extends TaskState {
public long startedMs() {
return startedMs;
}
@Override
public TaskStateType stateType() {
return TaskStateType.RUNNING;
}
}

2
tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java

@@ -55,4 +55,6 @@ public abstract class TaskState extends Message {
public JsonNode status() {
return status;
}
public abstract TaskStateType stateType();
}
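The new abstract stateType() accessor gives every TaskState subclass a machine-readable tag, so callers can branch on task state without instanceof chains. A minimal sketch of the pattern (this helper is hypothetical, not part of the patch):

    // Hypothetical helper: keep only the task states matching the given type.
    static List<TaskState> filterByState(Collection<TaskState> states, TaskStateType type) {
        List<TaskState> results = new ArrayList<>();
        for (TaskState state : states) {
            if (state.stateType() == type) {
                results.add(state);
            }
        }
        return results;
    }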

5
tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java

@@ -43,4 +43,9 @@ public class TaskStopping extends TaskState {
public long startedMs() {
return startedMs;
}
@Override
public TaskStateType stateType() {
return TaskStateType.STOPPING;
}
}

45
tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java

@@ -28,7 +28,9 @@ import org.apache.kafka.trogdor.basic.BasicPlatform;
import org.apache.kafka.trogdor.basic.BasicTopology;
import org.apache.kafka.trogdor.common.ExpectedTasks;
import org.apache.kafka.trogdor.common.ExpectedTasks.ExpectedTaskBuilder;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec;
import org.apache.kafka.trogdor.fault.Kibosh;
import org.apache.kafka.trogdor.fault.Kibosh.KiboshControlFile;
@@ -45,13 +45,17 @@ import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.task.NoOpTaskSpec;
import org.apache.kafka.trogdor.task.SampleTaskSpec;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -69,6 +75,7 @@ public class AgentTest {
private static BasicPlatform createBasicPlatform(Scheduler scheduler) {
TreeMap<String, Node> nodes = new TreeMap<>();
HashMap<String, String> config = new HashMap<>();
config.put(Platform.Config.TROGDOR_AGENT_PORT, Integer.toString(Agent.DEFAULT_PORT));
nodes.put("node01", new BasicNode("node01", "localhost",
config, Collections.<String>emptySet()));
BasicTopology topology = new BasicTopology(nodes);
@@ -447,4 +454,42 @@ public class AgentTest {
agent.beginShutdown();
agent.waitForShutdown();
}
static void testExec(Agent agent, String expected, boolean expectedReturn, TaskSpec spec) throws Exception {
ByteArrayOutputStream b = new ByteArrayOutputStream();
PrintStream p = new PrintStream(b, true, StandardCharsets.UTF_8.toString());
boolean actualReturn = agent.exec(spec, p);
assertEquals(expected, b.toString());
assertEquals(expectedReturn, actualReturn);
}
@Test
public void testAgentExecWithTimeout() throws Exception {
Agent agent = createAgent(Scheduler.SYSTEM);
NoOpTaskSpec spec = new NoOpTaskSpec(0, 1);
TaskSpec rebasedSpec = agent.rebaseTaskSpecTime(spec);
testExec(agent,
String.format("Waiting for completion of task:%s%n",
JsonUtil.toPrettyJsonString(rebasedSpec)) +
String.format("Task failed with status null and error worker expired%n"),
false, rebasedSpec);
agent.beginShutdown();
agent.waitForShutdown();
}
@Test
public void testAgentExecWithNormalExit() throws Exception {
Agent agent = createAgent(Scheduler.SYSTEM);
SampleTaskSpec spec = new SampleTaskSpec(0, 120000,
Collections.singletonMap("node01", 1L), "");
TaskSpec rebasedSpec = agent.rebaseTaskSpecTime(spec);
testExec(agent,
String.format("Waiting for completion of task:%s%n",
JsonUtil.toPrettyJsonString(rebasedSpec)) +
String.format("Task succeeded with status \"halted\"%n"),
true, rebasedSpec);
agent.beginShutdown();
agent.waitForShutdown();
}
}
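The two tests above pin down exec mode's contract: Agent.exec(spec, out) prints the rebased spec, waits for the single task to finish, prints the final status, and returns whether it succeeded. A minimal sketch of driving it programmatically, reusing the test's createAgent factory (the 5000 ms duration is arbitrary):

    // Sketch: run one no-op task to completion and exit with a matching code.
    Agent agent = createAgent(Scheduler.SYSTEM);
    TaskSpec spec = agent.rebaseTaskSpecTime(new NoOpTaskSpec(0, 5000));
    boolean succeeded = agent.exec(spec, System.out);
    agent.beginShutdown();
    agent.waitForShutdown();
    Exit.exit(succeeded ? 0 : 1);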

79
tools/src/test/java/org/apache/kafka/trogdor/common/JsonUtilTest.java

@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.common;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.test.TestUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class JsonUtilTest {
private static final Logger log = LoggerFactory.getLogger(JsonUtilTest.class);
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);
@Test
public void testOpenBraceComesFirst() {
assertTrue(JsonUtil.openBraceComesFirst("{}"));
assertTrue(JsonUtil.openBraceComesFirst(" \t{\"foo\":\"bar\"}"));
assertTrue(JsonUtil.openBraceComesFirst(" { \"foo\": \"bar\" }"));
assertFalse(JsonUtil.openBraceComesFirst("/my/file/path"));
assertFalse(JsonUtil.openBraceComesFirst("mypath"));
assertFalse(JsonUtil.openBraceComesFirst(" blah{}"));
}
static final class Foo {
@JsonProperty
final int bar;
@JsonCreator
Foo(@JsonProperty("bar") int bar) {
this.bar = bar;
}
}
@Test
public void testObjectFromCommandLineArgument() throws Exception {
assertEquals(123, JsonUtil.<Foo>
objectFromCommandLineArgument("{\"bar\":123}", Foo.class).bar);
assertEquals(1, JsonUtil.<Foo>
objectFromCommandLineArgument(" {\"bar\": 1} ", Foo.class).bar);
File tempFile = TestUtils.tempFile();
try {
Files.write(tempFile.toPath(),
"{\"bar\": 456}".getBytes(StandardCharsets.UTF_8));
assertEquals(456, JsonUtil.<Foo>
objectFromCommandLineArgument(tempFile.getAbsolutePath(), Foo.class).bar);
} finally {
Files.delete(tempFile.toPath());
}
}
}
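These tests pin down the contract of objectFromCommandLineArgument: an argument whose first non-whitespace character is '{' is parsed as inline JSON, while anything else is treated as a path to a JSON file. A minimal sketch of that dispatch, assuming JsonUtil exposes its shared Jackson ObjectMapper as JSON_SERDE and using java.nio.file for the file case:

    static <T> T objectFromCommandLineArgument(String argument, Class<T> clazz) throws Exception {
        if (JsonUtil.openBraceComesFirst(argument)) {
            // Inline JSON object passed directly on the command line.
            return JsonUtil.JSON_SERDE.readValue(argument, clazz);
        }
        // Otherwise, treat the argument as a path and read the JSON from disk.
        byte[] data = Files.readAllBytes(Paths.get(argument));
        return JsonUtil.JSON_SERDE.readValue(new String(data, StandardCharsets.UTF_8), clazz);
    }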

68
tools/src/test/java/org/apache/kafka/trogdor/common/StringFormatterTest.java

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.common;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
import java.time.ZoneOffset;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.apache.kafka.trogdor.common.StringFormatter.durationString;
import static org.apache.kafka.trogdor.common.StringFormatter.dateString;
public class StringFormatterTest {
private static final Logger log = LoggerFactory.getLogger(StringFormatterTest.class);
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);
@Test
public void testDateString() {
assertEquals("2019-01-08T20:59:29.85Z",
dateString(1546981169850L, ZoneOffset.UTC));
}
@Test
public void testDurationString() {
assertEquals("1m", durationString(60000));
assertEquals("1m1s", durationString(61000));
assertEquals("1m1s", durationString(61200));
assertEquals("5s", durationString(5000));
assertEquals("2h", durationString(7200000));
assertEquals("2h1s", durationString(7201000));
assertEquals("2h5m3s", durationString(7503000));
}
@Test
public void testPrettyPrintGrid() {
assertEquals(String.format(
"ANIMAL NUMBER INDEX %n" +
"lion 1 12345 %n" +
"manatee 50 1 %n"),
StringFormatter.prettyPrintGrid(
Arrays.asList(Arrays.asList("ANIMAL", "NUMBER", "INDEX"),
Arrays.asList("lion", "1", "12345"),
Arrays.asList("manatee", "50", "1"))));
}
}
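The expectations above imply that durationString truncates sub-second remainders and omits zero-valued components entirely ("2h1s", never "2h0m1s"), and that prettyPrintGrid pads each column to the width of its widest cell. A minimal durationString sketch satisfying these cases (assumed, not the committed implementation):

    static String durationString(long periodMs) {
        StringBuilder bld = new StringBuilder();
        long totalSeconds = periodMs / 1000;        // truncate the sub-second part
        long hours = totalSeconds / 3600;
        long minutes = (totalSeconds % 3600) / 60;
        long seconds = totalSeconds % 60;
        if (hours > 0) bld.append(hours).append("h");
        if (minutes > 0) bld.append(minutes).append("m");
        if (seconds > 0) bld.append(seconds).append("s");
        if (bld.length() == 0) bld.append("0s");    // degenerate all-zero input
        return bld.toString();
    }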

83
tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorClientTest.java

@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.coordinator;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending;
import org.apache.kafka.trogdor.rest.TaskRunning;
import org.apache.kafka.trogdor.rest.TaskStopping;
import org.apache.kafka.trogdor.task.NoOpTaskSpec;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
import java.time.ZoneOffset;
import static org.junit.Assert.assertEquals;
public class CoordinatorClientTest {
private static final Logger log = LoggerFactory.getLogger(CoordinatorClientTest.class);
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);
@Test
public void testPrettyPrintTaskInfo() {
assertEquals("Will start at 2019-01-08T07:05:59.85Z",
CoordinatorClient.prettyPrintTaskInfo(
new TaskPending(new NoOpTaskSpec(1546931159850L, 9000)),
ZoneOffset.UTC));
assertEquals("Started 2009-07-07T01:45:59.85Z; will stop after 9s",
CoordinatorClient.prettyPrintTaskInfo(
new TaskRunning(new NoOpTaskSpec(1146931159850L, 9000),
1246931159850L,
JsonNodeFactory.instance.objectNode()), ZoneOffset.UTC));
assertEquals("Started 2009-07-07T01:45:59.85Z",
CoordinatorClient.prettyPrintTaskInfo(
new TaskStopping(new NoOpTaskSpec(1146931159850L, 9000),
1246931159850L,
JsonNodeFactory.instance.objectNode()), ZoneOffset.UTC));
assertEquals("FINISHED at 2019-01-08T20:59:29.85Z after 10s",
CoordinatorClient.prettyPrintTaskInfo(
new TaskDone(new NoOpTaskSpec(0, 1000),
1546981159850L,
1546981169850L,
"",
false,
JsonNodeFactory.instance.objectNode()), ZoneOffset.UTC));
assertEquals("CANCELLED at 2019-01-08T20:59:29.85Z after 10s",
CoordinatorClient.prettyPrintTaskInfo(
new TaskDone(new NoOpTaskSpec(0, 1000),
1546981159850L,
1546981169850L,
"",
true,
JsonNodeFactory.instance.objectNode()), ZoneOffset.UTC));
assertEquals("FAILED at 2019-01-08T20:59:29.85Z after 10s",
CoordinatorClient.prettyPrintTaskInfo(
new TaskDone(new NoOpTaskSpec(0, 1000),
1546981159850L,
1546981169850L,
"foobar",
true,
JsonNodeFactory.instance.objectNode()), ZoneOffset.UTC));
}
}