Browse Source

MINOR: Move MetadataQuorumCommand from `core` to `tools` (#12951)

`core` should only be  used for legacy cli tools and tools that require
access to `core` classes instead of communicating via the kafka protocol
(typically by using the client classes).

Summary of changes:
1. Convert the command implementation and tests to Java and move it to
    the `tools` module.
2. Introduce mechanism to capture stdout and stderr from tests.
3. Change `kafka-metadata-quorum.sh` to point to the new command class.
4. Adjusted the test classpath of the `tools` module so that it supports tests
    that rely on the `@ClusterTests` annotation.
5. Improved error handling when an exception different from `TerseFailure` is
    thrown.
6. Changed `ToolsUtils` to avoid usage of arrays in favor of `List`.

Reviewers: dengziming <dengziming1993@gmail.com>
pull/12974/head
Ismael Juma 2 years ago committed by GitHub
parent
commit
88725669e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      bin/kafka-metadata-quorum.sh
  2. 2
      bin/windows/kafka-metatada-quorum.bat
  3. 7
      build.gradle
  4. 1
      checkstyle/import-control.xml
  5. 14
      clients/src/main/java/org/apache/kafka/common/utils/Utils.java
  6. 7
      clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
  7. 172
      core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala
  8. 192
      core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala
  9. 19
      server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java
  10. 195
      tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
  11. 33
      tools/src/main/java/org/apache/kafka/tools/TerseException.java
  12. 42
      tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
  13. 48
      tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandErrorTest.java
  14. 161
      tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
  15. 51
      tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
  16. 12
      tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java

2
bin/kafka-metadata-quorum.sh

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.MetadataQuorumCommand "$@" exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.MetadataQuorumCommand "$@"

2
bin/windows/kafka-metatada-quorum.bat

@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and rem See the License for the specific language governing permissions and
rem limitations under the License. rem limitations under the License.
"%~dp0kafka-run-class.bat" kafka.admin.MetadataQuorumCommand %* "%~dp0kafka-run-class.bat" org.apache.kafka.tools.MetadataQuorumCommand %*

7
build.gradle

@ -1768,10 +1768,15 @@ project(':tools') {
implementation libs.jacksonJaxrsJsonProvider implementation libs.jacksonJaxrsJsonProvider
testImplementation project(':clients') testImplementation project(':clients')
testImplementation libs.junitJupiter
testImplementation project(':clients').sourceSets.test.output testImplementation project(':clients').sourceSets.test.output
testImplementation project(':core')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
testImplementation libs.junitJupiter
testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc. testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
testImplementation libs.bcpkix // required by the clients test module, but we have to specify it explicitly as gradle does not include the transitive test dependency automatically
testRuntimeOnly libs.slf4jlog4j testRuntimeOnly libs.slf4jlog4j
} }

1
checkstyle/import-control.xml

@ -396,6 +396,7 @@
<allow pkg="org.jose4j" /> <allow pkg="org.jose4j" />
<allow pkg="net.sourceforge.argparse4j" /> <allow pkg="net.sourceforge.argparse4j" />
<allow pkg="org.apache.log4j" /> <allow pkg="org.apache.log4j" />
<allow pkg="kafka.test" />
</subpackage> </subpackage>
<subpackage name="trogdor"> <subpackage name="trogdor">

14
clients/src/main/java/org/apache/kafka/common/utils/Utils.java

@ -598,13 +598,25 @@ public final class Utils {
*/ */
public static <T> String join(Collection<T> collection, String separator) { public static <T> String join(Collection<T> collection, String separator) {
Objects.requireNonNull(collection); Objects.requireNonNull(collection);
return mkString(collection.stream(), "", "", separator);
}
/**
* Create a string representation of a stream surrounded by `begin` and `end` and joined by `separator`.
*
* @return The string representation.
*/
public static <T> String mkString(Stream<T> stream, String begin, String end, String separator) {
Objects.requireNonNull(stream);
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
Iterator<T> iter = collection.iterator(); sb.append(begin);
Iterator<T> iter = stream.iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
sb.append(iter.next()); sb.append(iter.next());
if (iter.hasNext()) if (iter.hasNext())
sb.append(separator); sb.append(separator);
} }
sb.append(end);
return sb.toString(); return sb.toString();
} }

7
clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java

