Browse Source

KAFKA-14575: Move ClusterTool to tools module (#13080)

Reviewers: dengziming <dengziming1993@gmail.com>, Federico Valeri  <fedevaleri@gmail.com>
pull/13147/head
Mickael Maison 2 years ago committed by GitHub
parent
commit
8b44237655
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      bin/kafka-cluster.sh
  2. 125
      core/src/main/scala/kafka/tools/ClusterTool.scala
  3. 74
      core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala
  4. 134
      tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
  5. 71
      tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java

2
bin/kafka-cluster.sh

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ClusterTool "$@" exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ClusterTool "$@"

125
core/src/main/scala/kafka/tools/ClusterTool.scala

@ -1,125 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import java.io.PrintStream
import java.util.Properties
import java.util.concurrent.ExecutionException
import kafka.utils.{Exit, Logging}
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.store
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.utils.Utils
object ClusterTool extends Logging {
def main(args: Array[String]): Unit = {
try {
val parser = ArgumentParsers.
newArgumentParser("kafka-cluster").
defaultHelp(true).
description("The Kafka cluster tool.")
val subparsers = parser.addSubparsers().dest("command")
val clusterIdParser = subparsers.addParser("cluster-id").
help("Get information about the ID of a cluster.")
val unregisterParser = subparsers.addParser("unregister").
help("Unregister a broker.")
List(clusterIdParser, unregisterParser).foreach(parser => {
parser.addArgument("--bootstrap-server", "-b").
action(store()).
help("A list of host/port pairs to use for establishing the connection to the kafka cluster.")
parser.addArgument("--config", "-c").
action(store()).
help("A property file containing configs to passed to AdminClient.")
})
unregisterParser.addArgument("--id", "-i").
`type`(classOf[Integer]).
action(store()).
required(true).
help("The ID of the broker to unregister.")
val namespace = parser.parseArgsOrFail(args)
val command = namespace.getString("command")
val configPath = namespace.getString("config")
val properties = if (configPath == null) {
new Properties()
} else {
Utils.loadProps(configPath)
}
Option(namespace.getString("bootstrap_server")).
foreach(b => properties.setProperty("bootstrap.servers", b))
if (properties.getProperty("bootstrap.servers") == null) {
throw new TerseFailure("Please specify --bootstrap-server.")
}
command match {
case "cluster-id" =>
val adminClient = Admin.create(properties)
try {
clusterIdCommand(System.out, adminClient)
} finally {
adminClient.close()
}
Exit.exit(0)
case "unregister" =>
val adminClient = Admin.create(properties)
try {
unregisterCommand(System.out, adminClient, namespace.getInt("id"))
} finally {
adminClient.close()
}
Exit.exit(0)
case _ =>
throw new RuntimeException(s"Unknown command $command")
}
} catch {
case e: TerseFailure =>
System.err.println(e.getMessage)
System.exit(1)
}
}
def clusterIdCommand(stream: PrintStream,
adminClient: Admin): Unit = {
val clusterId = Option(adminClient.describeCluster().clusterId().get())
clusterId match {
case None => stream.println(s"No cluster ID found. The Kafka version is probably too old.")
case Some(id) => stream.println(s"Cluster ID: ${id}")
}
}
def unregisterCommand(stream: PrintStream,
adminClient: Admin,
id: Int): Unit = {
try {
Option(adminClient.unregisterBroker(id).all().get())
stream.println(s"Broker ${id} is no longer registered.")
} catch {
case e: ExecutionException => {
val cause = e.getCause()
if (cause.isInstanceOf[UnsupportedVersionException]) {
stream.println(s"The target cluster does not support the broker unregistration API.")
} else {
throw e
}
}
}
}
}

74
core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala

@ -1,74 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import java.io.{ByteArrayOutputStream, PrintStream}
import org.apache.kafka.clients.admin.MockAdminClient
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Test, Timeout}
@Timeout(value = 60)
class ClusterToolTest {
@Test
def testPrintClusterId(): Unit = {
val adminClient = new MockAdminClient.Builder().
clusterId("QtNwvtfVQ3GEFpzOmDEE-w").
build()
val stream = new ByteArrayOutputStream()
ClusterTool.clusterIdCommand(new PrintStream(stream), adminClient)
assertEquals(
s"""Cluster ID: QtNwvtfVQ3GEFpzOmDEE-w
""", stream.toString())
}
@Test
def testClusterTooOldToHaveId(): Unit = {
val adminClient = new MockAdminClient.Builder().
clusterId(null).
build()
val stream = new ByteArrayOutputStream()
ClusterTool.clusterIdCommand(new PrintStream(stream), adminClient)
assertEquals(
s"""No cluster ID found. The Kafka version is probably too old.
""", stream.toString())
}
@Test
def testUnregisterBroker(): Unit = {
val adminClient = new MockAdminClient.Builder().numBrokers(3).
usingRaftController(true).
build()
val stream = new ByteArrayOutputStream()
ClusterTool.unregisterCommand(new PrintStream(stream), adminClient, 0)
assertEquals(
s"""Broker 0 is no longer registered.
""", stream.toString())
}
@Test
def testLegacyModeClusterCannotUnregisterBroker(): Unit = {
val adminClient = new MockAdminClient.Builder().numBrokers(3).
usingRaftController(false).
build()
val stream = new ByteArrayOutputStream()
ClusterTool.unregisterCommand(new PrintStream(stream), adminClient, 0)
assertEquals(
s"""The target cluster does not support the broker unregistration API.
""", stream.toString())
}
}

