diff --git a/bin/kafka-leader-election.sh b/bin/kafka-leader-election.sh index 88baef398de..49b3cf5c61a 100755 --- a/bin/kafka-leader-election.sh +++ b/bin/kafka-leader-election.sh @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -exec $(dirname $0)/kafka-run-class.sh kafka.admin.LeaderElectionCommand "$@" +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.LeaderElectionCommand "$@" diff --git a/bin/windows/kafka-leader-election.bat b/bin/windows/kafka-leader-election.bat index 0432a99b6e4..92e03dfc63f 100644 --- a/bin/windows/kafka-leader-election.bat +++ b/bin/windows/kafka-leader-election.bat @@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. rem See the License for the specific language governing permissions and rem limitations under the License. -"%~dp0kafka-run-class.bat" kafka.admin.LeaderElectionCommand %* +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.LeaderElectionCommand %* diff --git a/build.gradle b/build.gradle index ffdbbf1ace4..82530e05c70 100644 --- a/build.gradle +++ b/build.gradle @@ -873,7 +873,7 @@ project(':core') { implementation project(':server-common') implementation project(':group-coordinator') implementation project(':metadata') - implementation project(':storage:api') + implementation project(':storage:storage-api') implementation project(':tools:tools-api') implementation project(':raft') implementation project(':storage') @@ -915,7 +915,7 @@ project(':core') { testImplementation project(':metadata').sourceSets.test.output testImplementation project(':raft').sourceSets.test.output testImplementation project(':server-common').sourceSets.test.output - testImplementation project(':storage:api').sourceSets.test.output + testImplementation project(':storage:storage-api').sourceSets.test.output testImplementation libs.bcpkix testImplementation libs.mockitoCore testImplementation(libs.apacheda) { @@ -1638,7 +1638,7 @@ project(':server-common') { } } -project(':storage:api') { +project(':storage:storage-api') { archivesBaseName = "kafka-storage-api" dependencies { @@ -1714,7 +1714,7 @@ project(':storage') { } dependencies { - implementation project(':storage:api') + implementation project(':storage:storage-api') implementation project(':server-common') implementation project(':clients') implementation libs.caffeine diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala deleted file mode 100644 index 868c54916e9..00000000000 --- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala +++ /dev/null @@ -1,289 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.admin - -import java.util.Properties -import java.util.concurrent.ExecutionException -import joptsimple.util.EnumConverter -import kafka.utils.CoreUtils -import kafka.utils.Implicits._ -import kafka.utils.Json -import kafka.utils.Logging -import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} -import org.apache.kafka.common.ElectionType -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.ClusterAuthorizationException -import org.apache.kafka.common.errors.ElectionNotNeededException -import org.apache.kafka.common.errors.TimeoutException -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException} -import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} - -import scala.jdk.CollectionConverters._ -import scala.collection.mutable -import scala.concurrent.duration._ - -object LeaderElectionCommand extends Logging { - def main(args: Array[String]): Unit = { - run(args, 30.second) - } - - def run(args: Array[String], timeout: Duration): Unit = { - val commandOptions = new LeaderElectionCommandOptions(args) - CommandLineUtils.maybePrintHelpOrVersion( - commandOptions, - "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas." - ) - - validate(commandOptions) - - val electionType = commandOptions.options.valueOf(commandOptions.electionType) - - val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path => - parseReplicaElectionData(Utils.readFileAsString(path)) - } - - val singleTopicPartition = ( - Option(commandOptions.options.valueOf(commandOptions.topic)), - Option(commandOptions.options.valueOf(commandOptions.partition)) - ) match { - case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition))) - case _ => None - } - - /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use. - * The validate function should be checking that this option is required if the --topic and --path-to-json-file - * are not specified. - */ - val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition) - - val adminClient = { - val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config => - Utils.loadProps(config) - }.getOrElse(new Properties()) - - props.setProperty( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, - commandOptions.options.valueOf(commandOptions.bootstrapServer) - ) - props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString) - props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString) - - Admin.create(props) - } - - try { - electLeaders(adminClient, electionType, topicPartitions) - } finally { - adminClient.close() - } - } - - private[this] def parseReplicaElectionData(jsonString: String): Set[TopicPartition] = { - Json.parseFull(jsonString) match { - case Some(js) => - js.asJsonObject.get("partitions") match { - case Some(partitionsList) => - val partitionsRaw = partitionsList.asJsonArray.iterator.map(_.asJsonObject) - val partitions = partitionsRaw.map { p => - val topic = p("topic").to[String] - val partition = p("partition").to[Int] - new TopicPartition(topic, partition) - }.toBuffer - val duplicatePartitions = CoreUtils.duplicates(partitions) - if (duplicatePartitions.nonEmpty) { - throw new AdminOperationException( - s"Replica election data contains duplicate partitions: ${duplicatePartitions.mkString(",")}" - ) - } - partitions.toSet - case None => throw new AdminOperationException("Replica election data is missing \"partitions\" field") - } - case None => throw new AdminOperationException("Replica election data is empty") - } - } - - private[this] def electLeaders( - client: Admin, - electionType: ElectionType, - topicPartitions: Option[Set[TopicPartition]] - ): Unit = { - val electionResults = try { - val partitions = topicPartitions.map(_.asJava).orNull - debug(s"Calling AdminClient.electLeaders($electionType, $partitions)") - client.electLeaders(electionType, partitions).partitions.get.asScala - } catch { - case e: ExecutionException => - e.getCause match { - case cause: TimeoutException => - val message = "Timeout waiting for election results" - println(message) - throw new AdminCommandFailedException(message, cause) - case cause: ClusterAuthorizationException => - val message = "Not authorized to perform leader election" - println(message) - throw new AdminCommandFailedException(message, cause) - case _ => - throw e - } - case e: Throwable => - println("Error while making request") - throw e - } - - val succeeded = mutable.Set.empty[TopicPartition] - val noop = mutable.Set.empty[TopicPartition] - val failed = mutable.Map.empty[TopicPartition, Throwable] - - electionResults.foreach[Unit] { case (topicPartition, error) => - if (error.isPresent) { - error.get match { - case _: ElectionNotNeededException => noop += topicPartition - case _ => failed += topicPartition -> error.get - } - } else { - succeeded += topicPartition - } - } - - if (succeeded.nonEmpty) { - val partitions = succeeded.mkString(", ") - println(s"Successfully completed leader election ($electionType) for partitions $partitions") - } - - if (noop.nonEmpty) { - val partitions = noop.mkString(", ") - println(s"Valid replica already elected for partitions $partitions") - } - - if (failed.nonEmpty) { - val rootException = new AdminCommandFailedException(s"${failed.size} replica(s) could not be elected") - failed.forKeyValue { (topicPartition, exception) => - println(s"Error completing leader election ($electionType) for partition: $topicPartition: $exception") - rootException.addSuppressed(exception) - } - throw rootException - } - } - - private[this] def validate(commandOptions: LeaderElectionCommandOptions): Unit = { - // required options: --bootstrap-server and --election-type - var missingOptions = List.empty[String] - if (!commandOptions.options.has(commandOptions.bootstrapServer)) { - missingOptions = commandOptions.bootstrapServer.options().get(0) :: missingOptions - } - - if (!commandOptions.options.has(commandOptions.electionType)) { - missingOptions = commandOptions.electionType.options().get(0) :: missingOptions - } - - if (missingOptions.nonEmpty) { - throw new AdminCommandFailedException(s"Missing required option(s): ${missingOptions.mkString(", ")}") - } - - // One and only one is required: --topic, --all-topic-partitions or --path-to-json-file - val mutuallyExclusiveOptions = Seq( - commandOptions.topic, - commandOptions.allTopicPartitions, - commandOptions.pathToJsonFile - ) - - mutuallyExclusiveOptions.count(commandOptions.options.has) match { - case 1 => // This is the only correct configuration, don't throw an exception - case _ => - throw new AdminCommandFailedException( - "One and only one of the following options is required: " + - s"${mutuallyExclusiveOptions.map(_.options.get(0)).mkString(", ")}" - ) - } - - // --partition if and only if --topic is used - ( - commandOptions.options.has(commandOptions.topic), - commandOptions.options.has(commandOptions.partition) - ) match { - case (true, false) => - throw new AdminCommandFailedException( - s"Missing required option(s): ${commandOptions.partition.options.get(0)}" - ) - case (false, true) => - throw new AdminCommandFailedException( - s"Option ${commandOptions.partition.options.get(0)} is only allowed if " + - s"${commandOptions.topic.options.get(0)} is used" - ) - case _ => // Ignore; we have a valid configuration - } - } -} - -private final class LeaderElectionCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) { - val bootstrapServer = parser - .accepts( - "bootstrap-server", - "A hostname and port for the broker to connect to, in the form host:port. Multiple comma separated URLs can be given. REQUIRED.") - .withRequiredArg - .describedAs("host:port") - .ofType(classOf[String]) - val adminClientConfig = parser - .accepts( - "admin.config", - "Configuration properties files to pass to the admin client") - .withRequiredArg - .describedAs("config file") - .ofType(classOf[String]) - - val pathToJsonFile = parser - .accepts( - "path-to-json-file", - "The JSON file with the list of partition for which leader elections should be performed. This is an example format. \n{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\nNot allowed if --all-topic-partitions or --topic flags are specified.") - .withRequiredArg - .describedAs("Path to JSON file") - .ofType(classOf[String]) - - val topic = parser - .accepts( - "topic", - "Name of topic for which to perform an election. Not allowed if --path-to-json-file or --all-topic-partitions is specified.") - .withRequiredArg - .describedAs("topic name") - .ofType(classOf[String]) - - val partition = parser - .accepts( - "partition", - "Partition id for which to perform an election. REQUIRED if --topic is specified.") - .withRequiredArg - .describedAs("partition id") - .ofType(classOf[Integer]) - - val allTopicPartitions = parser - .accepts( - "all-topic-partitions", - "Perform election on all of the eligible topic partitions based on the type of election (see the --election-type flag). Not allowed if --topic or --path-to-json-file is specified.") - - val electionType = parser - .accepts( - "election-type", - "Type of election to attempt. Possible values are \"preferred\" for preferred leader election or \"unclean\" for unclean leader election. If preferred election is selection, the election is only performed if the current leader is not the preferred leader for the topic partition. If unclean election is selected, the election is only performed if there are no leader for the topic partition. REQUIRED.") - .withRequiredArg - .describedAs("election type") - .withValuesConvertedBy(ElectionTypeConverter) - - options = parser.parse(args: _*) -} - -final object ElectionTypeConverter extends EnumConverter[ElectionType](classOf[ElectionType]) { } diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala deleted file mode 100644 index 6d36120b136..00000000000 --- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.admin - -import org.apache.kafka.common.errors.TimeoutException -import org.apache.kafka.server.common.AdminCommandFailedException -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test - -import scala.concurrent.duration._ - -/** - * For some error cases, we can save a little build time by avoiding the overhead for - * cluster creation and cleanup because the command is expected to fail immediately. - */ -class LeaderElectionCommandErrorTest { - - @Test - def testTopicWithoutPartition(): Unit = { - val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main( - Array( - "--bootstrap-server", "nohost:9092", - "--election-type", "unclean", - "--topic", "some-topic" - ) - )) - assertTrue(e.getMessage.startsWith("Missing required option(s)")) - assertTrue(e.getMessage.contains(" partition")) - } - - @Test - def testPartitionWithoutTopic(): Unit = { - val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main( - Array( - "--bootstrap-server", "nohost:9092", - "--election-type", "unclean", - "--all-topic-partitions", - "--partition", "0" - ) - )) - assertEquals("Option partition is only allowed if topic is used", e.getMessage) - } - - @Test - def testMissingElectionType(): Unit = { - val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main( - Array( - "--bootstrap-server", "nohost:9092", - "--topic", "some-topic", - "--partition", "0" - ) - )) - assertTrue(e.getMessage.startsWith("Missing required option(s)")) - assertTrue(e.getMessage.contains(" election-type")) - } - - @Test - def testMissingTopicPartitionSelection(): Unit = { - val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main( - Array( - "--bootstrap-server", "nohost:9092", - "--election-type", "preferred" - ) - )) - assertTrue(e.getMessage.startsWith("One and only one of the following options is required: ")) - assertTrue(e.getMessage.contains(" all-topic-partitions")) - assertTrue(e.getMessage.contains(" topic")) - assertTrue(e.getMessage.contains(" path-to-json-file")) - } - - @Test - def testInvalidBroker(): Unit = { - val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.run( - Array( - "--bootstrap-server", "example.com:1234", - "--election-type", "unclean", - "--all-topic-partitions" - ), - 1.seconds - )) - assertTrue(e.getCause.isInstanceOf[TimeoutException]) - } -} diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala deleted file mode 100644 index ff6cd2cad60..00000000000 --- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala +++ /dev/null @@ -1,270 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.admin - -import java.nio.charset.StandardCharsets -import java.nio.file.{Files, Path} -import kafka.server.IntegrationTestUtils.createTopic -import kafka.server.{KafkaConfig, KafkaServer} -import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type} -import kafka.test.junit.ClusterTestExtensions -import kafka.test.{ClusterConfig, ClusterInstance} -import kafka.utils.TestUtils -import org.apache.kafka.clients.admin.AdminClientConfig -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException -import org.apache.kafka.server.common.AdminCommandFailedException -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.extension.ExtendWith -import org.junit.jupiter.api.{BeforeEach, Tag} - -@ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3) -@Tag("integration") -final class LeaderElectionCommandTest(cluster: ClusterInstance) { - import LeaderElectionCommandTest._ - - val broker1 = 0 - val broker2 = 1 - val broker3 = 2 - - @BeforeEach - def setup(clusterConfig: ClusterConfig): Unit = { - TestUtils.verifyNoUnexpectedThreads("@BeforeEach") - clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp, "false") - clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp, "true") - clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp, "1") - clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "1000") - clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp, "2") - } - - @ClusterTest - def testAllTopicPartition(): Unit = { - val topic = "unclean-topic" - val partition = 0 - val assignment = Seq(broker2, broker3) - - cluster.waitForReadyBrokers() - val client = cluster.createAdminClient() - createTopic(client, topic, Map(partition -> assignment)) - - val topicPartition = new TopicPartition(topic, partition) - - TestUtils.assertLeader(client, topicPartition, broker2) - cluster.shutdownBroker(broker3) - TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) - cluster.shutdownBroker(broker2) - TestUtils.assertNoLeader(client, topicPartition) - cluster.startBroker(broker3) - TestUtils.waitForOnlineBroker(client, broker3) - - LeaderElectionCommand.main( - Array( - "--bootstrap-server", cluster.bootstrapServers(), - "--election-type", "unclean", - "--all-topic-partitions" - ) - ) - - TestUtils.assertLeader(client, topicPartition, broker3) - } - - @ClusterTest - def testTopicPartition(): Unit = { - val topic = "unclean-topic" - val partition = 0 - val assignment = Seq(broker2, broker3) - - cluster.waitForReadyBrokers() - val client = cluster.createAdminClient() - createTopic(client, topic, Map(partition -> assignment)) - - val topicPartition = new TopicPartition(topic, partition) - - TestUtils.assertLeader(client, topicPartition, broker2) - - cluster.shutdownBroker(broker3) - TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) - cluster.shutdownBroker(broker2) - TestUtils.assertNoLeader(client, topicPartition) - cluster.startBroker(broker3) - TestUtils.waitForOnlineBroker(client, broker3) - - LeaderElectionCommand.main( - Array( - "--bootstrap-server", cluster.bootstrapServers(), - "--election-type", "unclean", - "--topic", topic, - "--partition", partition.toString - ) - ) - - TestUtils.assertLeader(client, topicPartition, broker3) - } - - @ClusterTest - def testPathToJsonFile(): Unit = { - val topic = "unclean-topic" - val partition = 0 - val assignment = Seq(broker2, broker3) - - cluster.waitForReadyBrokers() - val client = cluster.createAdminClient() - createTopic(client, topic, Map(partition -> assignment)) - - val topicPartition = new TopicPartition(topic, partition) - - TestUtils.assertLeader(client, topicPartition, broker2) - - cluster.shutdownBroker(broker3) - TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) - cluster.shutdownBroker(broker2) - TestUtils.assertNoLeader(client, topicPartition) - cluster.startBroker(broker3) - TestUtils.waitForOnlineBroker(client, broker3) - - val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition)) - - LeaderElectionCommand.main( - Array( - "--bootstrap-server", cluster.bootstrapServers(), - "--election-type", "unclean", - "--path-to-json-file", topicPartitionPath.toString - ) - ) - - TestUtils.assertLeader(client, topicPartition, broker3) - } - - @ClusterTest - def testPreferredReplicaElection(): Unit = { - val topic = "preferred-topic" - val partition = 0 - val assignment = Seq(broker2, broker3) - - cluster.waitForReadyBrokers() - val client = cluster.createAdminClient() - createTopic(client, topic, Map(partition -> assignment)) - - val topicPartition = new TopicPartition(topic, partition) - - TestUtils.assertLeader(client, topicPartition, broker2) - - cluster.shutdownBroker(broker2) - TestUtils.assertLeader(client, topicPartition, broker3) - cluster.startBroker(broker2) - TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2)) - - LeaderElectionCommand.main( - Array( - "--bootstrap-server", cluster.bootstrapServers(), - "--election-type", "preferred", - "--all-topic-partitions" - ) - ) - - TestUtils.assertLeader(client, topicPartition, broker2) - } - - @ClusterTest - def testTopicDoesNotExist(): Unit = { - val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.main( - Array( - "--bootstrap-server", cluster.bootstrapServers(), - "--election-type", "preferred", - "--topic", "unknown-topic-name", - "--partition", "0" - ) - )) - assertTrue(e.getSuppressed()(0).isInstanceOf[UnknownTopicOrPartitionException]) - } - - @ClusterTest - def testElectionResultOutput(): Unit = { - val topic = "non-preferred-topic" - val partition0 = 0 - val partition1 = 1 - val assignment0 = Seq(broker2, broker3) - val assignment1 = Seq(broker3, broker2) - - cluster.waitForReadyBrokers() - val client = cluster.createAdminClient() - createTopic(client, topic, Map( - partition0 -> assignment0, - partition1 -> assignment1 - )) - - val topicPartition0 = new TopicPartition(topic, partition0) - val topicPartition1 = new TopicPartition(topic, partition1) - - TestUtils.assertLeader(client, topicPartition0, broker2) - TestUtils.assertLeader(client, topicPartition1, broker3) - - cluster.shutdownBroker(broker2) - TestUtils.assertLeader(client, topicPartition0, broker3) - cluster.startBroker(broker2) - TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2)) - TestUtils.waitForBrokersInIsr(client, topicPartition1, Set(broker2)) - - val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, topicPartition1)) - val output = TestUtils.grabConsoleOutput( - LeaderElectionCommand.main( - Array( - "--bootstrap-server", cluster.bootstrapServers(), - "--election-type", "preferred", - "--path-to-json-file", topicPartitionPath.toString - ) - ) - ) - - val electionResultOutputIter = output.split("\n").iterator - - assertTrue(electionResultOutputIter.hasNext) - val firstLine = electionResultOutputIter.next() - assertTrue(firstLine.contains(s"Successfully completed leader election (PREFERRED) for partitions $topicPartition0"), - s"Unexpected output: $firstLine") - - assertTrue(electionResultOutputIter.hasNext) - val secondLine = electionResultOutputIter.next() - assertTrue(secondLine.contains(s"Valid replica already elected for partitions $topicPartition1"), - s"Unexpected output: $secondLine") - } -} - -object LeaderElectionCommandTest { - def createConfig(servers: Seq[KafkaServer]): Map[String, Object] = { - Map( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers(servers), - AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG -> "20000", - AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "10000" - ) - } - - def bootstrapServers(servers: Seq[KafkaServer]): String = { - TestUtils.plaintextBootstrapServers(servers) - } - - def tempTopicPartitionFile(partitions: Set[TopicPartition]): Path = { - val file = TestUtils.tempFile("leader-election-command", ".json") - - val jsonString = TestUtils.stringifyTopicPartitions(partitions) - - Files.write(file.toPath, jsonString.getBytes(StandardCharsets.UTF_8)) - - file.toPath - } -} diff --git a/settings.gradle b/settings.gradle index 79af7b84c30..cee7ff237a7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -96,4 +96,5 @@ include 'clients', 'tools:tools-api', 'trogdor' +project(":storage:api").name = "storage-api" rootProject.name = 'kafka' diff --git a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java new file mode 100644 index 00000000000..8d34937c900 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import com.fasterxml.jackson.databind.JsonMappingException; +import joptsimple.AbstractOptionSpec; +import joptsimple.ArgumentAcceptingOptionSpec; +import joptsimple.OptionSpecBuilder; +import joptsimple.util.EnumConverter; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.ElectionNotNeededException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.apache.kafka.server.common.AdminOperationException; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.Json; +import org.apache.kafka.server.util.json.DecodeJson; +import org.apache.kafka.server.util.json.JsonObject; +import org.apache.kafka.server.util.json.JsonValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +public class LeaderElectionCommand { + private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class); + private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString(); + private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger(); + + public static void main(String... args) { + try { + run(Duration.ofMillis(30000), args); + } catch (Exception e) { + System.err.println(e.getMessage()); + System.err.println(Utils.stackTrace(e)); + } + } + + static void run(Duration timeoutMs, String... args) throws Exception { + LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args); + + commandOptions.maybePrintHelpOrVersion(); + + commandOptions.validate(); + ElectionType electionType = commandOptions.getElectionType(); + Optional> jsonFileTopicPartitions = + Optional.ofNullable(commandOptions.getPathToJsonFile()) + .map(path -> parseReplicaElectionData(path)); + + Optional topicOption = Optional.ofNullable(commandOptions.getTopic()); + Optional partitionOption = Optional.ofNullable(commandOptions.getPartition()); + final Optional> singleTopicPartition = + (topicOption.isPresent() && partitionOption.isPresent()) ? + Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) : + Optional.empty(); + + /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use. + * The validate function should be checking that this option is required if the --topic and --path-to-json-file + * are not specified. + */ + Optional> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition); + + Properties props = new Properties(); + if (commandOptions.hasAdminClientConfig()) { + props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig())); + } + props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer()); + props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis())); + props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2)); + + try (Admin adminClient = Admin.create(props)) { + electLeaders(adminClient, electionType, topicPartitions); + } + } + + private static void electLeaders(Admin client, ElectionType electionType, Optional> partitions) { + LOG.debug("Calling AdminClient.electLeaders({}, {})", electionType, partitions.orElse(null)); + Map> electionResults; + try { + electionResults = client.electLeaders(electionType, partitions.orElse(null)).partitions().get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof TimeoutException) { + String message = "Timeout waiting for election results"; + System.out.println(message); + throw new AdminCommandFailedException(message, e.getCause()); + } else if (e.getCause() instanceof ClusterAuthorizationException) { + String message = "Not authorized to perform leader election"; + System.out.println(message); + throw new AdminCommandFailedException(message, e.getCause().getCause()); + } else { + throw new RuntimeException(e); + } + } catch (InterruptedException e) { + System.out.println("Error while making request"); + throw new RuntimeException(e); + } + + Set succeeded = new HashSet<>(); + Set noop = new HashSet<>(); + Map failed = new HashMap<>(); + + electionResults.entrySet().stream().forEach(entry -> { + Optional error = entry.getValue(); + if (error.isPresent()) { + if (error.get() instanceof ElectionNotNeededException) { + noop.add(entry.getKey()); + } else { + failed.put(entry.getKey(), error.get()); + } + } else { + succeeded.add(entry.getKey()); + } + }); + + if (!succeeded.isEmpty()) { + String partitionsAsString = succeeded.stream() + .map(TopicPartition::toString) + .collect(Collectors.joining(", ")); + System.out.println(String.format("Successfully completed leader election (%s) for partitions %s", + electionType, partitionsAsString)); + } + + if (!noop.isEmpty()) { + String partitionsAsString = noop.stream() + .map(TopicPartition::toString) + .collect(Collectors.joining(", ")); + System.out.println(String.format("Valid replica already elected for partitions %s", partitionsAsString)); + } + + if (!failed.isEmpty()) { + AdminCommandFailedException rootException = + new AdminCommandFailedException(String.format("%s replica(s) could not be elected", failed.size())); + failed.entrySet().forEach(entry -> { + System.out.println(String.format("Error completing leader election (%s) for partition: %s: %s", + electionType, entry.getKey(), entry.getValue())); + rootException.addSuppressed(entry.getValue()); + }); + throw rootException; + } + } + + private static Set parseReplicaElectionData(String path) { + Optional jsonFile; + try { + jsonFile = Json.parseFull(Utils.readFileAsString(path)); + return jsonFile.map(js -> { + try { + return topicPartitions(js); + } catch (JsonMappingException e) { + throw new RuntimeException(e); + } + }).orElseThrow(() -> new AdminOperationException("Replica election data is empty")); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static Set topicPartitions(JsonValue js) throws JsonMappingException { + return js.asJsonObject().get("partitions") + .map(partitionsList -> { + try { + return toTopicPartition(partitionsList); + } catch (JsonMappingException e) { + throw new RuntimeException(e); + } + }) + .orElseThrow(() -> new AdminOperationException("Replica election data is missing \"partitions\" field")); + } + + private static Set toTopicPartition(JsonValue partitionsList) throws JsonMappingException { + List partitions = new ArrayList<>(); + Iterator iterator = partitionsList.asJsonArray().iterator(); + + while (iterator.hasNext()) { + JsonObject partitionJs = iterator.next().asJsonObject(); + String topic = partitionJs.apply("topic").to(STRING); + int partition = partitionJs.apply("partition").to(INT); + partitions.add(new TopicPartition(topic, partition)); + } + + Set duplicatePartitions = partitions.stream() + .filter(i -> Collections.frequency(partitions, i) > 1) + .collect(Collectors.toSet()); + + if (duplicatePartitions.size() > 0) { + throw new AdminOperationException(String.format( + "Replica election data contains duplicate partitions: %s", String.join(",", duplicatePartitions.toString())) + ); + } + return new HashSet<>(partitions); + } + + static class LeaderElectionCommandOptions extends CommandDefaultOptions { + private final ArgumentAcceptingOptionSpec bootstrapServer; + private final ArgumentAcceptingOptionSpec adminClientConfig; + private final ArgumentAcceptingOptionSpec pathToJsonFile; + private final ArgumentAcceptingOptionSpec topic; + private final ArgumentAcceptingOptionSpec partition; + private final OptionSpecBuilder allTopicPartitions; + private final ArgumentAcceptingOptionSpec electionType; + public LeaderElectionCommandOptions(String[] args) { + super(args); + bootstrapServer = parser + .accepts( + "bootstrap-server", + "A hostname and port for the broker to connect to, in the form host:port. Multiple comma separated URLs can be given. REQUIRED.") + .withRequiredArg() + .describedAs("host:port") + .ofType(String.class); + adminClientConfig = parser + .accepts( + "admin.config", + "Configuration properties files to pass to the admin client") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + pathToJsonFile = parser + .accepts( + "path-to-json-file", + "The JSON file with the list of partition for which leader elections should be performed. This is an example format. \n{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\nNot allowed if --all-topic-partitions or --topic flags are specified.") + .withRequiredArg() + .describedAs("Path to JSON file") + .ofType(String.class); + topic = parser + .accepts( + "topic", + "Name of topic for which to perform an election. Not allowed if --path-to-json-file or --all-topic-partitions is specified.") + .withRequiredArg() + .describedAs("topic name") + .ofType(String.class); + + partition = parser + .accepts( + "partition", + "Partition id for which to perform an election. REQUIRED if --topic is specified.") + .withRequiredArg() + .describedAs("partition id") + .ofType(Integer.class); + + allTopicPartitions = parser + .accepts( + "all-topic-partitions", + "Perform election on all of the eligible topic partitions based on the type of election (see the --election-type flag). Not allowed if --topic or --path-to-json-file is specified."); + electionType = parser + .accepts( + "election-type", + "Type of election to attempt. Possible values are \"preferred\" for preferred leader election or \"unclean\" for unclean leader election. If preferred election is selection, the election is only performed if the current leader is not the preferred leader for the topic partition. If unclean election is selected, the election is only performed if there are no leader for the topic partition. REQUIRED.") + .withRequiredArg() + .describedAs("election type") + .withValuesConvertedBy(new ElectionTypeConverter()); + + options = parser.parse(args); + } + + public boolean hasAdminClientConfig() { + return options.has(adminClientConfig); + } + + public ElectionType getElectionType() { + return options.valueOf(electionType); + } + + public String getPathToJsonFile() { + return options.valueOf(pathToJsonFile); + } + + public String getBootstrapServer() { + return options.valueOf(bootstrapServer); + } + + public String getAdminClientConfig() { + return options.valueOf(adminClientConfig); + } + + public String getTopic() { + return options.valueOf(topic); + } + + public Integer getPartition() { + return options.valueOf(partition); + } + + public void validate() { + // required options: --bootstrap-server and --election-type + List missingOptions = new ArrayList<>(); + + if (!options.has(bootstrapServer)) { + missingOptions.add(bootstrapServer.options().get(0)); + } + if (!options.has(electionType)) { + missingOptions.add(electionType.options().get(0)); + } + if (!missingOptions.isEmpty()) { + throw new AdminCommandFailedException("Missing required option(s): " + String.join(", ", missingOptions)); + } + + // One and only one is required: --topic, --all-topic-partitions or --path-to-json-file + List> mutuallyExclusiveOptions = Arrays.asList( + topic, + allTopicPartitions, + pathToJsonFile + ); + + long mutuallyExclusiveOptionsCount = mutuallyExclusiveOptions.stream() + .filter(abstractOptionSpec -> options.has(abstractOptionSpec)) + .count(); + // 1 is the only correct configuration, don't throw an exception + if (mutuallyExclusiveOptionsCount != 1) { + throw new AdminCommandFailedException( + "One and only one of the following options is required: " + + mutuallyExclusiveOptions.stream().map(opt -> opt.options().get(0)).collect(Collectors.joining(", ")) + ); + } + // --partition if and only if --topic is used + if (options.has(topic) && !options.has(partition)) { + throw new AdminCommandFailedException(String.format("Missing required option(s): %s", + partition.options().get(0))); + } + + if (!options.has(topic) && options.has(partition)) { + throw new AdminCommandFailedException(String.format("Option %s is only allowed if %s is used", + partition.options().get(0), + topic.options().get(0) + )); + } + } + + public void maybePrintHelpOrVersion() { + CommandLineUtils.maybePrintHelpOrVersion( + this, + "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas." + ); + } + + } + + static class ElectionTypeConverter extends EnumConverter { + public ElectionTypeConverter() { + super(ElectionType.class); + } + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandErrorTest.java b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandErrorTest.java new file mode 100644 index 00000000000..fef75bfd104 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandErrorTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * For some error cases, we can save a little build time by avoiding the overhead for + * cluster creation and cleanup because the command is expected to fail immediately. + */ +public class LeaderElectionCommandErrorTest { + @Test + public void testTopicWithoutPartition() { + String out = ToolsTestUtils.captureStandardErr(() -> LeaderElectionCommand.main( + "--bootstrap-server", "nohost:9092", + "--election-type", "unclean", + "--topic", "some-topic" + )); + assertTrue(out.startsWith("Missing required option(s)")); + assertTrue(out.contains(" partition")); + } + + @Test + public void testPartitionWithoutTopic() { + String out = ToolsTestUtils.captureStandardErr(() -> LeaderElectionCommand.main( + "--bootstrap-server", "nohost:9092", + "--election-type", "unclean", + "--all-topic-partitions", + "--partition", "0" + )); + String[] rows = out.split("\n"); + assertTrue(out.startsWith("Option partition is only allowed if topic is used")); + } + + @Test + public void testMissingElectionType() { + String out = ToolsTestUtils.captureStandardErr(() -> LeaderElectionCommand.main( + "--bootstrap-server", "nohost:9092", + "--topic", "some-topic", + "--partition", "0" + )); + assertTrue(out.startsWith("Missing required option(s)")); + assertTrue(out.contains(" election-type")); + } + + @Test + public void testMissingTopicPartitionSelection() { + String out = ToolsTestUtils.captureStandardErr(() -> LeaderElectionCommand.main( + "--bootstrap-server", "nohost:9092", + "--election-type", "preferred" + )); + assertTrue(out.startsWith("One and only one of the following options is required: ")); + assertTrue(out.contains(" all-topic-partitions")); + assertTrue(out.contains(" topic")); + assertTrue(out.contains(" path-to-json-file")); + } + + @Test + public void testInvalidBroker() { + Throwable e = assertThrows(AdminCommandFailedException.class, () -> LeaderElectionCommand.run( + Duration.ofSeconds(1), + "--bootstrap-server", "example.com:1234", + "--election-type", "unclean", + "--all-topic-partitions" + )); + assertTrue(e.getCause() instanceof TimeoutException); + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java new file mode 100644 index 00000000000..0c9fa753f75 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import kafka.test.ClusterConfig; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.ExtendWith; +import scala.collection.JavaConverters; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@SuppressWarnings("deprecation") +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3) +@Tag("integration") +public class LeaderElectionCommandTest { + private final ClusterInstance cluster; + int broker2 = 1; + int broker3 = 2; + + public LeaderElectionCommandTest(ClusterInstance cluster) { + this.cluster = cluster; + } + + @BeforeEach + void setup(ClusterConfig clusterConfig) { + TestUtils.verifyNoUnexpectedThreads("@BeforeEach"); + clusterConfig.serverProperties().put("auto.leader.rebalance.enable", "false"); + clusterConfig.serverProperties().put("controlled.shutdown.enable", "true"); + clusterConfig.serverProperties().put("controlled.shutdown.max.retries", "1"); + clusterConfig.serverProperties().put("controlled.shutdown.retry.backoff.ms", "1000"); + clusterConfig.serverProperties().put("offsets.topic.replication.factor", "2"); + } + + @ClusterTest + public void testAllTopicPartition() throws InterruptedException, ExecutionException { + String topic = "unclean-topic"; + int partition = 0; + List assignment = Arrays.asList(broker2, broker3); + + cluster.waitForReadyBrokers(); + Admin client = cluster.createAdminClient(); + + createTopic(client, topic, Collections.singletonMap(partition, assignment)); + + TopicPartition topicPartition = new TopicPartition(topic, partition); + + TestUtils.assertLeader(client, topicPartition, broker2); + cluster.shutdownBroker(broker3); + TestUtils.waitForBrokersOutOfIsr(client, + JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(), + JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet() + ); + cluster.shutdownBroker(broker2); + TestUtils.assertNoLeader(client, topicPartition); + cluster.startBroker(broker3); + TestUtils.waitForOnlineBroker(client, broker3); + + LeaderElectionCommand.main( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "unclean", + "--all-topic-partitions" + ); + + TestUtils.assertLeader(client, topicPartition, broker3); + } + + @ClusterTest + public void testTopicPartition() throws InterruptedException, ExecutionException { + String topic = "unclean-topic"; + int partition = 0; + List assignment = Arrays.asList(broker2, broker3); + + cluster.waitForReadyBrokers(); + Admin client = cluster.createAdminClient(); + createTopic(client, topic, Collections.singletonMap(partition, assignment)); + + TopicPartition topicPartition = new TopicPartition(topic, partition); + + TestUtils.assertLeader(client, topicPartition, broker2); + + cluster.shutdownBroker(broker3); + TestUtils.waitForBrokersOutOfIsr(client, + JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(), + JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet() + ); + cluster.shutdownBroker(broker2); + TestUtils.assertNoLeader(client, topicPartition); + cluster.startBroker(broker3); + TestUtils.waitForOnlineBroker(client, broker3); + + LeaderElectionCommand.main( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "unclean", + "--topic", topic, + "--partition", Integer.toString(partition) + ); + + TestUtils.assertLeader(client, topicPartition, broker3); + } + + @ClusterTest + public void testPathToJsonFile() throws Exception { + String topic = "unclean-topic"; + int partition = 0; + List assignment = Arrays.asList(broker2, broker3); + + cluster.waitForReadyBrokers(); + Map> partitionAssignment = new HashMap<>(); + partitionAssignment.put(partition, assignment); + + Admin client = cluster.createAdminClient(); + createTopic(client, topic, partitionAssignment); + + TopicPartition topicPartition = new TopicPartition(topic, partition); + + TestUtils.assertLeader(client, topicPartition, broker2); + + cluster.shutdownBroker(broker3); + TestUtils.waitForBrokersOutOfIsr(client, + JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(), + JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet() + ); + cluster.shutdownBroker(broker2); + TestUtils.assertNoLeader(client, topicPartition); + cluster.startBroker(broker3); + TestUtils.waitForOnlineBroker(client, broker3); + + Path topicPartitionPath = tempTopicPartitionFile(Collections.singletonList(topicPartition)); + + LeaderElectionCommand.main( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "unclean", + "--path-to-json-file", topicPartitionPath.toString() + ); + + TestUtils.assertLeader(client, topicPartition, broker3); + } + + @ClusterTest + public void testPreferredReplicaElection() throws InterruptedException, ExecutionException { + String topic = "preferred-topic"; + int partition = 0; + List assignment = Arrays.asList(broker2, broker3); + + cluster.waitForReadyBrokers(); + Admin client = cluster.createAdminClient(); + Map> partitionAssignment = new HashMap<>(); + partitionAssignment.put(partition, assignment); + + createTopic(client, topic, partitionAssignment); + + TopicPartition topicPartition = new TopicPartition(topic, partition); + + TestUtils.assertLeader(client, topicPartition, broker2); + + cluster.shutdownBroker(broker2); + TestUtils.assertLeader(client, topicPartition, broker3); + cluster.startBroker(broker2); + TestUtils.waitForBrokersInIsr(client, topicPartition, + JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet() + ); + + LeaderElectionCommand.main( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "preferred", + "--all-topic-partitions" + ); + + TestUtils.assertLeader(client, topicPartition, broker2); + } + + @ClusterTest + public void testTopicDoesNotExist() { + Throwable e = assertThrows(AdminCommandFailedException.class, () -> LeaderElectionCommand.run( + Duration.ofSeconds(30), + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "preferred", + "--topic", "unknown-topic-name", + "--partition", "0" + )); + assertTrue(e.getSuppressed()[0] instanceof UnknownTopicOrPartitionException); + } + + @ClusterTest + public void testElectionResultOutput() throws Exception { + String topic = "non-preferred-topic"; + int partition0 = 0; + int partition1 = 1; + List assignment0 = Arrays.asList(broker2, broker3); + List assignment1 = Arrays.asList(broker3, broker2); + + cluster.waitForReadyBrokers(); + Admin client = cluster.createAdminClient(); + Map> partitionAssignment = new HashMap<>(); + partitionAssignment.put(partition0, assignment0); + partitionAssignment.put(partition1, assignment1); + + createTopic(client, topic, partitionAssignment); + + TopicPartition topicPartition0 = new TopicPartition(topic, partition0); + TopicPartition topicPartition1 = new TopicPartition(topic, partition1); + + TestUtils.assertLeader(client, topicPartition0, broker2); + TestUtils.assertLeader(client, topicPartition1, broker3); + + cluster.shutdownBroker(broker2); + TestUtils.assertLeader(client, topicPartition0, broker3); + cluster.startBroker(broker2); + TestUtils.waitForBrokersInIsr(client, topicPartition0, + JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet() + ); + TestUtils.waitForBrokersInIsr(client, topicPartition1, + JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet() + ); + + Path topicPartitionPath = tempTopicPartitionFile(Arrays.asList(topicPartition0, topicPartition1)); + String output = ToolsTestUtils.captureStandardOut(() -> + LeaderElectionCommand.main( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "preferred", + "--path-to-json-file", topicPartitionPath.toString() + )); + + Iterator electionResultOutputIter = Arrays.stream(output.split("\n")).iterator(); + + assertTrue(electionResultOutputIter.hasNext()); + String firstLine = electionResultOutputIter.next(); + assertTrue(firstLine.contains(String.format( + "Successfully completed leader election (PREFERRED) for partitions %s", topicPartition0)), + String.format("Unexpected output: %s", firstLine)); + + assertTrue(electionResultOutputIter.hasNext()); + String secondLine = electionResultOutputIter.next(); + assertTrue(secondLine.contains(String.format("Valid replica already elected for partitions %s", topicPartition1)), + String.format("Unexpected output: %s", secondLine)); + } + + private static void createTopic(Admin admin, String topic, Map> replicaAssignment) throws ExecutionException, InterruptedException { + NewTopic newTopic = new NewTopic(topic, replicaAssignment); + List newTopics = Collections.singletonList(newTopic); + CreateTopicsResult createTopicResult = admin.createTopics(newTopics); + createTopicResult.all().get(); + } + + private static Path tempTopicPartitionFile(List partitions) throws Exception { + java.io.File file = TestUtils.tempFile("leader-election-command", ".json"); + + scala.collection.immutable.Set topicPartitionSet = + JavaConverters.asScalaBuffer(partitions).toSet(); + String jsonString = TestUtils.stringifyTopicPartitions(topicPartitionSet); + + Files.write(file.toPath(), jsonString.getBytes(StandardCharsets.UTF_8)); + + return file.toPath(); + } +}