diff --git a/bin/kafka-log-dirs.sh b/bin/kafka-log-dirs.sh
index dc16edcc7c5..9894d695cc4 100755
--- a/bin/kafka-log-dirs.sh
+++ b/bin/kafka-log-dirs.sh
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-exec $(dirname $0)/kafka-run-class.sh kafka.admin.LogDirsCommand "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.LogDirsCommand "$@"
diff --git a/bin/windows/kafka-log-dirs.bat b/bin/windows/kafka-log-dirs.bat
index b490d47feae..850003c6038 100644
--- a/bin/windows/kafka-log-dirs.bat
+++ b/bin/windows/kafka-log-dirs.bat
@@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 rem See the License for the specific language governing permissions and
 rem limitations under the License.
 
-"%~dp0kafka-run-class.bat" kafka.admin.LogDirsCommand %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.LogDirsCommand %*
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 7a32bcf5a16..7da22cc787d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -945,7 +945,37 @@ public class MockAdminClient extends AdminClient {
     @Override
     synchronized public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers,
                                                               DescribeLogDirsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        Map<Integer, Map<String, LogDirDescription>> unwrappedResults = new HashMap<>();
+
+        for (Integer broker : brokers) {
+            unwrappedResults.putIfAbsent(broker, new HashMap<>());
+        }
+
+        for (Map.Entry<String, TopicMetadata> entry : allTopics.entrySet()) {
+            String topicName = entry.getKey();
+            TopicMetadata topicMetadata = entry.getValue();
+            // For tests, we make the assumption that there will always be only 1 entry.
+            List<String> partitionLogDirs = topicMetadata.partitionLogDirs;
+            List<TopicPartitionInfo> topicPartitionInfos = topicMetadata.partitions;
+            for (TopicPartitionInfo topicPartitionInfo : topicPartitionInfos) {
+                List<Node> nodes = topicPartitionInfo.replicas();
+                for (Node node : nodes) {
+                    Map<String, LogDirDescription> logDirDescriptionMap = unwrappedResults.get(node.id());
+                    LogDirDescription logDirDescription = logDirDescriptionMap.getOrDefault(partitionLogDirs.get(0), new LogDirDescription(null, new HashMap<>()));
+                    logDirDescription.replicaInfos().put(new TopicPartition(topicName, topicPartitionInfo.partition()), new ReplicaInfo(0, 0, false));
+                }
+            }
+        }
+
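+        // Wrap each broker's log dir map in an already-completed KafkaFuture,
+        // since DescribeLogDirsResult holds one future per queried broker.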
+        Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> results = new HashMap<>();
+
+        for (Map.Entry<Integer, Map<String, LogDirDescription>> entry : unwrappedResults.entrySet()) {
+            KafkaFutureImpl<Map<String, LogDirDescription>> kafkaFuture = new KafkaFutureImpl<>();
+            kafkaFuture.complete(entry.getValue());
+            results.put(entry.getKey(), kafkaFuture);
+        }
+
+        return new DescribeLogDirsResult(results);
     }
 
     @Override
diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
deleted file mode 100644
index 870e6a17ba1..00000000000
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-import java.io.PrintStream
-import java.util.Properties
-import kafka.utils.Json
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, LogDirDescription}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
-
-import scala.jdk.CollectionConverters._
-import scala.collection.Map
-
-/**
- * A command for querying log directory usage on the specified brokers
- */
-object LogDirsCommand {
-
-    def main(args: Array[String]): Unit = {
-        describe(args, System.out)
-    }
-
-    def describe(args: Array[String], out: PrintStream): Unit = {
-        val opts = new LogDirsCommandOptions(args)
-        val adminClient = createAdminClient(opts)
-        try {
-            val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
-            val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
-            val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-                case Some(brokerListStr) =>
-                    val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
-                    (inputBrokers.intersect(clusterBrokers), inputBrokers.diff(clusterBrokers))
-                case None => (clusterBrokers, Set.empty)
-            }
-
-            if (nonExistingBrokers.nonEmpty) {
-                out.println(s"ERROR: The given brokers do not exist from --broker-list: ${nonExistingBrokers.mkString(",")}." +
+ - s" Current existent brokers: ${clusterBrokers.mkString(",")}") - } else { - out.println("Querying brokers for log directories information") - val describeLogDirsResult = adminClient.describeLogDirs(existingBrokers.map(Integer.valueOf).toSeq.asJava) - val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala } - - out.println(s"Received log directory information from brokers ${existingBrokers.mkString(",")}") - out.println(formatAsJson(logDirInfosByBroker, topicList.toSet)) - } - } finally { - adminClient.close() - } - } - - private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirDescription]], topicSet: Set[String]): String = { - Json.encodeAsString(Map( - "version" -> 1, - "brokers" -> logDirInfosByBroker.map { case (broker, logDirInfos) => - Map( - "broker" -> broker, - "logDirs" -> logDirInfos.map { case (logDir, logDirInfo) => - Map( - "logDir" -> logDir, - "error" -> Option(logDirInfo.error).map(ex => ex.getClass.getName).orNull, - "partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, _) => - topicSet.isEmpty || topicSet.contains(topicPartition.topic) - }.map { case (topicPartition, replicaInfo) => - Map( - "partition" -> topicPartition.toString, - "size" -> replicaInfo.size, - "offsetLag" -> replicaInfo.offsetLag, - "isFuture" -> replicaInfo.isFuture - ).asJava - }.asJava - ).asJava - }.asJava - ).asJava - }.asJava - ).asJava) - } - - private def createAdminClient(opts: LogDirsCommandOptions): Admin = { - val props = if (opts.options.has(opts.commandConfigOpt)) - Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) - else - new Properties() - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)) - props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool") - Admin.create(props) - } - - class LogDirsCommandOptions(args: Array[String]) extends CommandDefaultOptions(args){ - val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping") - .withRequiredArg - .describedAs("The server(s) to use for bootstrapping") - .ofType(classOf[String]) - val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.") - .withRequiredArg - .describedAs("Admin client property file") - .ofType(classOf[String]) - val describeOpt = parser.accepts("describe", "Describe the specified log directories on the specified brokers.") - val topicListOpt = parser.accepts("topic-list", "The list of topics to be queried in the form \"topic1,topic2,topic3\". " + - "All topics will be queried if no topic list is specified") - .withRequiredArg - .describedAs("Topic list") - .defaultsTo("") - .ofType(classOf[String]) - val brokerListOpt = parser.accepts("broker-list", "The list of brokers to be queried in the form \"0,1,2\". 
" + - "All brokers in the cluster will be queried if no broker list is specified") - .withRequiredArg - .describedAs("Broker list") - .ofType(classOf[String]) - - options = parser.parse(args : _*) - - CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to query log directory usage on the specified brokers.") - - CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, describeOpt) - } -} diff --git a/core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala deleted file mode 100644 index 03e9c1785ac..00000000000 --- a/core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.admin - -import java.io.{ByteArrayOutputStream, PrintStream} -import java.nio.charset.StandardCharsets - -import kafka.integration.KafkaServerTestHarness -import kafka.server.KafkaConfig -import kafka.utils.TestUtils -import org.junit.jupiter.api.Assertions.assertTrue -import org.junit.jupiter.api.Test - -import scala.collection.Seq - -class LogDirsCommandTest extends KafkaServerTestHarness { - - def generateConfigs: Seq[KafkaConfig] = { - TestUtils.createBrokerConfigs(1, zkConnect) - .map(KafkaConfig.fromProps) - } - - @Test - def checkLogDirsCommandOutput(): Unit = { - val byteArrayOutputStream = new ByteArrayOutputStream - val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name()) - //input exist brokerList - LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0", "--describe"), printStream) - val existingBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8) - val existingBrokersLineIter = existingBrokersContent.split("\n").iterator - - assertTrue(existingBrokersLineIter.hasNext) - assertTrue(existingBrokersLineIter.next().contains(s"Querying brokers for log directories information")) - - //input nonexistent brokerList - byteArrayOutputStream.reset() - LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0,1,2", "--describe"), printStream) - val nonExistingBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8) - val nonExistingBrokersLineIter = nonExistingBrokersContent.split("\n").iterator - - assertTrue(nonExistingBrokersLineIter.hasNext) - assertTrue(nonExistingBrokersLineIter.next().contains(s"ERROR: The given brokers do not exist from --broker-list: 1,2. 
-
-    //input duplicate ids
-    byteArrayOutputStream.reset()
-    LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0,0,1,2,2", "--describe"), printStream)
-    val duplicateBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
-    val duplicateBrokersLineIter = duplicateBrokersContent.split("\n").iterator
-
-    assertTrue(duplicateBrokersLineIter.hasNext)
-    assertTrue(duplicateBrokersLineIter.next().contains(s"ERROR: The given brokers do not exist from --broker-list: 1,2. Current existent brokers: 0"))
-
-    //use all brokerList for current cluster
-    byteArrayOutputStream.reset()
-    LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--describe"), printStream)
-    val allBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
-    val allBrokersLineIter = allBrokersContent.split("\n").iterator
-
-    assertTrue(allBrokersLineIter.hasNext)
-    assertTrue(allBrokersLineIter.next().contains(s"Querying brokers for log directories information"))
-  }
-}
diff --git a/tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java b/tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java
new file mode 100644
index 00000000000..4e6f0d3397a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
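+/**
+ * A command for querying log directory usage on the specified brokers.
+ */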
+public class LogDirsCommand {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    private static void execute(String... args) throws Exception {
+        LogDirsCommandOptions options = new LogDirsCommandOptions(args);
+        try (Admin adminClient = createAdminClient(options)) {
+            execute(options, adminClient);
+        }
+    }
+
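+    // Queries the log directories of the given brokers: an explicit --broker-list
+    // is first validated against the cluster metadata, otherwise every broker in
+    // the cluster is queried. The result is printed to stdout as a JSON document.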
+    static void execute(LogDirsCommandOptions options, Admin adminClient) throws Exception {
+        Set<String> topics = options.topics();
+        Set<Integer> clusterBrokers = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet());
+        Set<Integer> inputBrokers = options.brokers();
+        Set<Integer> existingBrokers = inputBrokers.isEmpty() ? new HashSet<>(clusterBrokers) : new HashSet<>(inputBrokers);
+        existingBrokers.retainAll(clusterBrokers);
+        Set<Integer> nonExistingBrokers = new HashSet<>(inputBrokers);
+        nonExistingBrokers.removeAll(clusterBrokers);
+
+        if (!nonExistingBrokers.isEmpty()) {
+            throw new TerseException(
+                    String.format(
+                            "ERROR: The given brokers do not exist from --broker-list: %s. Current existent brokers: %s",
+                            commaDelimitedStringFromIntegerSet(nonExistingBrokers),
+                            commaDelimitedStringFromIntegerSet(clusterBrokers)));
+        } else {
+            System.out.println("Querying brokers for log directories information");
+            DescribeLogDirsResult describeLogDirsResult = adminClient.describeLogDirs(existingBrokers);
+            Map<Integer, Map<String, LogDirDescription>> logDirInfosByBroker = describeLogDirsResult.allDescriptions().get();
+
+            System.out.printf(
+                    "Received log directory information from brokers %s%n",
+                    commaDelimitedStringFromIntegerSet(existingBrokers));
+            System.out.println(formatAsJson(logDirInfosByBroker, topics));
+        }
+    }
+
+    private static String commaDelimitedStringFromIntegerSet(Set<Integer> set) {
+        return set.stream().map(String::valueOf).collect(Collectors.joining(","));
+    }
+
+    private static List<Map<String, Object>> fromReplicasInfoToPrintableRepresentation(Map<TopicPartition, ReplicaInfo> replicasInfo) {
+        return replicasInfo.entrySet().stream().map(entry -> {
+            TopicPartition topicPartition = entry.getKey();
+            return new HashMap<String, Object>() {{
+                put("partition", topicPartition.toString());
+                put("size", entry.getValue().size());
+                put("offsetLag", entry.getValue().offsetLag());
+                put("isFuture", entry.getValue().isFuture());
+            }};
+        }).collect(Collectors.toList());
+    }
+
+    private static List<Map<String, Object>> fromLogDirInfosToPrintableRepresentation(Map<String, LogDirDescription> logDirInfos, Set<String> topicSet) {
+        return logDirInfos.entrySet().stream().map(entry -> {
+            String logDir = entry.getKey();
+            return new HashMap<String, Object>() {{
+                put("logDir", logDir);
+                put("error", entry.getValue().error() != null ? entry.getValue().error().getClass().getName() : null);
+                put("partitions", fromReplicasInfoToPrintableRepresentation(
+                        entry.getValue().replicaInfos().entrySet().stream().filter(entry -> {
+                            TopicPartition topicPartition = entry.getKey();
+                            return topicSet.isEmpty() || topicSet.contains(topicPartition.topic());
+                        }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
+                ));
+            }};
+        }).collect(Collectors.toList());
+    }
+
+    private static String formatAsJson(Map<Integer, Map<String, LogDirDescription>> logDirInfosByBroker, Set<String> topicSet) throws JsonProcessingException {
+        return new ObjectMapper().writeValueAsString(new HashMap<String, Object>() {{
+            put("version", 1);
+            put("brokers", logDirInfosByBroker.entrySet().stream().map(entry -> {
+                int broker = entry.getKey();
+                Map<String, LogDirDescription> logDirInfos = entry.getValue();
+                return new HashMap<String, Object>() {{
+                    put("broker", broker);
+                    put("logDirs", fromLogDirInfosToPrintableRepresentation(logDirInfos, topicSet));
+                }};
+            }).collect(Collectors.toList()));
+        }});
+    }
+
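+    // Builds the Admin client from the optional --command-config properties,
+    // always overriding bootstrap.servers and defaulting client.id to "log-dirs-tool".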
" + + "All brokers in the cluster will be queried if no broker list is specified") + .withRequiredArg() + .describedAs("Broker list") + .ofType(String.class) + .defaultsTo(""); + + options = parser.parse(args); + + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to query log directory usage on the specified brokers."); + + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, describeOpt); + } + + private Stream splitAtCommasAndFilterOutEmpty(OptionSpec option) { + return Arrays.stream(options.valueOf(option).split(",")).filter(x -> !x.isEmpty()); + } + + private String bootstrapServers() { + return options.valueOf(bootstrapServerOpt); + } + + private boolean hasCommandConfig() { + return options.has(commandConfigOpt); + } + + private String commandConfig() { + return options.valueOf(commandConfigOpt); + } + + private Set topics() { + return splitAtCommasAndFilterOutEmpty(topicListOpt).collect(Collectors.toSet()); + } + + private Set brokers() { + return splitAtCommasAndFilterOutEmpty(brokerListOpt).map(Integer::valueOf).collect(Collectors.toSet()); + } + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java new file mode 100644 index 00000000000..4dfd36896c6 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class LogDirsCommandTest {
+
+    @Test
+    public void shouldThrowWhenQueryingNonExistentBrokers() {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            assertThrows(RuntimeException.class, () -> execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "0,1,2", "--describe"), adminClient));
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldNotThrowWhenDuplicatedBrokers() throws JsonProcessingException {
+        Node broker = new Node(1, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1,1", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertEquals(3, standardOutputLines.length);
+            Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+            List<Object> brokersInformation = (List<Object>) information.get("brokers");
+            Integer brokerId = (Integer) ((HashMap<String, Object>) brokersInformation.get(0)).get("broker");
+            assertEquals(1, brokersInformation.size());
+            assertEquals(1, brokerId);
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldQueryAllBrokersIfNonSpecified() throws JsonProcessingException {
+        Node brokerOne = new Node(1, "hostname", 9092);
+        Node brokerTwo = new Node(2, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Arrays.asList(brokerTwo, brokerOne), brokerOne)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertEquals(3, standardOutputLines.length);
+            Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+            List<Object> brokersInformation = (List<Object>) information.get("brokers");
+            Set<Integer> brokerIds = new HashSet<Integer>() {{
+                add((Integer) ((HashMap<String, Object>) brokersInformation.get(0)).get("broker"));
+                add((Integer) ((HashMap<String, Object>) brokersInformation.get(1)).get("broker"));
+            }};
+            assertEquals(2, brokersInformation.size());
+            assertEquals(new HashSet<>(Arrays.asList(2, 1)), brokerIds);
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldQuerySpecifiedBroker() throws JsonProcessingException {
+        Node brokerOne = new Node(1, "hostname", 9092);
+        Node brokerTwo = new Node(2, "hostname", 9092);
+        try (MockAdminClient adminClient = new MockAdminClient(Arrays.asList(brokerOne, brokerTwo), brokerOne)) {
+            String standardOutput = execute(fromArgsToOptions("--bootstrap-server", "EMPTY", "--broker-list", "1", "--describe"), adminClient);
+            String[] standardOutputLines = standardOutput.split("\n");
+            assertEquals(3, standardOutputLines.length);
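+            // The tool prints three lines: the "Querying" notice, the "Received"
+            // notice, and the JSON document on the third line.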
+            Map<String, Object> information = new ObjectMapper().readValue(standardOutputLines[2], HashMap.class);
+            List<Object> brokersInformation = (List<Object>) information.get("brokers");
+            Integer brokerId = (Integer) ((HashMap<String, Object>) brokersInformation.get(0)).get("broker");
+            assertEquals(1, brokersInformation.size());
+            assertEquals(1, brokerId);
+        }
+    }
+
+    private LogDirsCommand.LogDirsCommandOptions fromArgsToOptions(String... args) {
+        return new LogDirsCommand.LogDirsCommandOptions(args);
+    }
+
+    private String execute(LogDirsCommand.LogDirsCommandOptions options, Admin adminClient) {
+        Runnable runnable = () -> {
+            try {
+                LogDirsCommand.execute(options, adminClient);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+        return ToolsTestUtils.captureStandardOut(runnable);
+    }
+}