134
tools/src/main/java/org/apache/kafka/tools/ClusterTool.java

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import static net.sourceforge.argparse4j.impl.Arguments.store;
public class ClusterTool {
public static void main(String... args) {
Exit.exit(mainNoExit(args));
}
static int mainNoExit(String... args) {
try {
execute(args);
return 0;
} catch (TerseException e) {
System.err.println(e.getMessage());
return 1;
} catch (Throwable e) {
System.err.println(e.getMessage());
System.err.println(Utils.stackTrace(e));
return 1;
}
}
static void execute(String... args) throws Exception {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("kafka-cluster")
.defaultHelp(true)
.description("The Kafka cluster tool.");
Subparsers subparsers = parser.addSubparsers().dest("command");
Subparser clusterIdParser = subparsers.addParser("cluster-id")
.help("Get information about the ID of a cluster.");
Subparser unregisterParser = subparsers.addParser("unregister")
.help("Unregister a broker.");
for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser)) {
subpparser.addArgument("--bootstrap-server", "-b")
.action(store())
.help("A list of host/port pairs to use for establishing the connection to the Kafka cluster.");
subpparser.addArgument("--config", "-c")
.action(store())
.help("A property file containing configurations for the Admin client.");
}
unregisterParser.addArgument("--id", "-i")
.type(Integer.class)
.action(store())
.required(true)
.help("The ID of the broker to unregister.");
Namespace namespace = parser.parseArgsOrFail(args);
String command = namespace.getString("command");
String configPath = namespace.getString("config");
Properties properties = (configPath == null) ? new Properties() : Utils.loadProps(configPath);
String bootstrapServer = namespace.getString("bootstrap_server");
if (bootstrapServer != null) {
properties.setProperty("bootstrap.servers", bootstrapServer);
}
if (properties.getProperty("bootstrap.servers") == null) {
throw new TerseException("Please specify --bootstrap-server.");
}
switch (command) {
case "cluster-id": {
try (Admin adminClient = Admin.create(properties)) {
clusterIdCommand(System.out, adminClient);
}
break;
}
case "unregister": {
try (Admin adminClient = Admin.create(properties)) {
unregisterCommand(System.out, adminClient, namespace.getInt("id"));
}
break;
}
default:
throw new RuntimeException("Unknown command " + command);
}
}
static void clusterIdCommand(PrintStream stream, Admin adminClient) throws Exception {
String clusterId = adminClient.describeCluster().clusterId().get();
if (clusterId != null) {
stream.println("Cluster ID: " + clusterId);
} else {
stream.println("No cluster ID found. The Kafka version is probably too old.");
}
}
static void unregisterCommand(PrintStream stream, Admin adminClient, int id) throws Exception {
try {
adminClient.unregisterBroker(id).all().get();
stream.println("Broker " + id + " is no longer registered.");
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof UnsupportedVersionException) {
stream.println("The target cluster does not support the broker unregistration API.");
} else {
throw ee;
}
}
}
}

71
tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(value = 60)
public class ClusterToolTest {
@Test
public void testPrintClusterId() throws Exception {
Admin adminClient = new MockAdminClient.Builder().
clusterId("QtNwvtfVQ3GEFpzOmDEE-w").
build();
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ClusterTool.clusterIdCommand(new PrintStream(stream), adminClient);
assertEquals("Cluster ID: QtNwvtfVQ3GEFpzOmDEE-w\n", stream.toString());
}
@Test
public void testClusterTooOldToHaveId() throws Exception {
Admin adminClient = new MockAdminClient.Builder().
clusterId(null).
build();
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ClusterTool.clusterIdCommand(new PrintStream(stream), adminClient);
assertEquals("No cluster ID found. The Kafka version is probably too old.\n", stream.toString());
}
@Test
public void testUnregisterBroker() throws Exception {
Admin adminClient = new MockAdminClient.Builder().numBrokers(3).
usingRaftController(true).
build();
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ClusterTool.unregisterCommand(new PrintStream(stream), adminClient, 0);
assertEquals("Broker 0 is no longer registered.\n", stream.toString());
}
@Test
public void testLegacyModeClusterCannotUnregisterBroker() throws Exception {
Admin adminClient = new MockAdminClient.Builder().numBrokers(3).
usingRaftController(false).
build();
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ClusterTool.unregisterCommand(new PrintStream(stream), adminClient, 0);
assertEquals("The target cluster does not support the broker unregistration API.\n", stream.toString());
}
}
Loading…
Cancel
Save