@ -162,6 +162,13 @@ public class UtilsTest {
assertEquals("1,2,3", Utils.join(asList(1, 2, 3), ",")); assertEquals("1,2,3", Utils.join(asList(1, 2, 3), ","));
} }
@Test
public void testMkString() {
assertEquals("[]", Utils.mkString(Stream.empty(), "[", "]", ","));
assertEquals("(1)", Utils.mkString(Stream.of("1"), "(", ")", ","));
assertEquals("{1,2,3}", Utils.mkString(Stream.of(1, 2, 3), "{", "}", ","));
}
@Test @Test
public void testAbs() { public void testAbs() {
assertEquals(0, Utils.abs(Integer.MIN_VALUE)); assertEquals(0, Utils.abs(Integer.MIN_VALUE));

172
core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala

@ -1,172 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import kafka.tools.TerseFailure
import kafka.utils.Exit
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{fileType, storeTrue}
import net.sourceforge.argparse4j.inf.Subparsers
import org.apache.kafka.clients._
import org.apache.kafka.clients.admin.{Admin, QuorumInfo}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.util.ToolsUtils.prettyPrintTable
import java.io.File
import java.util.Properties
import scala.jdk.CollectionConverters._
/**
* A tool for describing quorum status
*/
object MetadataQuorumCommand {
def main(args: Array[String]): Unit = {
val res = mainNoExit(args)
Exit.exit(res)
}
def mainNoExit(args: Array[String]): Int = {
val parser = ArgumentParsers
.newArgumentParser("kafka-metadata-quorum")
.defaultHelp(true)
.description("This tool describes kraft metadata quorum status.")
parser
.addArgument("--bootstrap-server")
.help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
.required(true)
parser
.addArgument("--command-config")
.`type`(fileType())
.help("Property file containing configs to be passed to Admin Client.")
val subparsers = parser.addSubparsers().dest("command")
addDescribeParser(subparsers)
var admin: Admin = null
try {
val namespace = parser.parseArgsOrFail(args)
val command = namespace.getString("command")
val commandConfig = namespace.get[File]("command_config")
val props = if (commandConfig != null) {
if (!commandConfig.exists()) {
throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!")
}
Utils.loadProps(commandConfig.getPath)
} else {
new Properties()
}
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server"))
admin = Admin.create(props)
if (command == "describe") {
if (namespace.getBoolean("status") && namespace.getBoolean("replication")) {
throw new TerseFailure(s"Only one of --status or --replication should be specified with describe sub-command")
} else if (namespace.getBoolean("replication")) {
handleDescribeReplication(admin)
} else if (namespace.getBoolean("status")) {
handleDescribeStatus(admin)
} else {
throw new TerseFailure(s"One of --status or --replication must be specified with describe sub-command")
}
} else {
throw new IllegalStateException(s"Unknown command: $command, only 'describe' is supported")
}
0
} catch {
case e: TerseFailure =>
Console.err.println(e.getMessage)
1
} finally {
if (admin != null) {
admin.close()
}
}
}
def addDescribeParser(subparsers: Subparsers): Unit = {
val describeParser = subparsers
.addParser("describe")
.help("Describe the metadata quorum info")
val statusArgs = describeParser.addArgumentGroup("Status")
statusArgs
.addArgument("--status")
.help(
"A short summary of the quorum status and the other provides detailed information about the status of replication.")
.action(storeTrue())
val replicationArgs = describeParser.addArgumentGroup("Replication")
replicationArgs
.addArgument("--replication")
.help("Detailed information about the status of replication")
.action(storeTrue())
}
private def handleDescribeReplication(admin: Admin): Unit = {
val quorumInfo = admin.describeMetadataQuorum.quorumInfo.get
val leaderId = quorumInfo.leaderId
val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head
def convertQuorumInfo(infos: Seq[QuorumInfo.ReplicaState], status: String): Seq[Array[String]] =
infos.map { info =>
Array(info.replicaId,
info.logEndOffset,
leader.logEndOffset - info.logEndOffset,
info.lastFetchTimestamp.orElse(-1),
info.lastCaughtUpTimestamp.orElse(-1),
status
).map(_.toString)
}
prettyPrintTable(
Array("NodeId", "LogEndOffset", "Lag", "LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"),
(convertQuorumInfo(Seq(leader), "Leader")
++ convertQuorumInfo(quorumInfo.voters.asScala.filter(_.replicaId != leaderId).toSeq, "Follower")
++ convertQuorumInfo(quorumInfo.observers.asScala.toSeq, "Observer")).asJava,
scala.Console.out
)
}
private def handleDescribeStatus(admin: Admin): Unit = {
val clusterId = admin.describeCluster.clusterId.get
val quorumInfo = admin.describeMetadataQuorum.quorumInfo.get
val leaderId = quorumInfo.leaderId
val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head
val maxLagFollower = quorumInfo.voters.asScala
.minBy(_.logEndOffset)
val maxFollowerLag = leader.logEndOffset - maxLagFollower.logEndOffset
val maxFollowerLagTimeMs =
if (leader == maxLagFollower) {
0
} else if (leader.lastCaughtUpTimestamp.isPresent && maxLagFollower.lastCaughtUpTimestamp.isPresent) {
leader.lastCaughtUpTimestamp.getAsLong - maxLagFollower.lastCaughtUpTimestamp.getAsLong
} else {
-1
}
println(
s"""|ClusterId: $clusterId
|LeaderId: ${quorumInfo.leaderId}
|LeaderEpoch: ${quorumInfo.leaderEpoch}
|HighWatermark: ${quorumInfo.highWatermark}
|MaxFollowerLag: $maxFollowerLag
|MaxFollowerLagTimeMs: $maxFollowerLagTimeMs
|CurrentVoters: ${quorumInfo.voters.asScala.map(_.replicaId).mkString("[", ",", "]")}
|CurrentObservers: ${quorumInfo.observers.asScala.map(_.replicaId).mkString("[", ",", "]")}
|""".stripMargin
)
}
}

192
core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala

@ -1,192 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, ClusterTests, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.api.extension.ExtendWith
import java.util.concurrent.ExecutionException
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT)
@Tag("integration")
class MetadataQuorumCommandTest(cluster: ClusterInstance) {
/**
* 1. The same number of broker controllers
* 2. More brokers than controllers
* 3. Fewer brokers than controllers
*/
@ClusterTests(
Array(
new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3),
new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3),
new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2),
new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2),
new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3),
new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3)
))
def testDescribeQuorumReplicationSuccessful(): Unit = {
cluster.waitForReadyBrokers()
val describeOutput = TestUtils.grabConsoleOutput(
MetadataQuorumCommand.mainNoExit(
Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication"))
)
val leaderPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Leader\s+""".r
val followerPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Follower\s+""".r
val observerPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Observer\s+""".r
val outputs = describeOutput.split("\n").tail
if (cluster.config().clusterType() == Type.CO_KRAFT) {
assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), outputs.length)
} else {
assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.length)
}
// `matches` is not supported in scala 2.12, use `findFirstIn` instead.
assertTrue(leaderPattern.findFirstIn(outputs.head).nonEmpty)
assertEquals(1, outputs.count(leaderPattern.findFirstIn(_).nonEmpty))
assertEquals(cluster.config().numControllers() - 1, outputs.count(followerPattern.findFirstIn(_).nonEmpty))
if (cluster.config().clusterType() == Type.CO_KRAFT) {
assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()), outputs.count(observerPattern.findFirstIn(_).nonEmpty))
} else {
assertEquals(cluster.config().numBrokers(), outputs.count(observerPattern.findFirstIn(_).nonEmpty))
}
}
/**
* 1. The same number of broker controllers
* 2. More brokers than controllers
* 3. Fewer brokers than controllers
*/
@ClusterTests(
Array(
new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3),
new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3),
new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2),
new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2),
new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3),
new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3)
))
def testDescribeQuorumStatusSuccessful(): Unit = {
cluster.waitForReadyBrokers()
val describeOutput = TestUtils.grabConsoleOutput(
MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status"))
)
val outputs = describeOutput.split("\n")
assertTrue("""ClusterId:\s+\S{22}""".r.findFirstIn(outputs(0)).nonEmpty)
assertTrue("""LeaderId:\s+\d+""".r.findFirstIn(outputs(1)).nonEmpty)
assertTrue("""LeaderEpoch:\s+\d+""".r.findFirstIn(outputs(2)).nonEmpty)
// HighWatermark may be -1
assertTrue("""HighWatermark:\s+[-]?\d+""".r.findFirstIn(outputs(3)).nonEmpty)
assertTrue("""MaxFollowerLag:\s+\d+""".r.findFirstIn(outputs(4)).nonEmpty)
assertTrue("""MaxFollowerLagTimeMs:\s+[-]?\d+""".r.findFirstIn(outputs(5)).nonEmpty)
assertTrue("""CurrentVoters:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(6)).nonEmpty)
// There are no observers if we have fewer brokers than controllers
if (cluster.config().clusterType() == Type.CO_KRAFT
&& cluster.config().numBrokers() <= cluster.config().numControllers()) {
assertTrue("""CurrentObservers:\s+\[\]""".r.findFirstIn(outputs(7)).nonEmpty)
} else {
assertTrue("""CurrentObservers:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(7)).nonEmpty)
}
}
@ClusterTests(
Array(new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1),
new ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 1)))
def testOnlyOneBrokerAndOneController(): Unit = {
val statusOutput = TestUtils.grabConsoleOutput(
MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status"))
)
assertEquals("MaxFollowerLag: 0", statusOutput.split("\n")(4))
assertEquals("MaxFollowerLagTimeMs: 0", statusOutput.split("\n")(5))
val replicationOutput = TestUtils.grabConsoleOutput(
MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication"))
)
assertEquals("0", replicationOutput.split("\n")(1).split("\\s+")(2))
}
@ClusterTest(clusterType = Type.ZK, brokers = 3)
def testDescribeQuorumInZkMode(): Unit = {
assertTrue(
assertThrows(
classOf[ExecutionException],
() =>
MetadataQuorumCommand.mainNoExit(
Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status"))
).getCause.isInstanceOf[UnsupportedVersionException]
)
assertTrue(
assertThrows(
classOf[ExecutionException],
() =>
MetadataQuorumCommand.mainNoExit(
Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication"))
).getCause.isInstanceOf[UnsupportedVersionException]
)
}
}
class MetadataQuorumCommandErrorTest {
@Test
def testPropertiesFileDoesNotExists(): Unit = {
assertEquals(1,
MetadataQuorumCommand.mainNoExit(
Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe")))
assertEquals(
"Properties file admin.properties does not exists!",
TestUtils
.grabConsoleError(
MetadataQuorumCommand.mainNoExit(
Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe")))
.trim
)
}
@Test
def testDescribeOptions(): Unit = {
assertEquals(1, MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "describe")))
assertEquals(
"One of --status or --replication must be specified with describe sub-command",
TestUtils
.grabConsoleError(MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "describe")))
.trim
)
assertEquals(1,
MetadataQuorumCommand.mainNoExit(
Array("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication")))
assertEquals(
"Only one of --status or --replication should be specified with describe sub-command",
TestUtils
.grabConsoleError(
MetadataQuorumCommand.mainNoExit(
Array("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication")))
.trim
)
}
}

19
server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java

@ -20,7 +20,6 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricName;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
@ -70,13 +69,13 @@ public class ToolsUtils {
private static void printRow( private static void printRow(
List<Integer> columnLengths, List<Integer> columnLengths,
String[] row, List<String> row,
PrintStream out PrintStream out
) { ) {
StringBuilder rowBuilder = new StringBuilder(); StringBuilder rowBuilder = new StringBuilder();
for (int i = 0; i < row.length; i++) { for (int i = 0; i < row.size(); i++) {
Integer columnLength = columnLengths.get(i); Integer columnLength = columnLengths.get(i);
String columnValue = row[i]; String columnValue = row.get(i);
appendColumnValue(rowBuilder, columnValue, columnLength); appendColumnValue(rowBuilder, columnValue, columnLength);
rowBuilder.append('\t'); rowBuilder.append('\t');
} }
@ -84,17 +83,17 @@ public class ToolsUtils {
} }
public static void prettyPrintTable( public static void prettyPrintTable(
String[] headers, List<String> headers,
List<String[]> rows, List<List<String>> rows,
PrintStream out PrintStream out
) { ) {
List<Integer> columnLengths = Arrays.stream(headers) List<Integer> columnLengths = headers.stream()
.map(String::length) .map(String::length)
.collect(Collectors.toList()); .collect(Collectors.toList());
for (String[] row : rows) { for (List<String> row : rows) {
for (int i = 0; i < headers.length; i++) { for (int i = 0; i < headers.size(); i++) {
columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length())); columnLengths.set(i, Math.max(columnLengths.get(i), row.get(i).length()));
} }
} }

195
tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentGroup;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.ToolsUtils;
import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
/**
* A tool for describing quorum status
*/
public class MetadataQuorumCommand {
public static void main(String... args) {
Exit.exit(mainNoExit(args));
}
static int mainNoExit(String... args) {
try {
execute(args);
return 0;
} catch (TerseException e) {
System.err.println(e.getMessage());
return 1;
} catch (Throwable e) {
System.err.println(e.getMessage());
System.err.println(Utils.stackTrace(e));
return 1;
}
}
static void execute(String... args) throws Exception {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("kafka-metadata-quorum")
.defaultHelp(true)
.description("This tool describes kraft metadata quorum status.");
parser
.addArgument("--bootstrap-server")
.help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
.required(true);
parser
.addArgument("--command-config")
.type(Arguments.fileType())
.help("Property file containing configs to be passed to Admin Client.");
Subparsers subparsers = parser.addSubparsers().dest("command");
addDescribeParser(subparsers);
Admin admin = null;
try {
Namespace namespace = parser.parseArgsOrFail(args);
String command = namespace.getString("command");
File commandConfig = namespace.get("command_config");
Properties props = new Properties();
if (commandConfig != null) {
if (!commandConfig.exists())
throw new TerseException("Properties file " + commandConfig.getPath() + " does not exists!");
Utils.loadProps(commandConfig.getPath());
}
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server"));
admin = Admin.create(props);
if (command.equals("describe")) {
if (namespace.getBoolean("status") && namespace.getBoolean("replication")) {
throw new TerseException("Only one of --status or --replication should be specified with describe sub-command");
} else if (namespace.getBoolean("replication")) {
handleDescribeReplication(admin);
} else if (namespace.getBoolean("status")) {
handleDescribeStatus(admin);
} else {
throw new TerseException("One of --status or --replication must be specified with describe sub-command");
}
} else {
throw new IllegalStateException("Unknown command: " + command + ", only 'describe' is supported");
}
} finally {
if (admin != null)
admin.close();
}
}
private static void addDescribeParser(Subparsers subparsers) {
Subparser describeParser = subparsers
.addParser("describe")
.help("Describe the metadata quorum info");
ArgumentGroup statusArgs = describeParser.addArgumentGroup("Status");
statusArgs
.addArgument("--status")
.help("A short summary of the quorum status and the other provides detailed information about the status of replication.")
.action(Arguments.storeTrue());
ArgumentGroup replicationArgs = describeParser.addArgumentGroup("Replication");
replicationArgs
.addArgument("--replication")
.help("Detailed information about the status of replication")
.action(Arguments.storeTrue());
}
private static void handleDescribeReplication(Admin admin) throws ExecutionException, InterruptedException {
QuorumInfo quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
int leaderId = quorumInfo.leaderId();
QuorumInfo.ReplicaState leader = quorumInfo.voters().stream().filter(voter -> voter.replicaId() == leaderId).findFirst().get();
List<List<String>> rows = new ArrayList<>();
rows.addAll(quorumInfoToRows(leader, Stream.of(leader), "Leader"));
rows.addAll(quorumInfoToRows(leader, quorumInfo.voters().stream().filter(v -> v.replicaId() != leaderId), "Follower"));
rows.addAll(quorumInfoToRows(leader, quorumInfo.observers().stream(), "Observer"));
ToolsUtils.prettyPrintTable(
asList("NodeId", "LogEndOffset", "Lag", "LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"),
rows,
System.out
);
}
private static List<List<String>> quorumInfoToRows(QuorumInfo.ReplicaState leader, Stream<QuorumInfo.ReplicaState> infos, String status) {
return infos.map(info ->
Stream.of(
info.replicaId(),
info.logEndOffset(),
leader.logEndOffset() - info.logEndOffset(),
info.lastFetchTimestamp().orElse(-1),
info.lastCaughtUpTimestamp().orElse(-1),
status
).map(r -> r.toString()).collect(Collectors.toList())
).collect(Collectors.toList());
}
private static void handleDescribeStatus(Admin admin) throws ExecutionException, InterruptedException {
String clusterId = admin.describeCluster().clusterId().get();
QuorumInfo quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
int leaderId = quorumInfo.leaderId();
QuorumInfo.ReplicaState leader = quorumInfo.voters().stream().filter(voter -> voter.replicaId() == leaderId).findFirst().get();
QuorumInfo.ReplicaState maxLagFollower = quorumInfo.voters().stream().min(Comparator.comparingLong(qi -> qi.logEndOffset())).get();
long maxFollowerLag = leader.logEndOffset() - maxLagFollower.logEndOffset();
long maxFollowerLagTimeMs;
if (leader == maxLagFollower)
maxFollowerLagTimeMs = 0;
else if (leader.lastCaughtUpTimestamp().isPresent() && maxLagFollower.lastCaughtUpTimestamp().isPresent()) {
maxFollowerLagTimeMs = leader.lastCaughtUpTimestamp().getAsLong() - maxLagFollower.lastCaughtUpTimestamp().getAsLong();
} else {
maxFollowerLagTimeMs = -1;
}
System.out.println(
"ClusterId: " + clusterId +
"\nLeaderId: " + quorumInfo.leaderId() +
"\nLeaderEpoch: " + quorumInfo.leaderEpoch() +
"\nHighWatermark: " + quorumInfo.highWatermark() +
"\nMaxFollowerLag: " + maxFollowerLag +
"\nMaxFollowerLagTimeMs: " + maxFollowerLagTimeMs +
"\nCurrentVoters: " + Utils.mkString(quorumInfo.voters().stream().map(v -> v.replicaId()), "[", "]", ",") +
"\nCurrentObservers: " + Utils.mkString(quorumInfo.observers().stream().map(v -> v.replicaId()), "[", "]", ",")
);
}
}

33
tools/src/main/java/org/apache/kafka/tools/TerseException.java

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
/**
* An exception thrown to indicate that the command has failed, but we don't want to
* print a stack trace.
*/
public class TerseException extends Exception {
/**
* Create new instance with the provided message.
*
* @param message The message to print out before exiting. A stack trace will not be printed.
*/
public TerseException(String message) {
super(message);
}
}

42
tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java

@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -63,6 +62,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton; import static java.util.Collections.singleton;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static net.sourceforge.argparse4j.impl.Arguments.store; import static net.sourceforge.argparse4j.impl.Arguments.store;
@ -254,14 +254,14 @@ public abstract class TransactionsCommand {
} }
static class DescribeProducersCommand extends TransactionsCommand { static class DescribeProducersCommand extends TransactionsCommand {
static final String[] HEADERS = new String[]{ static final List<String> HEADERS = asList(
"ProducerId", "ProducerId",
"ProducerEpoch", "ProducerEpoch",
"LatestCoordinatorEpoch", "LatestCoordinatorEpoch",
"LastSequence", "LastSequence",
"LastTimestamp", "LastTimestamp",
"CurrentTransactionStartOffset" "CurrentTransactionStartOffset"
}; );
DescribeProducersCommand(Time time) { DescribeProducersCommand(Time time) {
super(time); super(time);
@ -320,20 +320,20 @@ public abstract class TransactionsCommand {
return; return;
} }
List<String[]> rows = result.activeProducers().stream().map(producerState -> { List<List<String>> rows = result.activeProducers().stream().map(producerState -> {
String currentTransactionStartOffsetColumnValue = String currentTransactionStartOffsetColumnValue =
producerState.currentTransactionStartOffset().isPresent() ? producerState.currentTransactionStartOffset().isPresent() ?
String.valueOf(producerState.currentTransactionStartOffset().getAsLong()) : String.valueOf(producerState.currentTransactionStartOffset().getAsLong()) :
"None"; "None";
return new String[] { return asList(
String.valueOf(producerState.producerId()), String.valueOf(producerState.producerId()),
String.valueOf(producerState.producerEpoch()), String.valueOf(producerState.producerEpoch()),
String.valueOf(producerState.coordinatorEpoch().orElse(-1)), String.valueOf(producerState.coordinatorEpoch().orElse(-1)),
String.valueOf(producerState.lastSequence()), String.valueOf(producerState.lastSequence()),
String.valueOf(producerState.lastTimestamp()), String.valueOf(producerState.lastTimestamp()),
currentTransactionStartOffsetColumnValue currentTransactionStartOffsetColumnValue
}; );
}).collect(Collectors.toList()); }).collect(Collectors.toList());
prettyPrintTable(HEADERS, rows, out); prettyPrintTable(HEADERS, rows, out);
@ -341,7 +341,7 @@ public abstract class TransactionsCommand {
} }
static class DescribeTransactionsCommand extends TransactionsCommand { static class DescribeTransactionsCommand extends TransactionsCommand {
static final String[] HEADERS = new String[]{ static final List<String> HEADERS = asList(
"CoordinatorId", "CoordinatorId",
"TransactionalId", "TransactionalId",
"ProducerId", "ProducerId",
@ -351,7 +351,7 @@ public abstract class TransactionsCommand {
"CurrentTransactionStartTimeMs", "CurrentTransactionStartTimeMs",
"TransactionDurationMs", "TransactionDurationMs",
"TopicPartitions" "TopicPartitions"
}; );
DescribeTransactionsCommand(Time time) { DescribeTransactionsCommand(Time time) {
super(time); super(time);
@ -402,7 +402,7 @@ public abstract class TransactionsCommand {
transactionDurationMsColumnValue = "None"; transactionDurationMsColumnValue = "None";
} }
String[] row = new String[]{ List<String> row = asList(
String.valueOf(result.coordinatorId()), String.valueOf(result.coordinatorId()),
transactionalId, transactionalId,
String.valueOf(result.producerId()), String.valueOf(result.producerId()),
@ -412,19 +412,19 @@ public abstract class TransactionsCommand {
transactionStartTimeMsColumnValue, transactionStartTimeMsColumnValue,
transactionDurationMsColumnValue, transactionDurationMsColumnValue,
Utils.join(result.topicPartitions(), ",") Utils.join(result.topicPartitions(), ",")
}; );
prettyPrintTable(HEADERS, singletonList(row), out); prettyPrintTable(HEADERS, singletonList(row), out);
} }
} }
static class ListTransactionsCommand extends TransactionsCommand { static class ListTransactionsCommand extends TransactionsCommand {
static final String[] HEADERS = new String[] { static final List<String> HEADERS = asList(
"TransactionalId", "TransactionalId",
"Coordinator", "Coordinator",
"ProducerId", "ProducerId",
"TransactionState" "TransactionState"
}; );
ListTransactionsCommand(Time time) { ListTransactionsCommand(Time time) {
super(time); super(time);
@ -454,18 +454,18 @@ public abstract class TransactionsCommand {
return; return;
} }
List<String[]> rows = new ArrayList<>(); List<List<String>> rows = new ArrayList<>();
for (Map.Entry<Integer, Collection<TransactionListing>> brokerListingsEntry : result.entrySet()) { for (Map.Entry<Integer, Collection<TransactionListing>> brokerListingsEntry : result.entrySet()) {
String coordinatorIdString = brokerListingsEntry.getKey().toString(); String coordinatorIdString = brokerListingsEntry.getKey().toString();
Collection<TransactionListing> listings = brokerListingsEntry.getValue(); Collection<TransactionListing> listings = brokerListingsEntry.getValue();
for (TransactionListing listing : listings) { for (TransactionListing listing : listings) {
rows.add(new String[] { rows.add(asList(
listing.transactionalId(), listing.transactionalId(),
coordinatorIdString, coordinatorIdString,
String.valueOf(listing.producerId()), String.valueOf(listing.producerId()),
listing.state().toString() listing.state().toString()
}); ));
} }
} }
@ -476,7 +476,7 @@ public abstract class TransactionsCommand {
static class FindHangingTransactionsCommand extends TransactionsCommand { static class FindHangingTransactionsCommand extends TransactionsCommand {
private static final int MAX_BATCH_SIZE = 500; private static final int MAX_BATCH_SIZE = 500;
static final String[] HEADERS = new String[] { static final List<String> HEADERS = asList(
"Topic", "Topic",
"Partition", "Partition",
"ProducerId", "ProducerId",
@ -485,7 +485,7 @@ public abstract class TransactionsCommand {
"StartOffset", "StartOffset",
"LastTimestamp", "LastTimestamp",
"Duration(min)" "Duration(min)"
}; );
FindHangingTransactionsCommand(Time time) { FindHangingTransactionsCommand(Time time) {
super(time); super(time);
@ -653,13 +653,13 @@ public abstract class TransactionsCommand {
PrintStream out PrintStream out
) { ) {
long currentTimeMs = time.milliseconds(); long currentTimeMs = time.milliseconds();
List<String[]> rows = new ArrayList<>(hangingTransactions.size()); List<List<String>> rows = new ArrayList<>(hangingTransactions.size());
for (OpenTransaction transaction : hangingTransactions) { for (OpenTransaction transaction : hangingTransactions) {
long transactionDurationMinutes = TimeUnit.MILLISECONDS.toMinutes( long transactionDurationMinutes = TimeUnit.MILLISECONDS.toMinutes(
currentTimeMs - transaction.producerState.lastTimestamp()); currentTimeMs - transaction.producerState.lastTimestamp());
rows.add(new String[] { rows.add(asList(
transaction.topicPartition.topic(), transaction.topicPartition.topic(),
String.valueOf(transaction.topicPartition.partition()), String.valueOf(transaction.topicPartition.partition()),
String.valueOf(transaction.producerState.producerId()), String.valueOf(transaction.producerState.producerId()),
@ -668,7 +668,7 @@ public abstract class TransactionsCommand {
String.valueOf(transaction.producerState.currentTransactionStartOffset().orElse(-1)), String.valueOf(transaction.producerState.currentTransactionStartOffset().orElse(-1)),
String.valueOf(transaction.producerState.lastTimestamp()), String.valueOf(transaction.producerState.lastTimestamp()),
String.valueOf(transactionDurationMinutes) String.valueOf(transactionDurationMinutes)
}); ));
} }
prettyPrintTable(HEADERS, rows, out); prettyPrintTable(HEADERS, rows, out);
@ -974,7 +974,7 @@ public abstract class TransactionsCommand {
PrintStream out, PrintStream out,
Time time Time time
) throws Exception { ) throws Exception {
List<TransactionsCommand> commands = Arrays.asList( List<TransactionsCommand> commands = asList(
new ListTransactionsCommand(time), new ListTransactionsCommand(time),
new DescribeTransactionsCommand(time), new DescribeTransactionsCommand(time),
new DescribeProducersCommand(time), new DescribeProducersCommand(time),

48
tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandErrorTest.java

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class MetadataQuorumCommandErrorTest {
@Test
public void testPropertiesFileDoesNotExists() {
assertEquals(1,
MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe"));
assertEquals("Properties file admin.properties does not exists!",
ToolsTestUtils.captureStandardErr(() ->
MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe")));
}
@Test
public void testDescribeOptions() {
assertEquals(1, MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092", "describe"));
assertEquals("One of --status or --replication must be specified with describe sub-command",
ToolsTestUtils.captureStandardErr(() ->
MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092", "describe")));
assertEquals(1,
MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication"));
assertEquals("Only one of --status or --replication should be specified with describe sub-command",
ToolsTestUtils.captureStandardErr(() ->
MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication")));
}
}

161
tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static java.util.Arrays.stream;
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(clusterType = Type.KRAFT)
@Tag("integration")
class MetadataQuorumCommandTest {
private final ClusterInstance cluster;
public MetadataQuorumCommandTest(ClusterInstance cluster) {
this.cluster = cluster;
}
/**
* 1. The same number of broker controllers
* 2. More brokers than controllers
* 3. Fewer brokers than controllers
*/
@ClusterTests({
@ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3),
@ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3),
@ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2),
@ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2),
@ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3),
@ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3)
})
public void testDescribeQuorumReplicationSuccessful() throws InterruptedException {
cluster.waitForReadyBrokers();
String describeOutput = ToolsTestUtils.captureStandardOut(() ->
MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")
);
List<String> outputs = stream(describeOutput.split("\n")).skip(1).collect(Collectors.toList());
if (cluster.config().clusterType() == Type.CO_KRAFT)
assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), outputs.size());
else
assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.size());
Pattern leaderPattern = Pattern.compile("\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+-?\\d+\\s+Leader\\s*");
assertTrue(leaderPattern.matcher(outputs.get(0)).find());
assertTrue(outputs.stream().skip(1).noneMatch(o -> leaderPattern.matcher(o).find()));
Pattern followerPattern = Pattern.compile("\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+-?\\d+\\s+Follower\\s*");
assertEquals(cluster.config().numControllers() - 1, outputs.stream().filter(o -> followerPattern.matcher(o).find()).count());
Pattern observerPattern = Pattern.compile("\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+-?\\d+\\s+Observer\\s*");
if (cluster.config().clusterType() == Type.CO_KRAFT)
assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()),
outputs.stream().filter(o -> observerPattern.matcher(o).find()).count());
else
assertEquals(cluster.config().numBrokers(), outputs.stream().filter(o -> observerPattern.matcher(o).find()).count());
}
/**
* 1. The same number of broker controllers
* 2. More brokers than controllers
* 3. Fewer brokers than controllers
*/
@ClusterTests({
@ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3),
@ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3),
@ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2),
@ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2),
@ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3),
@ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3)
})
public void testDescribeQuorumStatusSuccessful() throws InterruptedException {
cluster.waitForReadyBrokers();
String describeOutput = ToolsTestUtils.captureStandardOut(() ->
MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")
);
String[] outputs = describeOutput.split("\n");
assertTrue(outputs[0].matches("ClusterId:\\s+\\S{22}"));
assertTrue(outputs[1].matches("LeaderId:\\s+\\d+"));
assertTrue(outputs[2].matches("LeaderEpoch:\\s+\\d+"));
// HighWatermark may be -1
assertTrue(outputs[3].matches("HighWatermark:\\s+-?\\d+"));
assertTrue(outputs[4].matches("MaxFollowerLag:\\s+\\d+"));
assertTrue(outputs[5].matches("MaxFollowerLagTimeMs:\\s+-?\\d+"));
assertTrue(outputs[6].matches("CurrentVoters:\\s+\\[\\d+(,\\d+)*]"));
// There are no observers if we have fewer brokers than controllers
if (cluster.config().clusterType() == Type.CO_KRAFT && cluster.config().numBrokers() <= cluster.config().numControllers())
assertTrue(outputs[7].matches("CurrentObservers:\\s+\\[]"));
else
assertTrue(outputs[7].matches("CurrentObservers:\\s+\\[\\d+(,\\d+)*]"));
}
@ClusterTests({
@ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1),
@ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 1)
})
public void testOnlyOneBrokerAndOneController() {
String statusOutput = ToolsTestUtils.captureStandardOut(() ->
MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")
);
assertEquals("MaxFollowerLag: 0", statusOutput.split("\n")[4]);
assertEquals("MaxFollowerLagTimeMs: 0", statusOutput.split("\n")[5]);
String replicationOutput = ToolsTestUtils.captureStandardOut(() ->
MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")
);
assertEquals("0", replicationOutput.split("\n")[1].split("\\s+")[2]);
}
@ClusterTest(clusterType = Type.ZK, brokers = 3)
public void testDescribeQuorumInZkMode() {
assertTrue(
assertThrows(
ExecutionException.class,
() -> MetadataQuorumCommand.execute("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")
).getCause() instanceof UnsupportedVersionException
);
assertTrue(
assertThrows(
ExecutionException.class,
() -> MetadataQuorumCommand.execute("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")
).getCause() instanceof UnsupportedVersionException
);
}
}

51
tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
public class ToolsTestUtils {
public static String captureStandardOut(Runnable runnable) {
return captureStandardStream(false, runnable);
}
public static String captureStandardErr(Runnable runnable) {
return captureStandardStream(true, runnable);
}
private static String captureStandardStream(boolean isErr, Runnable runnable) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
PrintStream currentStream = isErr ? System.err : System.out;
PrintStream tempStream = new PrintStream(outputStream);
if (isErr)
System.setErr(tempStream);
else
System.setOut(tempStream);
try {
runnable.run();
return new String(outputStream.toByteArray()).trim();
} finally {
if (isErr)
System.setErr(currentStream);
else
System.setOut(currentStream);
}
}
}

12
tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java

@ -177,7 +177,7 @@ public class TransactionsCommandTest {
List<List<String>> table = readOutputAsTable(); List<List<String>> table = readOutputAsTable();
assertEquals(3, table.size()); assertEquals(3, table.size());
List<String> expectedHeaders = asList(TransactionsCommand.DescribeProducersCommand.HEADERS); List<String> expectedHeaders = TransactionsCommand.DescribeProducersCommand.HEADERS;
assertEquals(expectedHeaders, table.get(0)); assertEquals(expectedHeaders, table.get(0));
Set<List<String>> expectedRows = Utils.mkSet( Set<List<String>> expectedRows = Utils.mkSet(
@ -213,7 +213,7 @@ public class TransactionsCommandTest {
assertEquals(4, table.size()); assertEquals(4, table.size());
// Assert expected headers // Assert expected headers
List<String> expectedHeaders = asList(TransactionsCommand.ListTransactionsCommand.HEADERS); List<String> expectedHeaders = TransactionsCommand.ListTransactionsCommand.HEADERS;
assertEquals(expectedHeaders, table.get(0)); assertEquals(expectedHeaders, table.get(0));
Set<List<String>> expectedRows = Utils.mkSet( Set<List<String>> expectedRows = Utils.mkSet(
@ -272,7 +272,7 @@ public class TransactionsCommandTest {
List<List<String>> table = readOutputAsTable(); List<List<String>> table = readOutputAsTable();
assertEquals(2, table.size()); assertEquals(2, table.size());
List<String> expectedHeaders = asList(TransactionsCommand.DescribeTransactionsCommand.HEADERS); List<String> expectedHeaders = TransactionsCommand.DescribeTransactionsCommand.HEADERS;
assertEquals(expectedHeaders, table.get(0)); assertEquals(expectedHeaders, table.get(0));
List<String> expectedRow = asList( List<String> expectedRow = asList(
@ -703,7 +703,7 @@ public class TransactionsCommandTest {
List<List<String>> table = readOutputAsTable(); List<List<String>> table = readOutputAsTable();
assertEquals(1, table.size()); assertEquals(1, table.size());
List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS); List<String> expectedHeaders = TransactionsCommand.FindHangingTransactionsCommand.HEADERS;
assertEquals(expectedHeaders, table.get(0)); assertEquals(expectedHeaders, table.get(0));
} }
@ -742,7 +742,7 @@ public class TransactionsCommandTest {
List<List<String>> table = readOutputAsTable(); List<List<String>> table = readOutputAsTable();
assertEquals(1, table.size()); assertEquals(1, table.size());
List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS); List<String> expectedHeaders = TransactionsCommand.FindHangingTransactionsCommand.HEADERS;
assertEquals(expectedHeaders, table.get(0)); assertEquals(expectedHeaders, table.get(0));
} }
@ -940,7 +940,7 @@ public class TransactionsCommandTest {
List<List<String>> table = readOutputAsTable(); List<List<String>> table = readOutputAsTable();
assertEquals(2, table.size()); assertEquals(2, table.size());
List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS); List<String> expectedHeaders = TransactionsCommand.FindHangingTransactionsCommand.HEADERS;
assertEquals(expectedHeaders, table.get(0)); assertEquals(expectedHeaders, table.get(0));
long durationMinutes = TimeUnit.MILLISECONDS.toMinutes(time.milliseconds() - lastTimestamp); long durationMinutes = TimeUnit.MILLISECONDS.toMinutes(time.milliseconds() - lastTimestamp);

Loading…
Cancel
Save