Browse Source

KAFKA-14594: Move LogDirsCommand to tools module (#13122)

Reviewers: Mickael Maison <mickael.maison@gmail.com>
pull/13673/head
Christo Lolov 1 year ago committed by GitHub
parent
commit
dc7819d7f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      bin/kafka-log-dirs.sh
  2. 2
      bin/windows/kafka-log-dirs.bat
  3. 32
      clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
  4. 132
      core/src/main/scala/kafka/admin/LogDirsCommand.scala
  5. 76
      core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
  6. 220
      tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java
  7. 114
      tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java

2
bin/kafka-log-dirs.sh

@ -14,4 +14,4 @@ @@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.LogDirsCommand "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.LogDirsCommand "$@"

2
bin/windows/kafka-log-dirs.bat

@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
"%~dp0kafka-run-class.bat" kafka.admin.LogDirsCommand %*
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.LogDirsCommand %*

32
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java

@ -945,7 +945,37 @@ public class MockAdminClient extends AdminClient { @@ -945,7 +945,37 @@ public class MockAdminClient extends AdminClient {
@Override
synchronized public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers,
DescribeLogDirsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
Map<Integer, Map<String, LogDirDescription>> unwrappedResults = new HashMap<>();
for (Integer broker : brokers) {
unwrappedResults.putIfAbsent(broker, new HashMap<>());
}
for (Map.Entry<String, TopicMetadata> entry : allTopics.entrySet()) {
String topicName = entry.getKey();
TopicMetadata topicMetadata = entry.getValue();
// For tests, we make the assumption that there will always be only 1 entry.
List<String> partitionLogDirs = topicMetadata.partitionLogDirs;
List<TopicPartitionInfo> topicPartitionInfos = topicMetadata.partitions;
for (TopicPartitionInfo topicPartitionInfo : topicPartitionInfos) {
List<Node> nodes = topicPartitionInfo.replicas();
for (Node node : nodes) {
Map<String, LogDirDescription> logDirDescriptionMap = unwrappedResults.get(node.id());
LogDirDescription logDirDescription = logDirDescriptionMap.getOrDefault(partitionLogDirs.get(0), new LogDirDescription(null, new HashMap<>()));
logDirDescription.replicaInfos().put(new TopicPartition(topicName, topicPartitionInfo.partition()), new ReplicaInfo(0, 0, false));
}
}
}
Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> results = new HashMap<>();
for (Map.Entry<Integer, Map<String, LogDirDescription>> entry : unwrappedResults.entrySet()) {
KafkaFutureImpl<Map<String, LogDirDescription>> kafkaFuture = new KafkaFutureImpl<>();
kafkaFuture.complete(entry.getValue());
results.put(entry.getKey(), kafkaFuture);
}
return new DescribeLogDirsResult(results);
}
@Override

132
core/src/main/scala/kafka/admin/LogDirsCommand.scala

@ -1,132 +0,0 @@ @@ -1,132 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.io.PrintStream
import java.util.Properties
import kafka.utils.Json
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, LogDirDescription}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.Map
/**
* A command for querying log directory usage on the specified brokers
*/
object LogDirsCommand {
def main(args: Array[String]): Unit = {
describe(args, System.out)
}
def describe(args: Array[String], out: PrintStream): Unit = {
val opts = new LogDirsCommandOptions(args)
val adminClient = createAdminClient(opts)
try {
val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
case Some(brokerListStr) =>
val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
(inputBrokers.intersect(clusterBrokers), inputBrokers.diff(clusterBrokers))
case None => (clusterBrokers, Set.empty)
}
if (nonExistingBrokers.nonEmpty) {
out.println(s"ERROR: The given brokers do not exist from --broker-list: ${nonExistingBrokers.mkString(",")}." +
s" Current existent brokers: ${clusterBrokers.mkString(",")}")
} else {
out.println("Querying brokers for log directories information")
val describeLogDirsResult = adminClient.describeLogDirs(existingBrokers.map(Integer.valueOf).toSeq.asJava)
val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
out.println(s"Received log directory information from brokers ${existingBrokers.mkString(",")}")
out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
}
} finally {
adminClient.close()
}
}
private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirDescription]], topicSet: Set[String]): String = {
Json.encodeAsString(Map(
"version" -> 1,
"brokers" -> logDirInfosByBroker.map { case (broker, logDirInfos) =>
Map(
"broker" -> broker,
"logDirs" -> logDirInfos.map { case (logDir, logDirInfo) =>
Map(
"logDir" -> logDir,
"error" -> Option(logDirInfo.error).map(ex => ex.getClass.getName).orNull,
"partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, _) =>
topicSet.isEmpty || topicSet.contains(topicPartition.topic)
}.map { case (topicPartition, replicaInfo) =>
Map(
"partition" -> topicPartition.toString,
"size" -> replicaInfo.size,
"offsetLag" -> replicaInfo.offsetLag,
"isFuture" -> replicaInfo.isFuture
).asJava
}.asJava
).asJava
}.asJava
).asJava
}.asJava
).asJava)
}
private def createAdminClient(opts: LogDirsCommandOptions): Admin = {
val props = if (opts.options.has(opts.commandConfigOpt))
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
else
new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool")
Admin.create(props)
}
class LogDirsCommandOptions(args: Array[String]) extends CommandDefaultOptions(args){
val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping")
.withRequiredArg
.describedAs("The server(s) to use for bootstrapping")
.ofType(classOf[String])
val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
.withRequiredArg
.describedAs("Admin client property file")
.ofType(classOf[String])
val describeOpt = parser.accepts("describe", "Describe the specified log directories on the specified brokers.")
val topicListOpt = parser.accepts("topic-list", "The list of topics to be queried in the form \"topic1,topic2,topic3\". " +
"All topics will be queried if no topic list is specified")
.withRequiredArg
.describedAs("Topic list")
.defaultsTo("")
.ofType(classOf[String])
val brokerListOpt = parser.accepts("broker-list", "The list of brokers to be queried in the form \"0,1,2\". " +
"All brokers in the cluster will be queried if no broker list is specified")
.withRequiredArg
.describedAs("Broker list")
.ofType(classOf[String])
options = parser.parse(args : _*)
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to query log directory usage on the specified brokers.")
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, describeOpt)
}
}

