From 88725669e7c4de153493c9c1ed91466280ee3d6c Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 9 Dec 2022 09:22:58 -0800 Subject: [PATCH] MINOR: Move MetadataQuorumCommand from `core` to `tools` (#12951) `core` should only be used for legacy cli tools and tools that require access to `core` classes instead of communicating via the kafka protocol (typically by using the client classes). Summary of changes: 1. Convert the command implementation and tests to Java and move it to the `tools` module. 2. Introduce mechanism to capture stdout and stderr from tests. 3. Change `kafka-metadata-quorum.sh` to point to the new command class. 4. Adjusted the test classpath of the `tools` module so that it supports tests that rely on the `@ClusterTests` annotation. 5. Improved error handling when an exception different from `TerseFailure` is thrown. 6. Changed `ToolsUtils` to avoid usage of arrays in favor of `List`. Reviewers: dengziming --- bin/kafka-metadata-quorum.sh | 2 +- bin/windows/kafka-metatada-quorum.bat | 2 +- build.gradle | 7 +- checkstyle/import-control.xml | 1 + .../org/apache/kafka/common/utils/Utils.java | 14 +- .../apache/kafka/common/utils/UtilsTest.java | 7 + .../kafka/admin/MetadataQuorumCommand.scala | 172 --------------- .../admin/MetadataQuorumCommandTest.scala | 192 ----------------- .../apache/kafka/server/util/ToolsUtils.java | 19 +- .../kafka/tools/MetadataQuorumCommand.java | 195 ++++++++++++++++++ .../apache/kafka/tools/TerseException.java | 33 +++ .../kafka/tools/TransactionsCommand.java | 42 ++-- .../tools/MetadataQuorumCommandErrorTest.java | 48 +++++ .../tools/MetadataQuorumCommandTest.java | 161 +++++++++++++++ .../apache/kafka/tools/ToolsTestUtils.java | 51 +++++ .../kafka/tools/TransactionsCommandTest.java | 12 +- 16 files changed, 553 insertions(+), 405 deletions(-) delete mode 100644 core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala delete mode 100644 core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala create mode 100644 tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java create mode 100644 tools/src/main/java/org/apache/kafka/tools/TerseException.java create mode 100644 tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandErrorTest.java create mode 100644 tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java create mode 100644 tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java diff --git a/bin/kafka-metadata-quorum.sh b/bin/kafka-metadata-quorum.sh index 24bedbded1e..3b25c7d159b 100755 --- a/bin/kafka-metadata-quorum.sh +++ b/bin/kafka-metadata-quorum.sh @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -exec $(dirname $0)/kafka-run-class.sh kafka.admin.MetadataQuorumCommand "$@" +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.MetadataQuorumCommand "$@" diff --git a/bin/windows/kafka-metatada-quorum.bat b/bin/windows/kafka-metatada-quorum.bat index 4ea8e3109f9..7942115422b 100644 --- a/bin/windows/kafka-metatada-quorum.bat +++ b/bin/windows/kafka-metatada-quorum.bat @@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. rem See the License for the specific language governing permissions and rem limitations under the License. -"%~dp0kafka-run-class.bat" kafka.admin.MetadataQuorumCommand %* +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.MetadataQuorumCommand %* diff --git a/build.gradle b/build.gradle index 142bf70f465..63d47e522fd 100644 --- a/build.gradle +++ b/build.gradle @@ -1768,10 +1768,15 @@ project(':tools') { implementation libs.jacksonJaxrsJsonProvider testImplementation project(':clients') - testImplementation libs.junitJupiter testImplementation project(':clients').sourceSets.test.output + testImplementation project(':core') + testImplementation project(':core').sourceSets.test.output + testImplementation project(':server-common') + testImplementation project(':server-common').sourceSets.test.output + testImplementation libs.junitJupiter testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc. testImplementation libs.mockitoJunitJupiter // supports MockitoExtension + testImplementation libs.bcpkix // required by the clients test module, but we have to specify it explicitly as gradle does not include the transitive test dependency automatically testRuntimeOnly libs.slf4jlog4j } diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 7a62b671f84..df9a2e9adfd 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -396,6 +396,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 42dcb60357b..a9c510bac3f 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -598,13 +598,25 @@ public final class Utils { */ public static String join(Collection collection, String separator) { Objects.requireNonNull(collection); + return mkString(collection.stream(), "", "", separator); + } + + /** + * Create a string representation of a stream surrounded by `begin` and `end` and joined by `separator`. + * + * @return The string representation. + */ + public static String mkString(Stream stream, String begin, String end, String separator) { + Objects.requireNonNull(stream); StringBuilder sb = new StringBuilder(); - Iterator iter = collection.iterator(); + sb.append(begin); + Iterator iter = stream.iterator(); while (iter.hasNext()) { sb.append(iter.next()); if (iter.hasNext()) sb.append(separator); } + sb.append(end); return sb.toString(); } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index fbb322a0945..d10fd37a71b 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -162,6 +162,13 @@ public class UtilsTest { assertEquals("1,2,3", Utils.join(asList(1, 2, 3), ",")); } + @Test + public void testMkString() { + assertEquals("[]", Utils.mkString(Stream.empty(), "[", "]", ",")); + assertEquals("(1)", Utils.mkString(Stream.of("1"), "(", ")", ",")); + assertEquals("{1,2,3}", Utils.mkString(Stream.of(1, 2, 3), "{", "}", ",")); + } + @Test public void testAbs() { assertEquals(0, Utils.abs(Integer.MIN_VALUE)); diff --git a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala deleted file mode 100644 index c92988d97fa..00000000000 --- a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.admin - -import kafka.tools.TerseFailure -import kafka.utils.Exit -import net.sourceforge.argparse4j.ArgumentParsers -import net.sourceforge.argparse4j.impl.Arguments.{fileType, storeTrue} -import net.sourceforge.argparse4j.inf.Subparsers -import org.apache.kafka.clients._ -import org.apache.kafka.clients.admin.{Admin, QuorumInfo} -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.util.ToolsUtils.prettyPrintTable - -import java.io.File -import java.util.Properties -import scala.jdk.CollectionConverters._ - -/** - * A tool for describing quorum status - */ -object MetadataQuorumCommand { - - def main(args: Array[String]): Unit = { - val res = mainNoExit(args) - Exit.exit(res) - } - - def mainNoExit(args: Array[String]): Int = { - val parser = ArgumentParsers - .newArgumentParser("kafka-metadata-quorum") - .defaultHelp(true) - .description("This tool describes kraft metadata quorum status.") - parser - .addArgument("--bootstrap-server") - .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.") - .required(true) - - parser - .addArgument("--command-config") - .`type`(fileType()) - .help("Property file containing configs to be passed to Admin Client.") - val subparsers = parser.addSubparsers().dest("command") - addDescribeParser(subparsers) - - var admin: Admin = null - try { - val namespace = parser.parseArgsOrFail(args) - val command = namespace.getString("command") - - val commandConfig = namespace.get[File]("command_config") - val props = if (commandConfig != null) { - if (!commandConfig.exists()) { - throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!") - } - Utils.loadProps(commandConfig.getPath) - } else { - new Properties() - } - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server")) - admin = Admin.create(props) - - if (command == "describe") { - if (namespace.getBoolean("status") && namespace.getBoolean("replication")) { - throw new TerseFailure(s"Only one of --status or --replication should be specified with describe sub-command") - } else if (namespace.getBoolean("replication")) { - handleDescribeReplication(admin) - } else if (namespace.getBoolean("status")) { - handleDescribeStatus(admin) - } else { - throw new TerseFailure(s"One of --status or --replication must be specified with describe sub-command") - } - } else { - throw new IllegalStateException(s"Unknown command: $command, only 'describe' is supported") - } - 0 - } catch { - case e: TerseFailure => - Console.err.println(e.getMessage) - 1 - } finally { - if (admin != null) { - admin.close() - } - } - } - - def addDescribeParser(subparsers: Subparsers): Unit = { - val describeParser = subparsers - .addParser("describe") - .help("Describe the metadata quorum info") - - val statusArgs = describeParser.addArgumentGroup("Status") - statusArgs - .addArgument("--status") - .help( - "A short summary of the quorum status and the other provides detailed information about the status of replication.") - .action(storeTrue()) - val replicationArgs = describeParser.addArgumentGroup("Replication") - replicationArgs - .addArgument("--replication") - .help("Detailed information about the status of replication") - .action(storeTrue()) - } - - private def handleDescribeReplication(admin: Admin): Unit = { - val quorumInfo = admin.describeMetadataQuorum.quorumInfo.get - val leaderId = quorumInfo.leaderId - val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head - - def convertQuorumInfo(infos: Seq[QuorumInfo.ReplicaState], status: String): Seq[Array[String]] = - infos.map { info => - Array(info.replicaId, - info.logEndOffset, - leader.logEndOffset - info.logEndOffset, - info.lastFetchTimestamp.orElse(-1), - info.lastCaughtUpTimestamp.orElse(-1), - status - ).map(_.toString) - } - prettyPrintTable( - Array("NodeId", "LogEndOffset", "Lag", "LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"), - (convertQuorumInfo(Seq(leader), "Leader") - ++ convertQuorumInfo(quorumInfo.voters.asScala.filter(_.replicaId != leaderId).toSeq, "Follower") - ++ convertQuorumInfo(quorumInfo.observers.asScala.toSeq, "Observer")).asJava, - scala.Console.out - ) - } - - private def handleDescribeStatus(admin: Admin): Unit = { - val clusterId = admin.describeCluster.clusterId.get - val quorumInfo = admin.describeMetadataQuorum.quorumInfo.get - val leaderId = quorumInfo.leaderId - val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head - val maxLagFollower = quorumInfo.voters.asScala - .minBy(_.logEndOffset) - val maxFollowerLag = leader.logEndOffset - maxLagFollower.logEndOffset - val maxFollowerLagTimeMs = - if (leader == maxLagFollower) { - 0 - } else if (leader.lastCaughtUpTimestamp.isPresent && maxLagFollower.lastCaughtUpTimestamp.isPresent) { - leader.lastCaughtUpTimestamp.getAsLong - maxLagFollower.lastCaughtUpTimestamp.getAsLong - } else { - -1 - } - println( - s"""|ClusterId: $clusterId - |LeaderId: ${quorumInfo.leaderId} - |LeaderEpoch: ${quorumInfo.leaderEpoch} - |HighWatermark: ${quorumInfo.highWatermark} - |MaxFollowerLag: $maxFollowerLag - |MaxFollowerLagTimeMs: $maxFollowerLagTimeMs - |CurrentVoters: ${quorumInfo.voters.asScala.map(_.replicaId).mkString("[", ",", "]")} - |CurrentObservers: ${quorumInfo.observers.asScala.map(_.replicaId).mkString("[", ",", "]")} - |""".stripMargin - ) - } -} diff --git a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala deleted file mode 100644 index 24b6616cb1e..00000000000 --- a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala +++ /dev/null @@ -1,192 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.admin - -import kafka.test.ClusterInstance -import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, ClusterTests, Type} -import kafka.test.junit.ClusterTestExtensions -import kafka.utils.TestUtils -import org.apache.kafka.common.errors.UnsupportedVersionException -import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} -import org.junit.jupiter.api.{Tag, Test} -import org.junit.jupiter.api.extension.ExtendWith - -import java.util.concurrent.ExecutionException - -@ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT) -@Tag("integration") -class MetadataQuorumCommandTest(cluster: ClusterInstance) { - - /** - * 1. The same number of broker controllers - * 2. More brokers than controllers - * 3. Fewer brokers than controllers - */ - @ClusterTests( - Array( - new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), - new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3), - new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2), - new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2), - new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3), - new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3) - )) - def testDescribeQuorumReplicationSuccessful(): Unit = { - cluster.waitForReadyBrokers() - val describeOutput = TestUtils.grabConsoleOutput( - MetadataQuorumCommand.mainNoExit( - Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")) - ) - - val leaderPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Leader\s+""".r - val followerPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Follower\s+""".r - val observerPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Observer\s+""".r - val outputs = describeOutput.split("\n").tail - if (cluster.config().clusterType() == Type.CO_KRAFT) { - assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), outputs.length) - } else { - assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.length) - } - // `matches` is not supported in scala 2.12, use `findFirstIn` instead. - assertTrue(leaderPattern.findFirstIn(outputs.head).nonEmpty) - assertEquals(1, outputs.count(leaderPattern.findFirstIn(_).nonEmpty)) - assertEquals(cluster.config().numControllers() - 1, outputs.count(followerPattern.findFirstIn(_).nonEmpty)) - - if (cluster.config().clusterType() == Type.CO_KRAFT) { - assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()), outputs.count(observerPattern.findFirstIn(_).nonEmpty)) - } else { - assertEquals(cluster.config().numBrokers(), outputs.count(observerPattern.findFirstIn(_).nonEmpty)) - } - } - - /** - * 1. The same number of broker controllers - * 2. More brokers than controllers - * 3. Fewer brokers than controllers - */ - @ClusterTests( - Array( - new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), - new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3), - new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2), - new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2), - new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3), - new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3) - )) - def testDescribeQuorumStatusSuccessful(): Unit = { - cluster.waitForReadyBrokers() - val describeOutput = TestUtils.grabConsoleOutput( - MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")) - ) - val outputs = describeOutput.split("\n") - - assertTrue("""ClusterId:\s+\S{22}""".r.findFirstIn(outputs(0)).nonEmpty) - assertTrue("""LeaderId:\s+\d+""".r.findFirstIn(outputs(1)).nonEmpty) - assertTrue("""LeaderEpoch:\s+\d+""".r.findFirstIn(outputs(2)).nonEmpty) - // HighWatermark may be -1 - assertTrue("""HighWatermark:\s+[-]?\d+""".r.findFirstIn(outputs(3)).nonEmpty) - assertTrue("""MaxFollowerLag:\s+\d+""".r.findFirstIn(outputs(4)).nonEmpty) - assertTrue("""MaxFollowerLagTimeMs:\s+[-]?\d+""".r.findFirstIn(outputs(5)).nonEmpty) - assertTrue("""CurrentVoters:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(6)).nonEmpty) - - // There are no observers if we have fewer brokers than controllers - if (cluster.config().clusterType() == Type.CO_KRAFT - && cluster.config().numBrokers() <= cluster.config().numControllers()) { - assertTrue("""CurrentObservers:\s+\[\]""".r.findFirstIn(outputs(7)).nonEmpty) - } else { - assertTrue("""CurrentObservers:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(7)).nonEmpty) - } - } - - @ClusterTests( - Array(new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1), - new ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 1))) - def testOnlyOneBrokerAndOneController(): Unit = { - val statusOutput = TestUtils.grabConsoleOutput( - MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")) - ) - assertEquals("MaxFollowerLag: 0", statusOutput.split("\n")(4)) - assertEquals("MaxFollowerLagTimeMs: 0", statusOutput.split("\n")(5)) - - val replicationOutput = TestUtils.grabConsoleOutput( - MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")) - ) - assertEquals("0", replicationOutput.split("\n")(1).split("\\s+")(2)) - } - - @ClusterTest(clusterType = Type.ZK, brokers = 3) - def testDescribeQuorumInZkMode(): Unit = { - assertTrue( - assertThrows( - classOf[ExecutionException], - () => - MetadataQuorumCommand.mainNoExit( - Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")) - ).getCause.isInstanceOf[UnsupportedVersionException] - ) - assertTrue( - assertThrows( - classOf[ExecutionException], - () => - MetadataQuorumCommand.mainNoExit( - Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")) - ).getCause.isInstanceOf[UnsupportedVersionException] - ) - } -} - -class MetadataQuorumCommandErrorTest { - - @Test - def testPropertiesFileDoesNotExists(): Unit = { - assertEquals(1, - MetadataQuorumCommand.mainNoExit( - Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe"))) - assertEquals( - "Properties file admin.properties does not exists!", - TestUtils - .grabConsoleError( - MetadataQuorumCommand.mainNoExit( - Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe"))) - .trim - ) - } - - @Test - def testDescribeOptions(): Unit = { - assertEquals(1, MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "describe"))) - assertEquals( - "One of --status or --replication must be specified with describe sub-command", - TestUtils - .grabConsoleError(MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "describe"))) - .trim - ) - - assertEquals(1, - MetadataQuorumCommand.mainNoExit( - Array("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication"))) - assertEquals( - "Only one of --status or --replication should be specified with describe sub-command", - TestUtils - .grabConsoleError( - MetadataQuorumCommand.mainNoExit( - Array("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication"))) - .trim - ) - } -} diff --git a/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java index 0c923cd66c1..b14f079b15c 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import java.io.PrintStream; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -70,13 +69,13 @@ public class ToolsUtils { private static void printRow( List columnLengths, - String[] row, + List row, PrintStream out ) { StringBuilder rowBuilder = new StringBuilder(); - for (int i = 0; i < row.length; i++) { + for (int i = 0; i < row.size(); i++) { Integer columnLength = columnLengths.get(i); - String columnValue = row[i]; + String columnValue = row.get(i); appendColumnValue(rowBuilder, columnValue, columnLength); rowBuilder.append('\t'); } @@ -84,17 +83,17 @@ public class ToolsUtils { } public static void prettyPrintTable( - String[] headers, - List rows, + List headers, + List> rows, PrintStream out ) { - List columnLengths = Arrays.stream(headers) + List columnLengths = headers.stream() .map(String::length) .collect(Collectors.toList()); - for (String[] row : rows) { - for (int i = 0; i < headers.length; i++) { - columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length())); + for (List row : rows) { + for (int i = 0; i < headers.size(); i++) { + columnLengths.set(i, Math.max(columnLengths.get(i), row.get(i).length())); } } diff --git a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java new file mode 100644 index 00000000000..6cafea81b52 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.impl.Arguments; +import net.sourceforge.argparse4j.inf.ArgumentGroup; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import net.sourceforge.argparse4j.inf.Subparsers; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.QuorumInfo; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.ToolsUtils; + +import java.io.File; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Arrays.asList; + +/** + * A tool for describing quorum status + */ +public class MetadataQuorumCommand { + + public static void main(String... args) { + Exit.exit(mainNoExit(args)); + } + + static int mainNoExit(String... args) { + try { + execute(args); + return 0; + } catch (TerseException e) { + System.err.println(e.getMessage()); + return 1; + } catch (Throwable e) { + System.err.println(e.getMessage()); + System.err.println(Utils.stackTrace(e)); + return 1; + } + } + + static void execute(String... args) throws Exception { + ArgumentParser parser = ArgumentParsers + .newArgumentParser("kafka-metadata-quorum") + .defaultHelp(true) + .description("This tool describes kraft metadata quorum status."); + parser + .addArgument("--bootstrap-server") + .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.") + .required(true); + + parser + .addArgument("--command-config") + .type(Arguments.fileType()) + .help("Property file containing configs to be passed to Admin Client."); + Subparsers subparsers = parser.addSubparsers().dest("command"); + addDescribeParser(subparsers); + + Admin admin = null; + try { + Namespace namespace = parser.parseArgsOrFail(args); + String command = namespace.getString("command"); + + File commandConfig = namespace.get("command_config"); + Properties props = new Properties(); + if (commandConfig != null) { + if (!commandConfig.exists()) + throw new TerseException("Properties file " + commandConfig.getPath() + " does not exists!"); + + Utils.loadProps(commandConfig.getPath()); + } + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server")); + admin = Admin.create(props); + + if (command.equals("describe")) { + if (namespace.getBoolean("status") && namespace.getBoolean("replication")) { + throw new TerseException("Only one of --status or --replication should be specified with describe sub-command"); + } else if (namespace.getBoolean("replication")) { + handleDescribeReplication(admin); + } else if (namespace.getBoolean("status")) { + handleDescribeStatus(admin); + } else { + throw new TerseException("One of --status or --replication must be specified with describe sub-command"); + } + } else { + throw new IllegalStateException("Unknown command: " + command + ", only 'describe' is supported"); + } + } finally { + if (admin != null) + admin.close(); + } + } + + private static void addDescribeParser(Subparsers subparsers) { + Subparser describeParser = subparsers + .addParser("describe") + .help("Describe the metadata quorum info"); + + ArgumentGroup statusArgs = describeParser.addArgumentGroup("Status"); + statusArgs + .addArgument("--status") + .help("A short summary of the quorum status and the other provides detailed information about the status of replication.") + .action(Arguments.storeTrue()); + ArgumentGroup replicationArgs = describeParser.addArgumentGroup("Replication"); + replicationArgs + .addArgument("--replication") + .help("Detailed information about the status of replication") + .action(Arguments.storeTrue()); + } + + private static void handleDescribeReplication(Admin admin) throws ExecutionException, InterruptedException { + QuorumInfo quorumInfo = admin.describeMetadataQuorum().quorumInfo().get(); + int leaderId = quorumInfo.leaderId(); + QuorumInfo.ReplicaState leader = quorumInfo.voters().stream().filter(voter -> voter.replicaId() == leaderId).findFirst().get(); + + List> rows = new ArrayList<>(); + rows.addAll(quorumInfoToRows(leader, Stream.of(leader), "Leader")); + rows.addAll(quorumInfoToRows(leader, quorumInfo.voters().stream().filter(v -> v.replicaId() != leaderId), "Follower")); + rows.addAll(quorumInfoToRows(leader, quorumInfo.observers().stream(), "Observer")); + + ToolsUtils.prettyPrintTable( + asList("NodeId", "LogEndOffset", "Lag", "LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"), + rows, + System.out + ); + } + + private static List> quorumInfoToRows(QuorumInfo.ReplicaState leader, Stream infos, String status) { + return infos.map(info -> + Stream.of( + info.replicaId(), + info.logEndOffset(), + leader.logEndOffset() - info.logEndOffset(), + info.lastFetchTimestamp().orElse(-1), + info.lastCaughtUpTimestamp().orElse(-1), + status + ).map(r -> r.toString()).collect(Collectors.toList()) + ).collect(Collectors.toList()); + } + + private static void handleDescribeStatus(Admin admin) throws ExecutionException, InterruptedException { + String clusterId = admin.describeCluster().clusterId().get(); + QuorumInfo quorumInfo = admin.describeMetadataQuorum().quorumInfo().get(); + int leaderId = quorumInfo.leaderId(); + QuorumInfo.ReplicaState leader = quorumInfo.voters().stream().filter(voter -> voter.replicaId() == leaderId).findFirst().get(); + QuorumInfo.ReplicaState maxLagFollower = quorumInfo.voters().stream().min(Comparator.comparingLong(qi -> qi.logEndOffset())).get(); + long maxFollowerLag = leader.logEndOffset() - maxLagFollower.logEndOffset(); + + long maxFollowerLagTimeMs; + if (leader == maxLagFollower) + maxFollowerLagTimeMs = 0; + else if (leader.lastCaughtUpTimestamp().isPresent() && maxLagFollower.lastCaughtUpTimestamp().isPresent()) { + maxFollowerLagTimeMs = leader.lastCaughtUpTimestamp().getAsLong() - maxLagFollower.lastCaughtUpTimestamp().getAsLong(); + } else { + maxFollowerLagTimeMs = -1; + } + + System.out.println( + "ClusterId: " + clusterId + + "\nLeaderId: " + quorumInfo.leaderId() + + "\nLeaderEpoch: " + quorumInfo.leaderEpoch() + + "\nHighWatermark: " + quorumInfo.highWatermark() + + "\nMaxFollowerLag: " + maxFollowerLag + + "\nMaxFollowerLagTimeMs: " + maxFollowerLagTimeMs + + "\nCurrentVoters: " + Utils.mkString(quorumInfo.voters().stream().map(v -> v.replicaId()), "[", "]", ",") + + "\nCurrentObservers: " + Utils.mkString(quorumInfo.observers().stream().map(v -> v.replicaId()), "[", "]", ",") + ); + } + +} diff --git a/tools/src/main/java/org/apache/kafka/tools/TerseException.java b/tools/src/main/java/org/apache/kafka/tools/TerseException.java new file mode 100644 index 00000000000..c43f5c62a58 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/TerseException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +/** + * An exception thrown to indicate that the command has failed, but we don't want to + * print a stack trace. + */ +public class TerseException extends Exception { + + /** + * Create new instance with the provided message. + * + * @param message The message to print out before exiting. A stack trace will not be printed. + */ + public TerseException(String message) { + super(message); + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java index 194524d2654..4f7fa9c27bf 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java @@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -63,6 +62,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import static java.util.Arrays.asList; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static net.sourceforge.argparse4j.impl.Arguments.store; @@ -254,14 +254,14 @@ public abstract class TransactionsCommand { } static class DescribeProducersCommand extends TransactionsCommand { - static final String[] HEADERS = new String[]{ + static final List HEADERS = asList( "ProducerId", "ProducerEpoch", "LatestCoordinatorEpoch", "LastSequence", "LastTimestamp", "CurrentTransactionStartOffset" - }; + ); DescribeProducersCommand(Time time) { super(time); @@ -320,20 +320,20 @@ public abstract class TransactionsCommand { return; } - List rows = result.activeProducers().stream().map(producerState -> { + List> rows = result.activeProducers().stream().map(producerState -> { String currentTransactionStartOffsetColumnValue = producerState.currentTransactionStartOffset().isPresent() ? String.valueOf(producerState.currentTransactionStartOffset().getAsLong()) : "None"; - return new String[] { + return asList( String.valueOf(producerState.producerId()), String.valueOf(producerState.producerEpoch()), String.valueOf(producerState.coordinatorEpoch().orElse(-1)), String.valueOf(producerState.lastSequence()), String.valueOf(producerState.lastTimestamp()), currentTransactionStartOffsetColumnValue - }; + ); }).collect(Collectors.toList()); prettyPrintTable(HEADERS, rows, out); @@ -341,7 +341,7 @@ public abstract class TransactionsCommand { } static class DescribeTransactionsCommand extends TransactionsCommand { - static final String[] HEADERS = new String[]{ + static final List HEADERS = asList( "CoordinatorId", "TransactionalId", "ProducerId", @@ -351,7 +351,7 @@ public abstract class TransactionsCommand { "CurrentTransactionStartTimeMs", "TransactionDurationMs", "TopicPartitions" - }; + ); DescribeTransactionsCommand(Time time) { super(time); @@ -402,7 +402,7 @@ public abstract class TransactionsCommand { transactionDurationMsColumnValue = "None"; } - String[] row = new String[]{ + List row = asList( String.valueOf(result.coordinatorId()), transactionalId, String.valueOf(result.producerId()), @@ -412,19 +412,19 @@ public abstract class TransactionsCommand { transactionStartTimeMsColumnValue, transactionDurationMsColumnValue, Utils.join(result.topicPartitions(), ",") - }; + ); prettyPrintTable(HEADERS, singletonList(row), out); } } static class ListTransactionsCommand extends TransactionsCommand { - static final String[] HEADERS = new String[] { + static final List HEADERS = asList( "TransactionalId", "Coordinator", "ProducerId", "TransactionState" - }; + ); ListTransactionsCommand(Time time) { super(time); @@ -454,18 +454,18 @@ public abstract class TransactionsCommand { return; } - List rows = new ArrayList<>(); + List> rows = new ArrayList<>(); for (Map.Entry> brokerListingsEntry : result.entrySet()) { String coordinatorIdString = brokerListingsEntry.getKey().toString(); Collection listings = brokerListingsEntry.getValue(); for (TransactionListing listing : listings) { - rows.add(new String[] { + rows.add(asList( listing.transactionalId(), coordinatorIdString, String.valueOf(listing.producerId()), listing.state().toString() - }); + )); } } @@ -476,7 +476,7 @@ public abstract class TransactionsCommand { static class FindHangingTransactionsCommand extends TransactionsCommand { private static final int MAX_BATCH_SIZE = 500; - static final String[] HEADERS = new String[] { + static final List HEADERS = asList( "Topic", "Partition", "ProducerId", @@ -485,7 +485,7 @@ public abstract class TransactionsCommand { "StartOffset", "LastTimestamp", "Duration(min)" - }; + ); FindHangingTransactionsCommand(Time time) { super(time); @@ -653,13 +653,13 @@ public abstract class TransactionsCommand { PrintStream out ) { long currentTimeMs = time.milliseconds(); - List rows = new ArrayList<>(hangingTransactions.size()); + List> rows = new ArrayList<>(hangingTransactions.size()); for (OpenTransaction transaction : hangingTransactions) { long transactionDurationMinutes = TimeUnit.MILLISECONDS.toMinutes( currentTimeMs - transaction.producerState.lastTimestamp()); - rows.add(new String[] { + rows.add(asList( transaction.topicPartition.topic(), String.valueOf(transaction.topicPartition.partition()), String.valueOf(transaction.producerState.producerId()), @@ -668,7 +668,7 @@ public abstract class TransactionsCommand { String.valueOf(transaction.producerState.currentTransactionStartOffset().orElse(-1)), String.valueOf(transaction.producerState.lastTimestamp()), String.valueOf(transactionDurationMinutes) - }); + )); } prettyPrintTable(HEADERS, rows, out); @@ -974,7 +974,7 @@ public abstract class TransactionsCommand { PrintStream out, Time time ) throws Exception { - List commands = Arrays.asList( + List commands = asList( new ListTransactionsCommand(time), new DescribeTransactionsCommand(time), new DescribeProducersCommand(time), diff --git a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandErrorTest.java b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandErrorTest.java new file mode 100644 index 00000000000..8503e707b7b --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandErrorTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class MetadataQuorumCommandErrorTest { + + @Test + public void testPropertiesFileDoesNotExists() { + assertEquals(1, + MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe")); + assertEquals("Properties file admin.properties does not exists!", + ToolsTestUtils.captureStandardErr(() -> + MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe"))); + } + + @Test + public void testDescribeOptions() { + assertEquals(1, MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092", "describe")); + assertEquals("One of --status or --replication must be specified with describe sub-command", + ToolsTestUtils.captureStandardErr(() -> + MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092", "describe"))); + + assertEquals(1, + MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication")); + assertEquals("Only one of --status or --replication should be specified with describe sub-command", + ToolsTestUtils.captureStandardErr(() -> + MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication"))); + } + +} diff --git a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java new file mode 100644 index 00000000000..7b52b351afd --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.ClusterTests; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import static java.util.Arrays.stream; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.KRAFT) +@Tag("integration") +class MetadataQuorumCommandTest { + + private final ClusterInstance cluster; + public MetadataQuorumCommandTest(ClusterInstance cluster) { + this.cluster = cluster; + } + + /** + * 1. The same number of broker controllers + * 2. More brokers than controllers + * 3. Fewer brokers than controllers + */ + @ClusterTests({ + @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), + @ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3), + @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2), + @ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2), + @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3), + @ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3) + }) + public void testDescribeQuorumReplicationSuccessful() throws InterruptedException { + cluster.waitForReadyBrokers(); + String describeOutput = ToolsTestUtils.captureStandardOut(() -> + MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication") + ); + + List outputs = stream(describeOutput.split("\n")).skip(1).collect(Collectors.toList()); + if (cluster.config().clusterType() == Type.CO_KRAFT) + assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), outputs.size()); + else + assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.size()); + + Pattern leaderPattern = Pattern.compile("\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+-?\\d+\\s+Leader\\s*"); + assertTrue(leaderPattern.matcher(outputs.get(0)).find()); + assertTrue(outputs.stream().skip(1).noneMatch(o -> leaderPattern.matcher(o).find())); + + Pattern followerPattern = Pattern.compile("\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+-?\\d+\\s+Follower\\s*"); + assertEquals(cluster.config().numControllers() - 1, outputs.stream().filter(o -> followerPattern.matcher(o).find()).count()); + + Pattern observerPattern = Pattern.compile("\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+-?\\d+\\s+Observer\\s*"); + if (cluster.config().clusterType() == Type.CO_KRAFT) + assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()), + outputs.stream().filter(o -> observerPattern.matcher(o).find()).count()); + else + assertEquals(cluster.config().numBrokers(), outputs.stream().filter(o -> observerPattern.matcher(o).find()).count()); + } + + + /** + * 1. The same number of broker controllers + * 2. More brokers than controllers + * 3. Fewer brokers than controllers + */ + @ClusterTests({ + @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), + @ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3), + @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2), + @ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2), + @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3), + @ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3) + }) + public void testDescribeQuorumStatusSuccessful() throws InterruptedException { + cluster.waitForReadyBrokers(); + String describeOutput = ToolsTestUtils.captureStandardOut(() -> + MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status") + ); + String[] outputs = describeOutput.split("\n"); + + assertTrue(outputs[0].matches("ClusterId:\\s+\\S{22}")); + assertTrue(outputs[1].matches("LeaderId:\\s+\\d+")); + assertTrue(outputs[2].matches("LeaderEpoch:\\s+\\d+")); + // HighWatermark may be -1 + assertTrue(outputs[3].matches("HighWatermark:\\s+-?\\d+")); + assertTrue(outputs[4].matches("MaxFollowerLag:\\s+\\d+")); + assertTrue(outputs[5].matches("MaxFollowerLagTimeMs:\\s+-?\\d+")); + assertTrue(outputs[6].matches("CurrentVoters:\\s+\\[\\d+(,\\d+)*]")); + + // There are no observers if we have fewer brokers than controllers + if (cluster.config().clusterType() == Type.CO_KRAFT && cluster.config().numBrokers() <= cluster.config().numControllers()) + assertTrue(outputs[7].matches("CurrentObservers:\\s+\\[]")); + else + assertTrue(outputs[7].matches("CurrentObservers:\\s+\\[\\d+(,\\d+)*]")); + } + + @ClusterTests({ + @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1), + @ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 1) + }) + public void testOnlyOneBrokerAndOneController() { + String statusOutput = ToolsTestUtils.captureStandardOut(() -> + MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status") + ); + assertEquals("MaxFollowerLag: 0", statusOutput.split("\n")[4]); + assertEquals("MaxFollowerLagTimeMs: 0", statusOutput.split("\n")[5]); + + String replicationOutput = ToolsTestUtils.captureStandardOut(() -> + MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication") + ); + assertEquals("0", replicationOutput.split("\n")[1].split("\\s+")[2]); + } + + @ClusterTest(clusterType = Type.ZK, brokers = 3) + public void testDescribeQuorumInZkMode() { + assertTrue( + assertThrows( + ExecutionException.class, + () -> MetadataQuorumCommand.execute("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status") + ).getCause() instanceof UnsupportedVersionException + ); + + assertTrue( + assertThrows( + ExecutionException.class, + () -> MetadataQuorumCommand.execute("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication") + ).getCause() instanceof UnsupportedVersionException + ); + + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java new file mode 100644 index 00000000000..4664288ae4f --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +public class ToolsTestUtils { + + public static String captureStandardOut(Runnable runnable) { + return captureStandardStream(false, runnable); + } + + public static String captureStandardErr(Runnable runnable) { + return captureStandardStream(true, runnable); + } + + private static String captureStandardStream(boolean isErr, Runnable runnable) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + PrintStream currentStream = isErr ? System.err : System.out; + PrintStream tempStream = new PrintStream(outputStream); + if (isErr) + System.setErr(tempStream); + else + System.setOut(tempStream); + try { + runnable.run(); + return new String(outputStream.toByteArray()).trim(); + } finally { + if (isErr) + System.setErr(currentStream); + else + System.setOut(currentStream); + } + } + +} diff --git a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java index 4b65dc87d3c..3dc77a6d0f6 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java @@ -177,7 +177,7 @@ public class TransactionsCommandTest { List> table = readOutputAsTable(); assertEquals(3, table.size()); - List expectedHeaders = asList(TransactionsCommand.DescribeProducersCommand.HEADERS); + List expectedHeaders = TransactionsCommand.DescribeProducersCommand.HEADERS; assertEquals(expectedHeaders, table.get(0)); Set> expectedRows = Utils.mkSet( @@ -213,7 +213,7 @@ public class TransactionsCommandTest { assertEquals(4, table.size()); // Assert expected headers - List expectedHeaders = asList(TransactionsCommand.ListTransactionsCommand.HEADERS); + List expectedHeaders = TransactionsCommand.ListTransactionsCommand.HEADERS; assertEquals(expectedHeaders, table.get(0)); Set> expectedRows = Utils.mkSet( @@ -272,7 +272,7 @@ public class TransactionsCommandTest { List> table = readOutputAsTable(); assertEquals(2, table.size()); - List expectedHeaders = asList(TransactionsCommand.DescribeTransactionsCommand.HEADERS); + List expectedHeaders = TransactionsCommand.DescribeTransactionsCommand.HEADERS; assertEquals(expectedHeaders, table.get(0)); List expectedRow = asList( @@ -703,7 +703,7 @@ public class TransactionsCommandTest { List> table = readOutputAsTable(); assertEquals(1, table.size()); - List expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS); + List expectedHeaders = TransactionsCommand.FindHangingTransactionsCommand.HEADERS; assertEquals(expectedHeaders, table.get(0)); } @@ -742,7 +742,7 @@ public class TransactionsCommandTest { List> table = readOutputAsTable(); assertEquals(1, table.size()); - List expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS); + List expectedHeaders = TransactionsCommand.FindHangingTransactionsCommand.HEADERS; assertEquals(expectedHeaders, table.get(0)); } @@ -940,7 +940,7 @@ public class TransactionsCommandTest { List> table = readOutputAsTable(); assertEquals(2, table.size()); - List expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS); + List expectedHeaders = TransactionsCommand.FindHangingTransactionsCommand.HEADERS; assertEquals(expectedHeaders, table.get(0)); long durationMinutes = TimeUnit.MILLISECONDS.toMinutes(time.milliseconds() - lastTimestamp);