Browse Source

KAFKA-14593: Move LeaderElectionCommand to tools (#13204)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Federico Valeri <fedevaleri@gmail.com>
pull/14474/head
Omnia G.H Ibrahim 1 year ago committed by GitHub
parent
commit
7553d3f562
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      bin/kafka-leader-election.sh
  2. 2
      bin/windows/kafka-leader-election.bat
  3. 8
      build.gradle
  4. 289
      core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
  5. 97
      core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala
  6. 270
      core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
  7. 1
      settings.gradle
  8. 376
      tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
  9. 89
      tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandErrorTest.java
  10. 298
      tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java

2
bin/kafka-leader-election.sh

@ -14,4 +14,4 @@ @@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.LeaderElectionCommand "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.LeaderElectionCommand "$@"

2
bin/windows/kafka-leader-election.bat

@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
"%~dp0kafka-run-class.bat" kafka.admin.LeaderElectionCommand %*
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.LeaderElectionCommand %*

8
build.gradle

@ -873,7 +873,7 @@ project(':core') { @@ -873,7 +873,7 @@ project(':core') {
implementation project(':server-common')
implementation project(':group-coordinator')
implementation project(':metadata')
implementation project(':storage:api')
implementation project(':storage:storage-api')
implementation project(':tools:tools-api')
implementation project(':raft')
implementation project(':storage')
@ -915,7 +915,7 @@ project(':core') { @@ -915,7 +915,7 @@ project(':core') {
testImplementation project(':metadata').sourceSets.test.output
testImplementation project(':raft').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':storage:api').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.test.output
testImplementation libs.bcpkix
testImplementation libs.mockitoCore
testImplementation(libs.apacheda) {
@ -1638,7 +1638,7 @@ project(':server-common') { @@ -1638,7 +1638,7 @@ project(':server-common') {
}
}
project(':storage:api') {
project(':storage:storage-api') {
archivesBaseName = "kafka-storage-api"
dependencies {
@ -1714,7 +1714,7 @@ project(':storage') { @@ -1714,7 +1714,7 @@ project(':storage') {
}
dependencies {
implementation project(':storage:api')
implementation project(':storage:storage-api')
implementation project(':server-common')
implementation project(':clients')
implementation libs.caffeine

289
core/src/main/scala/kafka/admin/LeaderElectionCommand.scala

@ -1,289 +0,0 @@ @@ -1,289 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.util.Properties
import java.util.concurrent.ExecutionException
import joptsimple.util.EnumConverter
import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import kafka.utils.Json
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.errors.ElectionNotNeededException
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.concurrent.duration._
object LeaderElectionCommand extends Logging {
def main(args: Array[String]): Unit = {
run(args, 30.second)
}
def run(args: Array[String], timeout: Duration): Unit = {
val commandOptions = new LeaderElectionCommandOptions(args)
CommandLineUtils.maybePrintHelpOrVersion(
commandOptions,
"This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
)
validate(commandOptions)
val electionType = commandOptions.options.valueOf(commandOptions.electionType)
val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path =>
parseReplicaElectionData(Utils.readFileAsString(path))
}
val singleTopicPartition = (
Option(commandOptions.options.valueOf(commandOptions.topic)),
Option(commandOptions.options.valueOf(commandOptions.partition))
) match {
case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition)))
case _ => None
}
/* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.
* The validate function should be checking that this option is required if the --topic and --path-to-json-file
* are not specified.
*/
val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)
val adminClient = {
val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config =>
Utils.loadProps(config)
}.getOrElse(new Properties())
props.setProperty(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
commandOptions.options.valueOf(commandOptions.bootstrapServer)
)
props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
Admin.create(props)
}
try {
electLeaders(adminClient, electionType, topicPartitions)
} finally {
adminClient.close()
}
}
private[this] def parseReplicaElectionData(jsonString: String): Set[TopicPartition] = {
Json.parseFull(jsonString) match {
case Some(js) =>
js.asJsonObject.get("partitions") match {
case Some(partitionsList) =>
val partitionsRaw = partitionsList.asJsonArray.iterator.map(_.asJsonObject)
val partitions = partitionsRaw.map { p =>
val topic = p("topic").to[String]
val partition = p("partition").to[Int]
new TopicPartition(topic, partition)
}.toBuffer
val duplicatePartitions = CoreUtils.duplicates(partitions)
if (duplicatePartitions.nonEmpty) {
throw new AdminOperationException(
s"Replica election data contains duplicate partitions: ${duplicatePartitions.mkString(",")}"
)
}
partitions.toSet
case None => throw new AdminOperationException("Replica election data is missing \"partitions\" field")
}
case None => throw new AdminOperationException("Replica election data is empty")
}
}
private[this] def electLeaders(
client: Admin,
electionType: ElectionType,
topicPartitions: Option[Set[TopicPartition]]
): Unit = {
val electionResults = try {
val partitions = topicPartitions.map(_.asJava).orNull
debug(s"Calling AdminClient.electLeaders($electionType, $partitions)")
client.electLeaders(electionType, partitions).partitions.get.asScala
} catch {
case e: ExecutionException =>
e.getCause match {
case cause: TimeoutException =>
val message = "Timeout waiting for election results"
println(message)
throw new AdminCommandFailedException(message, cause)
case cause: ClusterAuthorizationException =>
val message = "Not authorized to perform leader election"
println(message)
throw new AdminCommandFailedException(message, cause)
case _ =>
throw e
}
case e: Throwable =>
println("Error while making request")
throw e
}
val succeeded = mutable.Set.empty[TopicPartition]
val noop = mutable.Set.empty[TopicPartition]
val failed = mutable.Map.empty[TopicPartition, Throwable]
electionResults.foreach[Unit] { case (topicPartition, error) =>
if (error.isPresent) {
error.get match {
case _: ElectionNotNeededException => noop += topicPartition
case _ => failed += topicPartition -> error.get
}
} else {
succeeded += topicPartition
}
}
if (succeeded.nonEmpty) {
val partitions = succeeded.mkString(", ")
println(s"Successfully completed leader election ($electionType) for partitions $partitions")
}
if (noop.nonEmpty) {
val partitions = noop.mkString(", ")
println(s"Valid replica already elected for partitions $partitions")
}
if (failed.nonEmpty) {
val rootException = new AdminCommandFailedException(s"${failed.size} replica(s) could not be elected")
failed.forKeyValue { (topicPartition, exception) =>
println(s"Error completing leader election ($electionType) for partition: $topicPartition: $exception")
rootException.addSuppressed(exception)
}
throw rootException
}
}
private[this] def validate(commandOptions: LeaderElectionCommandOptions): Unit = {
// required options: --bootstrap-server and --election-type
var missingOptions = List.empty[String]
if (!commandOptions.options.has(commandOptions.bootstrapServer)) {
missingOptions = commandOptions.bootstrapServer.options().get(0) :: missingOptions
}
if (!commandOptions.options.has(commandOptions.electionType)) {
missingOptions = commandOptions.electionType.options().get(0) :: missingOptions
}
if (missingOptions.nonEmpty) {
throw new AdminCommandFailedException(s"Missing required option(s): ${missingOptions.mkString(", ")}")
}
// One and only one is required: --topic, --all-topic-partitions or --path-to-json-file
val mutuallyExclusiveOptions = Seq(
commandOptions.topic,
commandOptions.allTopicPartitions,
commandOptions.pathToJsonFile
)
mutuallyExclusiveOptions.count(commandOptions.options.has) match {
case 1 => // This is the only correct configuration, don't throw an exception
case _ =>
throw new AdminCommandFailedException(
"One and only one of the following options is required: " +
s"${mutuallyExclusiveOptions.map(_.options.get(0)).mkString(", ")}"
)
}
// --partition if and only if --topic is used
(
commandOptions.options.has(commandOptions.topic),
commandOptions.options.has(commandOptions.partition)
) match {
case (true, false) =>
throw new AdminCommandFailedException(
s"Missing required option(s): ${commandOptions.partition.options.get(0)}"
)
case (false, true) =>
throw new AdminCommandFailedException(
s"Option ${commandOptions.partition.options.get(0)} is only allowed if " +
s"${commandOptions.topic.options.get(0)} is used"
)
case _ => // Ignore; we have a valid configuration
}
}
}
private final class LeaderElectionCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
val bootstrapServer = parser
.accepts(
"bootstrap-server",
"A hostname and port for the broker to connect to, in the form host:port. Multiple comma separated URLs can be given. REQUIRED.")
.withRequiredArg
.describedAs("host:port")
.ofType(classOf[String])
val adminClientConfig = parser
.accepts(
"admin.config",
"Configuration properties files to pass to the admin client")
.withRequiredArg
.describedAs("config file")
.ofType(classOf[String])
val pathToJsonFile = parser
.accepts(
"path-to-json-file",
"The JSON file with the list of partition for which leader elections should be performed. This is an example format. \n{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\nNot allowed if --all-topic-partitions or --topic flags are specified.")
.withRequiredArg
.describedAs("Path to JSON file")
.ofType(classOf[String])
val topic = parser
.accepts(
"topic",
"Name of topic for which to perform an election. Not allowed if --path-to-json-file or --all-topic-partitions is specified.")
.withRequiredArg
.describedAs("topic name")
.ofType(classOf[String])
val partition = parser
.accepts(
"partition",
"Partition id for which to perform an election. REQUIRED if --topic is specified.")
.withRequiredArg
.describedAs("partition id")
.ofType(classOf[Integer])
val allTopicPartitions = parser
.accepts(
"all-topic-partitions",
"Perform election on all of the eligible topic partitions based on the type of election (see the --election-type flag). Not allowed if --topic or --path-to-json-file is specified.")
val electionType = parser
.accepts(
"election-type",
"Type of election to attempt. Possible values are \"preferred\" for preferred leader election or \"unclean\" for unclean leader election. If preferred election is selection, the election is only performed if the current leader is not the preferred leader for the topic partition. If unclean election is selected, the election is only performed if there are no leader for the topic partition. REQUIRED.")
.withRequiredArg
.describedAs("election type")
.withValuesConvertedBy(ElectionTypeConverter)
options = parser.parse(args: _*)
}
final object ElectionTypeConverter extends EnumConverter[ElectionType](classOf[ElectionType]) { }

97
core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala

@ -1,97 +0,0 @@ @@ -1,97 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.server.common.AdminCommandFailedException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import scala.concurrent.duration._
/**
* For some error cases, we can save a little build time by avoiding the overhead for
* cluster creation and cleanup because the command is expected to fail immediately.
*/
class LeaderElectionCommandErrorTest {
@Test
def testTopicWithoutPartition(): Unit = {
val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
Array(
"--bootstrap-server", "nohost:9092",
"--election-type", "unclean",
"--topic", "some-topic"
)
))
assertTrue(e.getMessage.startsWith("Missing required option(s)"))
assertTrue(e.getMessage.contains(" partition"))
}
@Test
def testPartitionWithoutTopic(): Unit = {
val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
Array(
"--bootstrap-server", "nohost:9092",
"--election-type", "unclean",
"--all-topic-partitions",
"--partition", "0"
)
))
assertEquals("Option partition is only allowed if topic is used", e.getMessage)
}
@Test
def testMissingElectionType(): Unit = {
val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
Array(
"--bootstrap-server", "nohost:9092",
"--topic", "some-topic",
"--partition", "0"
)
))
assertTrue(e.getMessage.startsWith("Missing required option(s)"))
assertTrue(e.getMessage.contains(" election-type"))
}
@Test
def testMissingTopicPartitionSelection(): Unit = {
val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
Array(
"--bootstrap-server", "nohost:9092",
"--election-type", "preferred"
)
))
assertTrue(e.getMessage.startsWith("One and only one of the following options is required: "))
assertTrue(e.getMessage.contains(" all-topic-partitions"))
assertTrue(e.getMessage.contains(" topic"))
assertTrue(e.getMessage.contains(" path-to-json-file"))
}
@Test
def testInvalidBroker(): Unit = {
val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.run(
Array(
"--bootstrap-server", "example.com:1234",
"--election-type", "unclean",
"--all-topic-partitions"
),
1.seconds
))
assertTrue(e.getCause.isInstanceOf[TimeoutException])
}
}