76
core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala

@ -1,76 +0,0 @@ @@ -1,76 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import scala.collection.Seq
class LogDirsCommandTest extends KafkaServerTestHarness {
def generateConfigs: Seq[KafkaConfig] = {
TestUtils.createBrokerConfigs(1, zkConnect)
.map(KafkaConfig.fromProps)
}
@Test
def checkLogDirsCommandOutput(): Unit = {
val byteArrayOutputStream = new ByteArrayOutputStream
val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
//input exist brokerList
LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0", "--describe"), printStream)
val existingBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val existingBrokersLineIter = existingBrokersContent.split("\n").iterator
assertTrue(existingBrokersLineIter.hasNext)
assertTrue(existingBrokersLineIter.next().contains(s"Querying brokers for log directories information"))
//input nonexistent brokerList
byteArrayOutputStream.reset()
LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0,1,2", "--describe"), printStream)
val nonExistingBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val nonExistingBrokersLineIter = nonExistingBrokersContent.split("\n").iterator
assertTrue(nonExistingBrokersLineIter.hasNext)
assertTrue(nonExistingBrokersLineIter.next().contains(s"ERROR: The given brokers do not exist from --broker-list: 1,2. Current existent brokers: 0"))
//input duplicate ids
byteArrayOutputStream.reset()
LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0,0,1,2,2", "--describe"), printStream)
val duplicateBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val duplicateBrokersLineIter = duplicateBrokersContent.split("\n").iterator
assertTrue(duplicateBrokersLineIter.hasNext)
assertTrue(duplicateBrokersLineIter.next().contains(s"ERROR: The given brokers do not exist from --broker-list: 1,2. Current existent brokers: 0"))
//use all brokerList for current cluster
byteArrayOutputStream.reset()
LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--describe"), printStream)
val allBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val allBrokersLineIter = allBrokersContent.split("\n").iterator
assertTrue(allBrokersLineIter.hasNext)
assertTrue(allBrokersLineIter.next().contains(s"Querying brokers for log directories information"))
}
}

220
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java

