diff --git a/bin/kafka-cluster.sh b/bin/kafka-cluster.sh index 574007e9cd4..f09858c423f 100755 --- a/bin/kafka-cluster.sh +++ b/bin/kafka-cluster.sh @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -exec $(dirname $0)/kafka-run-class.sh kafka.tools.ClusterTool "$@" +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ClusterTool "$@" diff --git a/core/src/main/scala/kafka/tools/ClusterTool.scala b/core/src/main/scala/kafka/tools/ClusterTool.scala deleted file mode 100644 index ed82eeba0ed..00000000000 --- a/core/src/main/scala/kafka/tools/ClusterTool.scala +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import java.io.PrintStream -import java.util.Properties -import java.util.concurrent.ExecutionException - -import kafka.utils.{Exit, Logging} -import net.sourceforge.argparse4j.ArgumentParsers -import net.sourceforge.argparse4j.impl.Arguments.store -import org.apache.kafka.clients.admin.Admin -import org.apache.kafka.common.errors.UnsupportedVersionException -import org.apache.kafka.common.utils.Utils - -object ClusterTool extends Logging { - def main(args: Array[String]): Unit = { - try { - val parser = ArgumentParsers. - newArgumentParser("kafka-cluster"). - defaultHelp(true). - description("The Kafka cluster tool.") - val subparsers = parser.addSubparsers().dest("command") - - val clusterIdParser = subparsers.addParser("cluster-id"). - help("Get information about the ID of a cluster.") - val unregisterParser = subparsers.addParser("unregister"). - help("Unregister a broker.") - List(clusterIdParser, unregisterParser).foreach(parser => { - parser.addArgument("--bootstrap-server", "-b"). - action(store()). - help("A list of host/port pairs to use for establishing the connection to the kafka cluster.") - parser.addArgument("--config", "-c"). - action(store()). - help("A property file containing configs to passed to AdminClient.") - }) - unregisterParser.addArgument("--id", "-i"). - `type`(classOf[Integer]). - action(store()). - required(true). - help("The ID of the broker to unregister.") - - val namespace = parser.parseArgsOrFail(args) - val command = namespace.getString("command") - val configPath = namespace.getString("config") - val properties = if (configPath == null) { - new Properties() - } else { - Utils.loadProps(configPath) - } - Option(namespace.getString("bootstrap_server")). - foreach(b => properties.setProperty("bootstrap.servers", b)) - if (properties.getProperty("bootstrap.servers") == null) { - throw new TerseFailure("Please specify --bootstrap-server.") - } - - command match { - case "cluster-id" => - val adminClient = Admin.create(properties) - try { - clusterIdCommand(System.out, adminClient) - } finally { - adminClient.close() - } - Exit.exit(0) - case "unregister" => - val adminClient = Admin.create(properties) - try { - unregisterCommand(System.out, adminClient, namespace.getInt("id")) - } finally { - adminClient.close() - } - Exit.exit(0) - case _ => - throw new RuntimeException(s"Unknown command $command") - } - } catch { - case e: TerseFailure => - System.err.println(e.getMessage) - System.exit(1) - } - } - - def clusterIdCommand(stream: PrintStream, - adminClient: Admin): Unit = { - val clusterId = Option(adminClient.describeCluster().clusterId().get()) - clusterId match { - case None => stream.println(s"No cluster ID found. The Kafka version is probably too old.") - case Some(id) => stream.println(s"Cluster ID: ${id}") - } - } - - def unregisterCommand(stream: PrintStream, - adminClient: Admin, - id: Int): Unit = { - try { - Option(adminClient.unregisterBroker(id).all().get()) - stream.println(s"Broker ${id} is no longer registered.") - } catch { - case e: ExecutionException => { - val cause = e.getCause() - if (cause.isInstanceOf[UnsupportedVersionException]) { - stream.println(s"The target cluster does not support the broker unregistration API.") - } else { - throw e - } - } - } - } -} diff --git a/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala b/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala deleted file mode 100644 index b98cd8e9ea6..00000000000 --- a/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import java.io.{ByteArrayOutputStream, PrintStream} -import org.apache.kafka.clients.admin.MockAdminClient -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{Test, Timeout} - -@Timeout(value = 60) -class ClusterToolTest { - @Test - def testPrintClusterId(): Unit = { - val adminClient = new MockAdminClient.Builder(). - clusterId("QtNwvtfVQ3GEFpzOmDEE-w"). - build() - val stream = new ByteArrayOutputStream() - ClusterTool.clusterIdCommand(new PrintStream(stream), adminClient) - assertEquals( - s"""Cluster ID: QtNwvtfVQ3GEFpzOmDEE-w -""", stream.toString()) - } - - @Test - def testClusterTooOldToHaveId(): Unit = { - val adminClient = new MockAdminClient.Builder(). - clusterId(null). - build() - val stream = new ByteArrayOutputStream() - ClusterTool.clusterIdCommand(new PrintStream(stream), adminClient) - assertEquals( - s"""No cluster ID found. The Kafka version is probably too old. -""", stream.toString()) - } - - @Test - def testUnregisterBroker(): Unit = { - val adminClient = new MockAdminClient.Builder().numBrokers(3). - usingRaftController(true). - build() - val stream = new ByteArrayOutputStream() - ClusterTool.unregisterCommand(new PrintStream(stream), adminClient, 0) - assertEquals( - s"""Broker 0 is no longer registered. -""", stream.toString()) - } - - @Test - def testLegacyModeClusterCannotUnregisterBroker(): Unit = { - val adminClient = new MockAdminClient.Builder().numBrokers(3). - usingRaftController(false). - build() - val stream = new ByteArrayOutputStream() - ClusterTool.unregisterCommand(new PrintStream(stream), adminClient, 0) - assertEquals( - s"""The target cluster does not support the broker unregistration API. -""", stream.toString()) - } -} diff --git a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java new file mode 100644 index 00000000000..ef5cb1c2642 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import net.sourceforge.argparse4j.inf.Subparsers; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.PrintStream; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class ClusterTool { + + public static void main(String... args) { + Exit.exit(mainNoExit(args)); + } + + static int mainNoExit(String... args) { + try { + execute(args); + return 0; + } catch (TerseException e) { + System.err.println(e.getMessage()); + return 1; + } catch (Throwable e) { + System.err.println(e.getMessage()); + System.err.println(Utils.stackTrace(e)); + return 1; + } + } + + static void execute(String... args) throws Exception { + ArgumentParser parser = ArgumentParsers + .newArgumentParser("kafka-cluster") + .defaultHelp(true) + .description("The Kafka cluster tool."); + Subparsers subparsers = parser.addSubparsers().dest("command"); + + Subparser clusterIdParser = subparsers.addParser("cluster-id") + .help("Get information about the ID of a cluster."); + Subparser unregisterParser = subparsers.addParser("unregister") + .help("Unregister a broker."); + for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser)) { + subpparser.addArgument("--bootstrap-server", "-b") + .action(store()) + .help("A list of host/port pairs to use for establishing the connection to the Kafka cluster."); + subpparser.addArgument("--config", "-c") + .action(store()) + .help("A property file containing configurations for the Admin client."); + } + unregisterParser.addArgument("--id", "-i") + .type(Integer.class) + .action(store()) + .required(true) + .help("The ID of the broker to unregister."); + + Namespace namespace = parser.parseArgsOrFail(args); + String command = namespace.getString("command"); + String configPath = namespace.getString("config"); + Properties properties = (configPath == null) ? new Properties() : Utils.loadProps(configPath); + + String bootstrapServer = namespace.getString("bootstrap_server"); + if (bootstrapServer != null) { + properties.setProperty("bootstrap.servers", bootstrapServer); + } + if (properties.getProperty("bootstrap.servers") == null) { + throw new TerseException("Please specify --bootstrap-server."); + } + + switch (command) { + case "cluster-id": { + try (Admin adminClient = Admin.create(properties)) { + clusterIdCommand(System.out, adminClient); + } + break; + } + case "unregister": { + try (Admin adminClient = Admin.create(properties)) { + unregisterCommand(System.out, adminClient, namespace.getInt("id")); + } + break; + } + default: + throw new RuntimeException("Unknown command " + command); + } + } + + static void clusterIdCommand(PrintStream stream, Admin adminClient) throws Exception { + String clusterId = adminClient.describeCluster().clusterId().get(); + if (clusterId != null) { + stream.println("Cluster ID: " + clusterId); + } else { + stream.println("No cluster ID found. The Kafka version is probably too old."); + } + } + + static void unregisterCommand(PrintStream stream, Admin adminClient, int id) throws Exception { + try { + adminClient.unregisterBroker(id).all().get(); + stream.println("Broker " + id + " is no longer registered."); + } catch (ExecutionException ee) { + Throwable cause = ee.getCause(); + if (cause instanceof UnsupportedVersionException) { + stream.println("The target cluster does not support the broker unregistration API."); + } else { + throw ee; + } + } + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java new file mode 100644 index 00000000000..7e69bd78413 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Timeout(value = 60) +public class ClusterToolTest { + + @Test + public void testPrintClusterId() throws Exception { + Admin adminClient = new MockAdminClient.Builder(). + clusterId("QtNwvtfVQ3GEFpzOmDEE-w"). + build(); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + ClusterTool.clusterIdCommand(new PrintStream(stream), adminClient); + assertEquals("Cluster ID: QtNwvtfVQ3GEFpzOmDEE-w\n", stream.toString()); + } + + @Test + public void testClusterTooOldToHaveId() throws Exception { + Admin adminClient = new MockAdminClient.Builder(). + clusterId(null). + build(); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + ClusterTool.clusterIdCommand(new PrintStream(stream), adminClient); + assertEquals("No cluster ID found. The Kafka version is probably too old.\n", stream.toString()); + } + + @Test + public void testUnregisterBroker() throws Exception { + Admin adminClient = new MockAdminClient.Builder().numBrokers(3). + usingRaftController(true). + build(); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + ClusterTool.unregisterCommand(new PrintStream(stream), adminClient, 0); + assertEquals("Broker 0 is no longer registered.\n", stream.toString()); + } + + @Test + public void testLegacyModeClusterCannotUnregisterBroker() throws Exception { + Admin adminClient = new MockAdminClient.Builder().numBrokers(3). + usingRaftController(false). + build(); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + ClusterTool.unregisterCommand(new PrintStream(stream), adminClient, 0); + assertEquals("The target cluster does not support the broker unregistration API.\n", stream.toString()); + } +}