270
core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala

@ -1,270 +0,0 @@ @@ -1,270 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import kafka.server.IntegrationTestUtils.createTopic
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.{ClusterConfig, ClusterInstance}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.server.common.AdminCommandFailedException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{BeforeEach, Tag}
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
@Tag("integration")
final class LeaderElectionCommandTest(cluster: ClusterInstance) {
import LeaderElectionCommandTest._
val broker1 = 0
val broker2 = 1
val broker3 = 2
@BeforeEach
def setup(clusterConfig: ClusterConfig): Unit = {
TestUtils.verifyNoUnexpectedThreads("@BeforeEach")
clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp, "true")
clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp, "1")
clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "1000")
clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
}
@ClusterTest
def testAllTopicPartition(): Unit = {
val topic = "unclean-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
cluster.waitForReadyBrokers()
val client = cluster.createAdminClient()
createTopic(client, topic, Map(partition -> assignment))
val topicPartition = new TopicPartition(topic, partition)
TestUtils.assertLeader(client, topicPartition, broker2)
cluster.shutdownBroker(broker3)
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
cluster.shutdownBroker(broker2)
TestUtils.assertNoLeader(client, topicPartition)
cluster.startBroker(broker3)
TestUtils.waitForOnlineBroker(client, broker3)
LeaderElectionCommand.main(
Array(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--all-topic-partitions"
)
)
TestUtils.assertLeader(client, topicPartition, broker3)
}
@ClusterTest
def testTopicPartition(): Unit = {
val topic = "unclean-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
cluster.waitForReadyBrokers()
val client = cluster.createAdminClient()
createTopic(client, topic, Map(partition -> assignment))
val topicPartition = new TopicPartition(topic, partition)
TestUtils.assertLeader(client, topicPartition, broker2)
cluster.shutdownBroker(broker3)
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
cluster.shutdownBroker(broker2)
TestUtils.assertNoLeader(client, topicPartition)
cluster.startBroker(broker3)
TestUtils.waitForOnlineBroker(client, broker3)
LeaderElectionCommand.main(
Array(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--topic", topic,
"--partition", partition.toString
)
)
TestUtils.assertLeader(client, topicPartition, broker3)
}
@ClusterTest
def testPathToJsonFile(): Unit = {
val topic = "unclean-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
cluster.waitForReadyBrokers()
val client = cluster.createAdminClient()
createTopic(client, topic, Map(partition -> assignment))
val topicPartition = new TopicPartition(topic, partition)
TestUtils.assertLeader(client, topicPartition, broker2)
cluster.shutdownBroker(broker3)
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
cluster.shutdownBroker(broker2)
TestUtils.assertNoLeader(client, topicPartition)
cluster.startBroker(broker3)
TestUtils.waitForOnlineBroker(client, broker3)
val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
LeaderElectionCommand.main(
Array(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--path-to-json-file", topicPartitionPath.toString
)
)
TestUtils.assertLeader(client, topicPartition, broker3)
}
@ClusterTest
def testPreferredReplicaElection(): Unit = {
val topic = "preferred-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
cluster.waitForReadyBrokers()
val client = cluster.createAdminClient()
createTopic(client, topic, Map(partition -> assignment))
val topicPartition = new TopicPartition(topic, partition)
TestUtils.assertLeader(client, topicPartition, broker2)
cluster.shutdownBroker(broker2)
TestUtils.assertLeader(client, topicPartition, broker3)
cluster.startBroker(broker2)
TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
LeaderElectionCommand.main(
Array(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--all-topic-partitions"
)
)
TestUtils.assertLeader(client, topicPartition, broker2)
}
@ClusterTest
def testTopicDoesNotExist(): Unit = {
val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.main(
Array(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--topic", "unknown-topic-name",
"--partition", "0"
)
))
assertTrue(e.getSuppressed()(0).isInstanceOf[UnknownTopicOrPartitionException])
}
@ClusterTest
def testElectionResultOutput(): Unit = {
val topic = "non-preferred-topic"
val partition0 = 0
val partition1 = 1
val assignment0 = Seq(broker2, broker3)
val assignment1 = Seq(broker3, broker2)
cluster.waitForReadyBrokers()
val client = cluster.createAdminClient()
createTopic(client, topic, Map(
partition0 -> assignment0,
partition1 -> assignment1
))
val topicPartition0 = new TopicPartition(topic, partition0)
val topicPartition1 = new TopicPartition(topic, partition1)
TestUtils.assertLeader(client, topicPartition0, broker2)
TestUtils.assertLeader(client, topicPartition1, broker3)
cluster.shutdownBroker(broker2)
TestUtils.assertLeader(client, topicPartition0, broker3)
cluster.startBroker(broker2)
TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2))
TestUtils.waitForBrokersInIsr(client, topicPartition1, Set(broker2))
val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, topicPartition1))
val output = TestUtils.grabConsoleOutput(
LeaderElectionCommand.main(
Array(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--path-to-json-file", topicPartitionPath.toString
)
)
)
val electionResultOutputIter = output.split("\n").iterator
assertTrue(electionResultOutputIter.hasNext)
val firstLine = electionResultOutputIter.next()
assertTrue(firstLine.contains(s"Successfully completed leader election (PREFERRED) for partitions $topicPartition0"),
s"Unexpected output: $firstLine")
assertTrue(electionResultOutputIter.hasNext)
val secondLine = electionResultOutputIter.next()
assertTrue(secondLine.contains(s"Valid replica already elected for partitions $topicPartition1"),
s"Unexpected output: $secondLine")
}
}
object LeaderElectionCommandTest {
def createConfig(servers: Seq[KafkaServer]): Map[String, Object] = {
Map(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers(servers),
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG -> "20000",
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "10000"
)
}
def bootstrapServers(servers: Seq[KafkaServer]): String = {
TestUtils.plaintextBootstrapServers(servers)
}
def tempTopicPartitionFile(partitions: Set[TopicPartition]): Path = {
val file = TestUtils.tempFile("leader-election-command", ".json")
val jsonString = TestUtils.stringifyTopicPartitions(partitions)
Files.write(file.toPath, jsonString.getBytes(StandardCharsets.UTF_8))
file.toPath
}
}