@ -0,0 +1,220 @@ @@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.clients.admin.ReplicaInfo;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class LogDirsCommand {
public static void main(String... args) {
Exit.exit(mainNoExit(args));
}
static int mainNoExit(String... args) {
try {
execute(args);
return 0;
} catch (TerseException e) {
System.err.println(e.getMessage());
return 1;
} catch (Throwable e) {
System.err.println(e.getMessage());
System.err.println(Utils.stackTrace(e));
return 1;
}
}
private static void execute(String... args) throws Exception {
LogDirsCommandOptions options = new LogDirsCommandOptions(args);
try (Admin adminClient = createAdminClient(options)) {
execute(options, adminClient);
}
}
static void execute(LogDirsCommandOptions options, Admin adminClient) throws Exception {
Set<String> topics = options.topics();
Set<Integer> clusterBrokers = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet());
Set<Integer> inputBrokers = options.brokers();
Set<Integer> existingBrokers = inputBrokers.isEmpty() ? new HashSet<>(clusterBrokers) : new HashSet<>(inputBrokers);
existingBrokers.retainAll(clusterBrokers);
Set<Integer> nonExistingBrokers = new HashSet<>(inputBrokers);
nonExistingBrokers.removeAll(clusterBrokers);
if (!nonExistingBrokers.isEmpty()) {
throw new TerseException(
String.format(
"ERROR: The given brokers do not exist from --broker-list: %s. Current existent brokers: %s",
commaDelimitedStringFromIntegerSet(nonExistingBrokers),
commaDelimitedStringFromIntegerSet(clusterBrokers)));
} else {
System.out.println("Querying brokers for log directories information");
DescribeLogDirsResult describeLogDirsResult = adminClient.describeLogDirs(existingBrokers);
Map<Integer, Map<String, LogDirDescription>> logDirInfosByBroker = describeLogDirsResult.allDescriptions().get();
System.out.printf(
"Received log directory information from brokers %s%n",
commaDelimitedStringFromIntegerSet(existingBrokers));
System.out.println(formatAsJson(logDirInfosByBroker, topics));
}
}
private static String commaDelimitedStringFromIntegerSet(Set<Integer> set) {
return set.stream().map(String::valueOf).collect(Collectors.joining(","));
}
private static List<Map<String, Object>> fromReplicasInfoToPrintableRepresentation(Map<TopicPartition, ReplicaInfo> replicasInfo) {
return replicasInfo.entrySet().stream().map(entry -> {
TopicPartition topicPartition = entry.getKey();
return new HashMap<String, Object>() {{
put("partition", topicPartition.toString());
put("size", entry.getValue().size());
put("offsetLag", entry.getValue().offsetLag());
put("isFuture", entry.getValue().isFuture());
}};
}).collect(Collectors.toList());
}
private static List<Map<String, Object>> fromLogDirInfosToPrintableRepresentation(Map<String, LogDirDescription> logDirInfos, Set<String> topicSet) {
return logDirInfos.entrySet().stream().map(entry -> {
String logDir = entry.getKey();
return new HashMap<String, Object>() {{
put("logDir", logDir);
put("error", entry.getValue().error() != null ? entry.getValue().error().getClass().getName() : null);
put("partitions", fromReplicasInfoToPrintableRepresentation(
entry.getValue().replicaInfos().entrySet().stream().filter(entry -> {
TopicPartition topicPartition = entry.getKey();
return topicSet.isEmpty() || topicSet.contains(topicPartition.topic());
}).collect(Collectors.toMap(Entry::getKey, Entry::getValue))
));
}};
}).collect(Collectors.toList());
}
private static String formatAsJson(Map<Integer, Map<String, LogDirDescription>> logDirInfosByBroker, Set<String> topicSet) throws JsonProcessingException {
return new ObjectMapper().writeValueAsString(new HashMap<String, Object>() {{
put("version", 1);
put("brokers", logDirInfosByBroker.entrySet().stream().map(entry -> {
int broker = entry.getKey();
Map<String, LogDirDescription> logDirInfos = entry.getValue();
return new HashMap<String, Object>() {{
put("broker", broker);
put("logDirs", fromLogDirInfosToPrintableRepresentation(logDirInfos, topicSet));
}};
}).collect(Collectors.toList()));
}});
}
private static Admin createAdminClient(LogDirsCommandOptions options) throws IOException {
Properties props = new Properties();
if (options.hasCommandConfig()) {
props.putAll(Utils.loadProps(options.commandConfig()));
}
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, options.bootstrapServers());
props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool");
return Admin.create(props);
}
// Visible for testing
static class LogDirsCommandOptions extends CommandDefaultOptions {
private final OptionSpec<String> bootstrapServerOpt;
private final OptionSpec<String> commandConfigOpt;
private final OptionSpecBuilder describeOpt;
private final OptionSpec<String> topicListOpt;
private final OptionSpec<String> brokerListOpt;
public LogDirsCommandOptions(String... args) {
super(args);
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping")
.withRequiredArg()
.describedAs("The server(s) to use for bootstrapping")
.ofType(String.class);
commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
.withRequiredArg()
.describedAs("Admin client property file")
.ofType(String.class);
describeOpt = parser.accepts("describe", "Describe the specified log directories on the specified brokers.");
topicListOpt = parser.accepts("topic-list", "The list of topics to be queried in the form \"topic1,topic2,topic3\". " +
"All topics will be queried if no topic list is specified")
.withRequiredArg()
.describedAs("Topic list")
.defaultsTo("")
.ofType(String.class);
brokerListOpt = parser.accepts("broker-list", "The list of brokers to be queried in the form \"0,1,2\". " +
"All brokers in the cluster will be queried if no broker list is specified")
.withRequiredArg()
.describedAs("Broker list")
.ofType(String.class)
.defaultsTo("");
options = parser.parse(args);
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to query log directory usage on the specified brokers.");
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, describeOpt);
}
private Stream<String> splitAtCommasAndFilterOutEmpty(OptionSpec<String> option) {
return Arrays.stream(options.valueOf(option).split(",")).filter(x -> !x.isEmpty());
}
private String bootstrapServers() {
return options.valueOf(bootstrapServerOpt);
}
private boolean hasCommandConfig() {
return options.has(commandConfigOpt);
}
private String commandConfig() {
return options.valueOf(commandConfigOpt);
}
private Set<String> topics() {
return splitAtCommasAndFilterOutEmpty(topicListOpt).collect(Collectors.toSet());
}
private Set<Integer> brokers() {
return splitAtCommasAndFilterOutEmpty(brokerListOpt).map(Integer::valueOf).collect(Collectors.toSet());
}
}
}

114
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java

@ -0,0 +1,114 @@ @@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.Node;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class LogDirsCommandTest {
@Test
public void shouldThrowWhenQueryingNonExistentBrokers() {
Node broker = new Node(1, "hostname", 9092);
try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
}
}
@Test
@SuppressWarnings("unchecked")
public void shouldNotThrowWhenDuplicatedBrokers() throws JsonProcessingException {
Node broker = new Node(1, "hostname", 9092);
try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1,1", "--describe"), adminClient);
String[] standardOutputLines = standardOutput.split("\n");
assertEquals(3, standardOutputLines.length);
Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
List<Object> brokersInformation = (List<Object>) information.get("brokers");
Integer brokerId = (Integer) ((HashMap<String, Object>) brokersInformation.get(0)).get("broker");
assertEquals(1, brokersInformation.size());
assertEquals(1, brokerId);
}
}
@Test
@SuppressWarnings("unchecked")
public void shouldQueryAllBrokersIfNonSpecified() throws JsonProcessingException {
Node brokerOne = new Node(1, "hostname", 9092);
Node brokerTwo = new Node(2, "hostname", 9092);
try (MockAdminClient adminClient = new MockAdminClient(Arrays.asList(brokerTwo, brokerOne), brokerOne)) {
String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--describe"), adminClient);
String[] standardOutputLines = standardOutput.split("\n");
assertEquals(3, standardOutputLines.length);
Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
List<Object> brokersInformation = (List<Object>) information.get("brokers");
Set<Integer> brokerIds = new HashSet<Integer>() {{
add((Integer) ((HashMap<String, Object>) brokersInformation.get(0)).get("broker"));
add((Integer) ((HashMap<String, Object>) brokersInformation.get(1)).get("broker"));
}};
assertEquals(2, brokersInformation.size());
assertEquals(new HashSet<>(Arrays.asList(2, 1)), brokerIds);
}
}
@Test
@SuppressWarnings("unchecked")
public void shouldQuerySpecifiedBroker() throws JsonProcessingException {
Node brokerOne = new Node(1, "hostname", 9092);
Node brokerTwo = new Node(2, "hostname", 9092);
try (MockAdminClient adminClient = new MockAdminClient(Arrays.asList(brokerOne, brokerTwo), brokerOne)) {
String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1", "--describe"), adminClient);
String[] standardOutputLines = standardOutput.split("\n");
assertEquals(3, standardOutputLines.length);
Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
List<Object> brokersInformation = (List<Object>) information.get("brokers");
Integer brokerId = (Integer) ((HashMap<String, Object>) brokersInformation.get(0)).get("broker");
assertEquals(1, brokersInformation.size());
assertEquals(1, brokerId);
}
}
private LogDirsCommand.LogDirsCommandOptions fromArgsToOptions(String... args) {
return new LogDirsCommand.LogDirsCommandOptions(args);
}
private String execute(LogDirsCommand.LogDirsCommandOptions options, Admin adminClient) {
Runnable runnable = () -> {
try {
LogDirsCommand.execute(options, adminClient);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
return ToolsTestUtils.captureStandardOut(runnable);
}
}
Loading…
Cancel
Save