1
settings.gradle

@ -96,4 +96,5 @@ include 'clients', @@ -96,4 +96,5 @@ include 'clients',
'tools:tools-api',
'trogdor'
project(":storage:api").name = "storage-api"
rootProject.name = 'kafka'

376
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java

@ -0,0 +1,376 @@ @@ -0,0 +1,376 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import com.fasterxml.jackson.databind.JsonMappingException;
import joptsimple.AbstractOptionSpec;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionSpecBuilder;
import joptsimple.util.EnumConverter;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.ElectionNotNeededException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.AdminCommandFailedException;
import org.apache.kafka.server.common.AdminOperationException;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.server.util.Json;
import org.apache.kafka.server.util.json.DecodeJson;
import org.apache.kafka.server.util.json.JsonObject;
import org.apache.kafka.server.util.json.JsonValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class LeaderElectionCommand {
private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
public static void main(String... args) {
try {
run(Duration.ofMillis(30000), args);
} catch (Exception e) {
System.err.println(e.getMessage());
System.err.println(Utils.stackTrace(e));
}
}
static void run(Duration timeoutMs, String... args) throws Exception {
LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
commandOptions.maybePrintHelpOrVersion();
commandOptions.validate();
ElectionType electionType = commandOptions.getElectionType();
Optional<Set<TopicPartition>> jsonFileTopicPartitions =
Optional.ofNullable(commandOptions.getPathToJsonFile())
.map(path -> parseReplicaElectionData(path));
Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
final Optional<Set<TopicPartition>> singleTopicPartition =
(topicOption.isPresent() && partitionOption.isPresent()) ?
Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
Optional.empty();
/* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
* The validate function should be checking that this option is required if the --topic and --path-to-json-file
* are not specified.
*/
Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
Properties props = new Properties();
if (commandOptions.hasAdminClientConfig()) {
props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
}
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
try (Admin adminClient = Admin.create(props)) {
electLeaders(adminClient, electionType, topicPartitions);
}
}
private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
LOG.debug("Calling AdminClient.electLeaders({}, {})", electionType, partitions.orElse(null));
Map<TopicPartition, Optional<Throwable>> electionResults;
try {
electionResults = client.electLeaders(electionType, partitions.orElse(null)).partitions().get();
} catch (ExecutionException e) {
if (e.getCause() instanceof TimeoutException) {
String message = "Timeout waiting for election results";
System.out.println(message);
throw new AdminCommandFailedException(message, e.getCause());
} else if (e.getCause() instanceof ClusterAuthorizationException) {
String message = "Not authorized to perform leader election";
System.out.println(message);
throw new AdminCommandFailedException(message, e.getCause().getCause());
} else {
throw new RuntimeException(e);
}
} catch (InterruptedException e) {
System.out.println("Error while making request");
throw new RuntimeException(e);
}
Set<TopicPartition> succeeded = new HashSet<>();
Set<TopicPartition> noop = new HashSet<>();
Map<TopicPartition, Throwable> failed = new HashMap<>();
electionResults.entrySet().stream().forEach(entry -> {
Optional<Throwable> error = entry.getValue();
if (error.isPresent()) {
if (error.get() instanceof ElectionNotNeededException) {
noop.add(entry.getKey());
} else {
failed.put(entry.getKey(), error.get());
}
} else {
succeeded.add(entry.getKey());
}
});
if (!succeeded.isEmpty()) {
String partitionsAsString = succeeded.stream()
.map(TopicPartition::toString)
.collect(Collectors.joining(", "));
System.out.println(String.format("Successfully completed leader election (%s) for partitions %s",
electionType, partitionsAsString));
}
if (!noop.isEmpty()) {
String partitionsAsString = noop.stream()
.map(TopicPartition::toString)
.collect(Collectors.joining(", "));
System.out.println(String.format("Valid replica already elected for partitions %s", partitionsAsString));
}
if (!failed.isEmpty()) {
AdminCommandFailedException rootException =
new AdminCommandFailedException(String.format("%s replica(s) could not be elected", failed.size()));
failed.entrySet().forEach(entry -> {
System.out.println(String.format("Error completing leader election (%s) for partition: %s: %s",
electionType, entry.getKey(), entry.getValue()));
rootException.addSuppressed(entry.getValue());
});
throw rootException;
}
}
private static Set<TopicPartition> parseReplicaElectionData(String path) {
Optional<JsonValue> jsonFile;
try {
jsonFile = Json.parseFull(Utils.readFileAsString(path));
return jsonFile.map(js -> {
try {
return topicPartitions(js);
} catch (JsonMappingException e) {
throw new RuntimeException(e);
}
}).orElseThrow(() -> new AdminOperationException("Replica election data is empty"));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static Set<TopicPartition> topicPartitions(JsonValue js) throws JsonMappingException {
return js.asJsonObject().get("partitions")
.map(partitionsList -> {
try {
return toTopicPartition(partitionsList);
} catch (JsonMappingException e) {
throw new RuntimeException(e);
}
})
.orElseThrow(() -> new AdminOperationException("Replica election data is missing \"partitions\" field"));
}
private static Set<TopicPartition> toTopicPartition(JsonValue partitionsList) throws JsonMappingException {
List<TopicPartition> partitions = new ArrayList<>();
Iterator<JsonValue> iterator = partitionsList.asJsonArray().iterator();
while (iterator.hasNext()) {
JsonObject partitionJs = iterator.next().asJsonObject();
String topic = partitionJs.apply("topic").to(STRING);
int partition = partitionJs.apply("partition").to(INT);
partitions.add(new TopicPartition(topic, partition));
}
Set<TopicPartition> duplicatePartitions = partitions.stream()
.filter(i -> Collections.frequency(partitions, i) > 1)
.collect(Collectors.toSet());
if (duplicatePartitions.size() > 0) {
throw new AdminOperationException(String.format(
"Replica election data contains duplicate partitions: %s", String.join(",", duplicatePartitions.toString()))
);
}
return new HashSet<>(partitions);
}
static class LeaderElectionCommandOptions extends CommandDefaultOptions {
private final ArgumentAcceptingOptionSpec<String> bootstrapServer;
private final ArgumentAcceptingOptionSpec<String> adminClientConfig;
private final ArgumentAcceptingOptionSpec<String> pathToJsonFile;
private final ArgumentAcceptingOptionSpec<String> topic;
private final ArgumentAcceptingOptionSpec<Integer> partition;
private final OptionSpecBuilder allTopicPartitions;
private final ArgumentAcceptingOptionSpec<ElectionType> electionType;
public LeaderElectionCommandOptions(String[] args) {
super(args);
bootstrapServer = parser
.accepts(
"bootstrap-server",
"A hostname and port for the broker to connect to, in the form host:port. Multiple comma separated URLs can be given. REQUIRED.")
.withRequiredArg()
.describedAs("host:port")
.ofType(String.class);
adminClientConfig = parser
.accepts(
"admin.config",
"Configuration properties files to pass to the admin client")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
pathToJsonFile = parser
.accepts(
"path-to-json-file",
"The JSON file with the list of partition for which leader elections should be performed. This is an example format. \n{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\nNot allowed if --all-topic-partitions or --topic flags are specified.")
.withRequiredArg()
.describedAs("Path to JSON file")
.ofType(String.class);
topic = parser
.accepts(
"topic",
"Name of topic for which to perform an election. Not allowed if --path-to-json-file or --all-topic-partitions is specified.")
.withRequiredArg()
.describedAs("topic name")
.ofType(String.class);
partition = parser
.accepts(
"partition",
"Partition id for which to perform an election. REQUIRED if --topic is specified.")
.withRequiredArg()
.describedAs("partition id")
.ofType(Integer.class);
allTopicPartitions = parser
.accepts(
"all-topic-partitions",
"Perform election on all of the eligible topic partitions based on the type of election (see the --election-type flag). Not allowed if --topic or --path-to-json-file is specified.");
electionType = parser
.accepts(
"election-type",
"Type of election to attempt. Possible values are \"preferred\" for preferred leader election or \"unclean\" for unclean leader election. If preferred election is selection, the election is only performed if the current leader is not the preferred leader for the topic partition. If unclean election is selected, the election is only performed if there are no leader for the topic partition. REQUIRED.")
.withRequiredArg()
.describedAs("election type")
.withValuesConvertedBy(new ElectionTypeConverter());
options = parser.parse(args);
}
public boolean hasAdminClientConfig() {
return options.has(adminClientConfig);
}
public ElectionType getElectionType() {
return options.valueOf(electionType);
}
public String getPathToJsonFile() {
return options.valueOf(pathToJsonFile);
}
public String getBootstrapServer() {
return options.valueOf(bootstrapServer);
}
public String getAdminClientConfig() {
return options.valueOf(adminClientConfig);
}
public String getTopic() {
return options.valueOf(topic);
}
public Integer getPartition() {
return options.valueOf(partition);
}
public void validate() {
// required options: --bootstrap-server and --election-type
List<String> missingOptions = new ArrayList<>();
if (!options.has(bootstrapServer)) {
missingOptions.add(bootstrapServer.options().get(0));
}
if (!options.has(electionType)) {
missingOptions.add(electionType.options().get(0));
}
if (!missingOptions.isEmpty()) {
throw new AdminCommandFailedException("Missing required option(s): " + String.join(", ", missingOptions));
}
// One and only one is required: --topic, --all-topic-partitions or --path-to-json-file
List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = Arrays.asList(
topic,
allTopicPartitions,
pathToJsonFile
);
long mutuallyExclusiveOptionsCount = mutuallyExclusiveOptions.stream()
.filter(abstractOptionSpec -> options.has(abstractOptionSpec))
.count();
// 1 is the only correct configuration, don't throw an exception
if (mutuallyExclusiveOptionsCount != 1) {
throw new AdminCommandFailedException(
"One and only one of the following options is required: " +
mutuallyExclusiveOptions.stream().map(opt -> opt.options().get(0)).collect(Collectors.joining(", "))
);
}
// --partition if and only if --topic is used
if (options.has(topic) && !options.has(partition)) {
throw new AdminCommandFailedException(String.format("Missing required option(s): %s",
partition.options().get(0)));
}
if (!options.has(topic) && options.has(partition)) {
throw new AdminCommandFailedException(String.format("Option %s is only allowed if %s is used",
partition.options().get(0),
topic.options().get(0)
));
}
}
public void maybePrintHelpOrVersion() {
CommandLineUtils.maybePrintHelpOrVersion(
this,
"This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
);
}
}
static class ElectionTypeConverter extends EnumConverter<ElectionType> {
public ElectionTypeConverter() {
super(ElectionType.class);
}
}
}

89
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandErrorTest.java

@ -0,0 +1,89 @@ @@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.server.common.AdminCommandFailedException;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* For some error cases, we can save a little build time by avoiding the overhead for
* cluster creation and cleanup because the command is expected to fail immediately.
*/
public class LeaderElectionCommandErrorTest {
@Test
public void testTopicWithoutPartition() {
String out = ToolsTestUtils.captureStandardErr(() -> LeaderElectionCommand.main(
"--bootstrap-server", "nohost:9092",
"--election-type", "unclean",
"--topic", "some-topic"
));
assertTrue(out.startsWith("Missing required option(s)"));
assertTrue(out.contains(" partition"));
}
@Test
public void testPartitionWithoutTopic() {
String out = ToolsTestUtils.captureStandardErr(() -> LeaderElectionCommand.main(
"--bootstrap-server", "nohost:9092",
"--election-type", "unclean",
"--all-topic-partitions",
"--partition", "0"
));
String[] rows = out.split("\n");
assertTrue(out.startsWith("Option partition is only allowed if topic is used"));
}
@Test
public void testMissingElectionType() {
String out = ToolsTestUtils.captureStandardErr(() -> LeaderElectionCommand.main(
"--bootstrap-server", "nohost:9092",
"--topic", "some-topic",
"--partition", "0"
));
assertTrue(out.startsWith("Missing required option(s)"));
assertTrue(out.contains(" election-type"));
}
@Test
public void testMissingTopicPartitionSelection() {
String out = ToolsTestUtils.captureStandardErr(() -> LeaderElectionCommand.main(
"--bootstrap-server", "nohost:9092",
"--election-type", "preferred"
));
assertTrue(out.startsWith("One and only one of the following options is required: "));
assertTrue(out.contains(" all-topic-partitions"));
assertTrue(out.contains(" topic"));
assertTrue(out.contains(" path-to-json-file"));
}
@Test
public void testInvalidBroker() {
Throwable e = assertThrows(AdminCommandFailedException.class, () -> LeaderElectionCommand.run(
Duration.ofSeconds(1),
"--bootstrap-server", "example.com:1234",
"--election-type", "unclean",
"--all-topic-partitions"
));
assertTrue(e.getCause() instanceof TimeoutException);
}
}

298
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java

@ -0,0 +1,298 @@ @@ -0,0 +1,298 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.server.common.AdminCommandFailedException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.collection.JavaConverters;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@SuppressWarnings("deprecation")
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
@Tag("integration")
public class LeaderElectionCommandTest {
private final ClusterInstance cluster;
int broker2 = 1;
int broker3 = 2;
public LeaderElectionCommandTest(ClusterInstance cluster) {
this.cluster = cluster;
}
@BeforeEach
void setup(ClusterConfig clusterConfig) {
TestUtils.verifyNoUnexpectedThreads("@BeforeEach");
clusterConfig.serverProperties().put("auto.leader.rebalance.enable", "false");
clusterConfig.serverProperties().put("controlled.shutdown.enable", "true");
clusterConfig.serverProperties().put("controlled.shutdown.max.retries", "1");
clusterConfig.serverProperties().put("controlled.shutdown.retry.backoff.ms", "1000");
clusterConfig.serverProperties().put("offsets.topic.replication.factor", "2");
}
@ClusterTest
public void testAllTopicPartition() throws InterruptedException, ExecutionException {
String topic = "unclean-topic";
int partition = 0;
List<Integer> assignment = Arrays.asList(broker2, broker3);
cluster.waitForReadyBrokers();
Admin client = cluster.createAdminClient();
createTopic(client, topic, Collections.singletonMap(partition, assignment));
TopicPartition topicPartition = new TopicPartition(topic, partition);
TestUtils.assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker3);
TestUtils.waitForBrokersOutOfIsr(client,
JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
);
cluster.shutdownBroker(broker2);
TestUtils.assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
TestUtils.waitForOnlineBroker(client, broker3);
LeaderElectionCommand.main(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--all-topic-partitions"
);
TestUtils.assertLeader(client, topicPartition, broker3);
}
@ClusterTest
public void testTopicPartition() throws InterruptedException, ExecutionException {
String topic = "unclean-topic";
int partition = 0;
List<Integer> assignment = Arrays.asList(broker2, broker3);
cluster.waitForReadyBrokers();
Admin client = cluster.createAdminClient();
createTopic(client, topic, Collections.singletonMap(partition, assignment));
TopicPartition topicPartition = new TopicPartition(topic, partition);
TestUtils.assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker3);
TestUtils.waitForBrokersOutOfIsr(client,
JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
);
cluster.shutdownBroker(broker2);
TestUtils.assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
TestUtils.waitForOnlineBroker(client, broker3);
LeaderElectionCommand.main(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--topic", topic,
"--partition", Integer.toString(partition)
);
TestUtils.assertLeader(client, topicPartition, broker3);
}
@ClusterTest
public void testPathToJsonFile() throws Exception {
String topic = "unclean-topic";
int partition = 0;
List<Integer> assignment = Arrays.asList(broker2, broker3);
cluster.waitForReadyBrokers();
Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
partitionAssignment.put(partition, assignment);
Admin client = cluster.createAdminClient();
createTopic(client, topic, partitionAssignment);
TopicPartition topicPartition = new TopicPartition(topic, partition);
TestUtils.assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker3);
TestUtils.waitForBrokersOutOfIsr(client,
JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
);
cluster.shutdownBroker(broker2);
TestUtils.assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
TestUtils.waitForOnlineBroker(client, broker3);
Path topicPartitionPath = tempTopicPartitionFile(Collections.singletonList(topicPartition));
LeaderElectionCommand.main(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--path-to-json-file", topicPartitionPath.toString()
);
TestUtils.assertLeader(client, topicPartition, broker3);
}
@ClusterTest
public void testPreferredReplicaElection() throws InterruptedException, ExecutionException {
String topic = "preferred-topic";
int partition = 0;
List<Integer> assignment = Arrays.asList(broker2, broker3);
cluster.waitForReadyBrokers();
Admin client = cluster.createAdminClient();
Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
partitionAssignment.put(partition, assignment);
createTopic(client, topic, partitionAssignment);
TopicPartition topicPartition = new TopicPartition(topic, partition);
TestUtils.assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker2);
TestUtils.assertLeader(client, topicPartition, broker3);
cluster.startBroker(broker2);
TestUtils.waitForBrokersInIsr(client, topicPartition,
JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet()
);
LeaderElectionCommand.main(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--all-topic-partitions"
);
TestUtils.assertLeader(client, topicPartition, broker2);
}
@ClusterTest
public void testTopicDoesNotExist() {
Throwable e = assertThrows(AdminCommandFailedException.class, () -> LeaderElectionCommand.run(
Duration.ofSeconds(30),
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--topic", "unknown-topic-name",
"--partition", "0"
));
assertTrue(e.getSuppressed()[0] instanceof UnknownTopicOrPartitionException);
}
@ClusterTest
public void testElectionResultOutput() throws Exception {
String topic = "non-preferred-topic";
int partition0 = 0;
int partition1 = 1;
List<Integer> assignment0 = Arrays.asList(broker2, broker3);
List<Integer> assignment1 = Arrays.asList(broker3, broker2);
cluster.waitForReadyBrokers();
Admin client = cluster.createAdminClient();
Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
partitionAssignment.put(partition0, assignment0);
partitionAssignment.put(partition1, assignment1);
createTopic(client, topic, partitionAssignment);
TopicPartition topicPartition0 = new TopicPartition(topic, partition0);
TopicPartition topicPartition1 = new TopicPartition(topic, partition1);
TestUtils.assertLeader(client, topicPartition0, broker2);
TestUtils.assertLeader(client, topicPartition1, broker3);
cluster.shutdownBroker(broker2);
TestUtils.assertLeader(client, topicPartition0, broker3);
cluster.startBroker(broker2);
TestUtils.waitForBrokersInIsr(client, topicPartition0,
JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet()
);
TestUtils.waitForBrokersInIsr(client, topicPartition1,
JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet()
);
Path topicPartitionPath = tempTopicPartitionFile(Arrays.asList(topicPartition0, topicPartition1));
String output = ToolsTestUtils.captureStandardOut(() ->
LeaderElectionCommand.main(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--path-to-json-file", topicPartitionPath.toString()
));
Iterator<String> electionResultOutputIter = Arrays.stream(output.split("\n")).iterator();
assertTrue(electionResultOutputIter.hasNext());
String firstLine = electionResultOutputIter.next();
assertTrue(firstLine.contains(String.format(
"Successfully completed leader election (PREFERRED) for partitions %s", topicPartition0)),
String.format("Unexpected output: %s", firstLine));
assertTrue(electionResultOutputIter.hasNext());
String secondLine = electionResultOutputIter.next();
assertTrue(secondLine.contains(String.format("Valid replica already elected for partitions %s", topicPartition1)),
String.format("Unexpected output: %s", secondLine));
}
private static void createTopic(Admin admin, String topic, Map<Integer, List<Integer>> replicaAssignment) throws ExecutionException, InterruptedException {
NewTopic newTopic = new NewTopic(topic, replicaAssignment);
List<NewTopic> newTopics = Collections.singletonList(newTopic);
CreateTopicsResult createTopicResult = admin.createTopics(newTopics);
createTopicResult.all().get();
}
private static Path tempTopicPartitionFile(List<TopicPartition> partitions) throws Exception {
java.io.File file = TestUtils.tempFile("leader-election-command", ".json");
scala.collection.immutable.Set<TopicPartition> topicPartitionSet =
JavaConverters.asScalaBuffer(partitions).toSet();
String jsonString = TestUtils.stringifyTopicPartitions(topicPartitionSet);
Files.write(file.toPath(), jsonString.getBytes(StandardCharsets.UTF_8));
return file.toPath();
}
}
Loading…
Cancel
Save