Browse Source

KAFKA-8405; Remove deprecated `kafka-preferred-replica-election` command (#10443)

The `kafka-preferred-replica-election` command was deprecated in 2.4. This path removes it for 3.0. `kafka-leader-election` can be used instead.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
pull/10454/head
David Jacot 4 years ago committed by GitHub
parent
commit
6d7a9012dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      bin/kafka-preferred-replica-election.sh
  2. 17
      bin/windows/kafka-preferred-replica-election.bat
  3. 304
      core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
  4. 383
      core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
  5. 1
      docs/upgrade.html

17
bin/kafka-preferred-replica-election.sh

@ -1,17 +0,0 @@ @@ -1,17 +0,0 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.PreferredReplicaLeaderElectionCommand "$@"

17
bin/windows/kafka-preferred-replica-election.bat

@ -1,17 +0,0 @@ @@ -1,17 +0,0 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
"%~dp0kafka-run-class.bat" kafka.admin.PreferredReplicaLeaderElectionCommand %*

304
core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala

@ -1,304 +0,0 @@ @@ -1,304 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import scala.jdk.CollectionConverters._
import collection._
import java.util.Properties
import java.util.concurrent.ExecutionException
import kafka.common.AdminCommandFailedException
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.errors.ElectionNotNeededException
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.Utils
import org.apache.zookeeper.KeeperException.NodeExistsException
object PreferredReplicaLeaderElectionCommand extends Logging {
def main(args: Array[String]): Unit = {
val timeout = 30000
run(args, timeout)
}
def run(args: Array[String], timeout: Int = 30000): Unit = {
println("This tool is deprecated. Please use kafka-leader-election tool. Tracking issue: KAFKA-8405")
val commandOpts = new PreferredReplicaLeaderElectionCommandOptions(args)
CommandLineUtils.printHelpAndExitIfNeeded(commandOpts, "This tool helps to causes leadership for each partition to be transferred back to the 'preferred replica'," +
" it can be used to balance leadership among the servers.")
CommandLineUtils.checkRequiredArgs(commandOpts.parser, commandOpts.options)
if (commandOpts.options.has(commandOpts.bootstrapServerOpt) == commandOpts.options.has(commandOpts.zkConnectOpt)) {
CommandLineUtils.printUsageAndDie(commandOpts.parser, s"Exactly one of '${commandOpts.bootstrapServerOpt}' or '${commandOpts.zkConnectOpt}' must be provided")
}
val partitionsForPreferredReplicaElection =
if (commandOpts.options.has(commandOpts.jsonFileOpt))
Some(parsePreferredReplicaElectionData(Utils.readFileAsString(commandOpts.options.valueOf(commandOpts.jsonFileOpt))))
else
None
val preferredReplicaElectionCommand = if (commandOpts.options.has(commandOpts.zkConnectOpt)) {
println(s"Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.")
println(s"Use --bootstrap-server instead to specify a broker to connect to.")
new ZkCommand(commandOpts.options.valueOf(commandOpts.zkConnectOpt),
JaasUtils.isZkSaslEnabled,
timeout)
} else {
val adminProps = if (commandOpts.options.has(commandOpts.adminClientConfigOpt))
Utils.loadProps(commandOpts.options.valueOf(commandOpts.adminClientConfigOpt))
else
new Properties()
adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOpts.options.valueOf(commandOpts.bootstrapServerOpt))
adminProps.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toString)
adminProps.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, (timeout * 2).toString)
new AdminClientCommand(adminProps)
}
try {
preferredReplicaElectionCommand.electPreferredLeaders(partitionsForPreferredReplicaElection)
} finally {
preferredReplicaElectionCommand.close()
}
}
def parsePreferredReplicaElectionData(jsonString: String): collection.immutable.Set[TopicPartition] = {
Json.parseFull(jsonString) match {
case Some(js) =>
js.asJsonObject.get("partitions") match {
case Some(partitionsList) =>
val partitionsRaw = partitionsList.asJsonArray.iterator.map(_.asJsonObject)
val partitions = partitionsRaw.map { p =>
val topic = p("topic").to[String]
val partition = p("partition").to[Int]
new TopicPartition(topic, partition)
}.toBuffer
val duplicatePartitions = CoreUtils.duplicates(partitions)
if (duplicatePartitions.nonEmpty)
throw new AdminOperationException("Preferred replica election data contains duplicate partitions: %s".format(duplicatePartitions.mkString(",")))
partitions.toSet
case None => throw new AdminOperationException("Preferred replica election data is empty")
}
case None => throw new AdminOperationException("Preferred replica election data is empty")
}
}
def writePreferredReplicaElectionData(zkClient: KafkaZkClient,
partitionsUndergoingPreferredReplicaElection: Set[TopicPartition]): Unit = {
try {
zkClient.createPreferredReplicaElection(partitionsUndergoingPreferredReplicaElection.toSet)
println("Created preferred replica election path with %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
} catch {
case _: NodeExistsException =>
throw new AdminOperationException("Preferred replica leader election currently in progress for " +
"%s. Aborting operation".format(zkClient.getPreferredReplicaElection.mkString(",")))
case e2: Throwable => throw new AdminOperationException(e2.toString)
}
}
class PreferredReplicaLeaderElectionCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
val jsonFileOpt = parser.accepts("path-to-json-file", "The JSON file with the list of partitions " +
"for which preferred replica leader election should be done, in the following format - \n" +
"{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\n" +
"Defaults to all existing partitions")
.withRequiredArg
.describedAs("list of partitions for which preferred replica leader election needs to be triggered")
.ofType(classOf[String])
private val zookeeperOptBuilder = parser.accepts("zookeeper",
"DEPRECATED. The connection string for the zookeeper connection in the " +
"form host:port. Multiple URLS can be given to allow fail-over. " +
"Replaced by --bootstrap-server, REQUIRED unless --bootstrap-server is given.")
private val bootstrapOptBuilder = parser.accepts("bootstrap-server",
"A hostname and port for the broker to connect to, " +
"in the form host:port. Multiple comma-separated URLs can be given. REQUIRED unless --zookeeper is given.")
parser.mutuallyExclusive(zookeeperOptBuilder, bootstrapOptBuilder)
val bootstrapServerOpt = bootstrapOptBuilder
.withRequiredArg
.describedAs("host:port")
.ofType(classOf[String])
val zkConnectOpt = zookeeperOptBuilder
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
val adminClientConfigOpt = parser.accepts("admin.config",
"Admin client config properties file to pass to the admin client when --bootstrap-server is given.")
.availableIf(bootstrapServerOpt)
.withRequiredArg
.describedAs("config file")
.ofType(classOf[String])
options = parser.parse(args: _*)
}
/** Abstraction over different ways to perform a leader election */
trait Command {
/** Elect the preferred leader for the given {@code partitionsForElection}.
* If the given {@code partitionsForElection} are None then elect the preferred leader for all partitions.
*/
def electPreferredLeaders(partitionsForElection: Option[Set[TopicPartition]]): Unit
def close(): Unit
}
class ZkCommand(zkConnect: String, isSecure: Boolean, timeout: Int)
extends Command {
var zkClient: KafkaZkClient = null
val time = Time.SYSTEM
zkClient = KafkaZkClient(zkConnect, isSecure, timeout, timeout, Int.MaxValue, time)
override def electPreferredLeaders(partitionsFromUser: Option[Set[TopicPartition]]): Unit = {
try {
val topics =
partitionsFromUser match {
case Some(partitions) =>
partitions.map(_.topic).toSet
case None =>
zkClient.getAllPartitions.map(_.topic)
}
val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap{ case (topic, partitions) =>
partitions.map(new TopicPartition(topic, _))
}.toSet
val (validPartitions, invalidPartitions) =
partitionsFromUser match {
case Some(partitions) =>
partitions.partition(partitionsFromZk.contains)
case None =>
(zkClient.getAllPartitions, Set.empty)
}
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)
println("Successfully started preferred replica election for partitions %s".format(validPartitions))
invalidPartitions.foreach(p => println("Skipping preferred replica leader election for partition %s since it doesn't exist.".format(p)))
} catch {
case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e)
}
}
override def close(): Unit = {
if (zkClient != null)
zkClient.close()
}
}
/** Election via AdminClient.electPreferredLeaders() */
class AdminClientCommand(adminClientProps: Properties)
extends Command with Logging {
val adminClient = Admin.create(adminClientProps)
override def electPreferredLeaders(partitionsFromUser: Option[Set[TopicPartition]]): Unit = {
val partitions = partitionsFromUser match {
case Some(partitionsFromUser) => partitionsFromUser.asJava
case None => null
}
debug(s"Calling AdminClient.electLeaders(ElectionType.PREFERRED, $partitions)")
val electionResults = try {
adminClient.electLeaders(ElectionType.PREFERRED, partitions).partitions.get.asScala
} catch {
case e: ExecutionException =>
val cause = e.getCause
if (cause.isInstanceOf[TimeoutException]) {
println("Timeout waiting for election results")
throw new AdminCommandFailedException("Timeout waiting for election results", cause)
} else if (cause.isInstanceOf[ClusterAuthorizationException]) {
println(s"Not authorized to perform leader election")
throw new AdminCommandFailedException("Not authorized to perform leader election", cause)
}
throw e
case e: Throwable =>
// We don't even know the attempted partitions
println("Error while making request")
e.printStackTrace()
return
}
val succeeded = mutable.Set.empty[TopicPartition]
val noop = mutable.Set.empty[TopicPartition]
val failed = mutable.Map.empty[TopicPartition, Throwable]
electionResults.foreach[Unit] { case (topicPartition, error) =>
if (error.isPresent) {
if (error.get.isInstanceOf[ElectionNotNeededException]) {
noop += topicPartition
} else {
failed += topicPartition -> error.get
}
} else {
succeeded += topicPartition
}
}
if (!succeeded.isEmpty) {
val partitions = succeeded.mkString(", ")
println(s"Successfully completed preferred leader election for partitions $partitions")
}
if (!noop.isEmpty) {
val partitions = succeeded.mkString(", ")
println(s"Preferred replica already elected for partitions $partitions")
}
if (!failed.isEmpty) {
val rootException = new AdminCommandFailedException(s"${failed.size} preferred replica(s) could not be elected")
failed.forKeyValue { (topicPartition, exception) =>
println(s"Error completing preferred leader election for partition: $topicPartition: $exception")
rootException.addSuppressed(exception)
}
throw rootException
}
}
override def close(): Unit = {
debug("Closing AdminClient")
adminClient.close()
}
}
}
class PreferredReplicaLeaderElectionCommand(zkClient: KafkaZkClient, partitionsFromUser: scala.collection.Set[TopicPartition]) {
def moveLeaderToPreferredReplica(): Unit = {
try {
val topics = partitionsFromUser.map(_.topic).toSet
val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap { case (topic, partitions) =>
partitions.map(new TopicPartition(topic, _))
}.toSet
val (validPartitions, invalidPartitions) = partitionsFromUser.partition(partitionsFromZk.contains)
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)
println("Successfully started preferred replica election for partitions %s".format(validPartitions))
invalidPartitions.foreach(p => println("Skipping preferred replica leader election for partition %s since it doesn't exist.".format(p)))
} catch {
case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e)
}
}
}

383
core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala

@ -1,383 +0,0 @@ @@ -1,383 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util
import java.util.Properties
import scala.collection.Seq
import kafka.common.AdminCommandFailedException
import kafka.security.authorizer.AclAuthorizer
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.{Logging, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.errors.PreferredLeaderNotAvailableException
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
import org.apache.kafka.test
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import scala.jdk.CollectionConverters._
class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness with Logging {
var servers: Seq[KafkaServer] = Seq()
@AfterEach
override def tearDown(): Unit = {
TestUtils.shutdownServers(servers)
super.tearDown()
}
private def createTestTopicAndCluster(topicPartition: Map[TopicPartition, List[Int]],
authorizer: Option[String] = None): Unit = {
val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
brokerConfigs.foreach(p => p.setProperty("auto.leader.rebalance.enable", "false"))
authorizer match {
case Some(className) =>
brokerConfigs.foreach(p => p.setProperty("authorizer.class.name", className))
case None =>
}
createTestTopicAndCluster(topicPartition, brokerConfigs)
}
private def createTestTopicAndCluster(partitionsAndAssignments: Map[TopicPartition, List[Int]],
brokerConfigs: Seq[Properties]): Unit = {
// create brokers
servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
partitionsAndAssignments.forKeyValue { (tp, assignment) =>
zkClient.createTopicAssignment(tp.topic, Some(Uuid.randomUuid()), Map(tp -> assignment))
}
// wait until replica log is created on every broker
TestUtils.waitUntilTrue(
() =>
servers.forall { server =>
partitionsAndAssignments.forall { partitionAndAssignment =>
server.getLogManager.getLog(partitionAndAssignment._1).isDefined
}
},
"Replicas for topic test not created"
)
}
/** Bounce the given targetServer and wait for all servers to get metadata for the given partition */
private def bounceServer(targetServer: Int, partition: TopicPartition): Unit = {
debug(s"Shutting down server $targetServer so a non-preferred replica becomes leader")
servers(targetServer).shutdown()
debug(s"Starting server $targetServer now that a non-preferred replica is leader")
servers(targetServer).startup()
TestUtils.waitUntilTrue(() => servers.forall { server =>
server.metadataCache.getPartitionInfo(partition.topic, partition.partition).exists { partitionState =>
partitionState.isr.contains(targetServer)
}
},
s"Replicas for partition $partition not created")
}
private def getController() = {
servers.find(p => p.kafkaController.isActive)
}
private def awaitLeader(topicPartition: TopicPartition, timeoutMs: Long = test.TestUtils.DEFAULT_MAX_WAIT_MS): Int = {
TestUtils.awaitValue(() => {
servers.head.metadataCache.getPartitionInfo(topicPartition.topic, topicPartition.partition).map(_.leader)
}, s"Timed out waiting to find current leader of $topicPartition", timeoutMs)
}
private def bootstrapServer(broker: Int = 0): String = {
val port = servers(broker).socketServer.boundPort(ListenerName.normalised("PLAINTEXT"))
debug("Server bound to port "+port)
s"localhost:$port"
}
val testPartition = new TopicPartition("test", 0)
val testPartitionAssignment = List(1, 2, 0)
val testPartitionPreferredLeader = testPartitionAssignment.head
val testPartitionAndAssignment = Map(testPartition -> testPartitionAssignment)
/** Test the case multiple values are given for --bootstrap-broker */
@Test
def testMultipleBrokersGiven(): Unit = {
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", s"${bootstrapServer(1)},${bootstrapServer(0)}"))
// Check the leader for the partition IS the preferred one
assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
}
/** Test the case when an invalid broker is given for --bootstrap-broker */
@Test
def testInvalidBrokerGiven(): Unit = {
val e = assertThrows(classOf[AdminCommandFailedException], () => PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", "example.com:1234"), timeout = 1000))
assertTrue(e.getCause.isInstanceOf[TimeoutException])
}
/** Test the case where no partitions are given (=> elect all partitions) */
@Test
def testNoPartitionsGiven(): Unit = {
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer()))
// Check the leader for the partition IS the preferred one
assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
}
private def toJsonFile(partitions: Set[TopicPartition]): File = {
val jsonFile = File.createTempFile("preferredreplicaelection", ".js")
jsonFile.deleteOnExit()
val jsonString = TestUtils.stringifyTopicPartitions(partitions)
debug("Using json: "+jsonString)
Files.write(Paths.get(jsonFile.getAbsolutePath), jsonString.getBytes(StandardCharsets.UTF_8))
jsonFile
}
/** Test the case where a list of partitions is given */
@Test
def testSingletonPartitionGiven(): Unit = {
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(),
"--path-to-json-file", jsonFile.getAbsolutePath))
} finally {
jsonFile.delete()
}
// Check the leader for the partition IS the preferred one
assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
}
/** Test the case where a topic does not exist */
@Test
def testTopicDoesNotExist(): Unit = {
val nonExistentPartition = new TopicPartition("does.not.exist", 0)
val nonExistentPartitionAssignment = List(1, 2, 0)
val nonExistentPartitionAndAssignment = Map(nonExistentPartition -> nonExistentPartitionAssignment)
createTestTopicAndCluster(testPartitionAndAssignment)
val jsonFile = toJsonFile(nonExistentPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(),
"--path-to-json-file", jsonFile.getAbsolutePath))
} catch {
case e: AdminCommandFailedException =>
val suppressed = e.getSuppressed()(0)
assertTrue(suppressed.isInstanceOf[UnknownTopicOrPartitionException])
case e: Throwable =>
e.printStackTrace()
throw e
} finally {
jsonFile.delete()
}
}
/** Test the case where several partitions are given */
@Test
def testMultiplePartitionsSameAssignment(): Unit = {
val testPartitionA = new TopicPartition("testA", 0)
val testPartitionB = new TopicPartition("testB", 0)
val testPartitionAssignment = List(1, 2, 0)
val testPartitionPreferredLeader = testPartitionAssignment.head
val testPartitionAndAssignment = Map(testPartitionA -> testPartitionAssignment, testPartitionB -> testPartitionAssignment)
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartitionA)
// Check the leader for the partition is not the preferred one
assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartitionA))
assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartitionB))
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(),
"--path-to-json-file", jsonFile.getAbsolutePath))
} finally {
jsonFile.delete()
}
// Check the leader for the partition IS the preferred one
assertEquals(testPartitionPreferredLeader, awaitLeader(testPartitionA))
assertEquals(testPartitionPreferredLeader, awaitLeader(testPartitionB))
}
/** What happens when the preferred replica is already the leader? */
@Test
def testNoopElection(): Unit = {
createTestTopicAndCluster(testPartitionAndAssignment)
// Don't bounce the server. Doublecheck the leader for the partition is the preferred one
assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
// Now do the election, even though the preferred replica is *already* the leader
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(),
"--path-to-json-file", jsonFile.getAbsolutePath))
// Check the leader for the partition still is the preferred one
assertEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
} finally {
jsonFile.delete()
}
}
/** What happens if the preferred replica is offline? */
@Test
def testWithOfflinePreferredReplica(): Unit = {
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
val leader = awaitLeader(testPartition)
assertNotEquals(testPartitionPreferredLeader, leader)
// Now kill the preferred one
servers(testPartitionPreferredLeader).shutdown()
// Now try to elect the preferred one
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(),
"--path-to-json-file", jsonFile.getAbsolutePath))
fail();
} catch {
case e: AdminCommandFailedException =>
assertEquals("1 preferred replica(s) could not be elected", e.getMessage)
val suppressed = e.getSuppressed()(0)
assertTrue(suppressed.isInstanceOf[PreferredLeaderNotAvailableException])
assertTrue(suppressed.getMessage.contains("Failed to elect leader for partition test-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"),
suppressed.getMessage)
// Check we still have the same leader
assertEquals(leader, awaitLeader(testPartition))
} finally {
jsonFile.delete()
}
}
/** What happens if the controller gets killed just before an election? */
@Test
def testTimeout(): Unit = {
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
val leader = awaitLeader(testPartition)
assertNotEquals(testPartitionPreferredLeader, leader)
// Now kill the controller just before we trigger the election
val controller = getController().get.config.brokerId
servers(controller).shutdown()
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(controller),
"--path-to-json-file", jsonFile.getAbsolutePath),
timeout = 2000)
fail();
} catch {
case e: AdminCommandFailedException =>
assertEquals("Timeout waiting for election results", e.getMessage)
// Check we still have the same leader
assertEquals(leader, awaitLeader(testPartition))
} finally {
jsonFile.delete()
}
}
/** Test the case where client is not authorized */
@Test
def testAuthzFailure(): Unit = {
createTestTopicAndCluster(testPartitionAndAssignment, Some(classOf[PreferredReplicaLeaderElectionCommandTestAuthorizer].getName))
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
val leader = awaitLeader(testPartition)
assertNotEquals(testPartitionPreferredLeader, leader)
// Check the leader for the partition is not the preferred one
assertNotEquals(testPartitionPreferredLeader, awaitLeader(testPartition))
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(),
"--path-to-json-file", jsonFile.getAbsolutePath))
fail()
} catch {
case e: AdminCommandFailedException =>
assertEquals("Not authorized to perform leader election", e.getMessage)
assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
// Check we still have the same leader
assertEquals(leader, awaitLeader(testPartition))
} finally {
jsonFile.delete()
}
}
@Test
def testPreferredReplicaJsonData(): Unit = {
// write preferred replica json data to zk path
val partitionsForPreferredReplicaElection = Set(new TopicPartition("test", 1), new TopicPartition("test2", 1))
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, partitionsForPreferredReplicaElection)
// try to read it back and compare with what was written
val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
assertEquals(partitionsForPreferredReplicaElection, partitionsUndergoingPreferredReplicaElection,
"Preferred replica election ser-de failed")
}
@Test
def testBasicPreferredReplicaElection(): Unit = {
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
val topic = "test"
val partition = 0
val preferredReplica = 0
// create brokers
val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2")
val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps)
// create the topic
adminZkClient.createTopicWithAssignment(topic, config = new Properties, expectedReplicaAssignment)
servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
// broker 2 should be the leader since it was started first
val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None)
// trigger preferred replica election
val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(new TopicPartition(topic, partition)))
preferredReplicaElection.moveLeaderToPreferredReplica()
val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader))
assertEquals(preferredReplica, newLeader, "Preferred replica election failed")
}
}
class PreferredReplicaLeaderElectionCommandTestAuthorizer extends AclAuthorizer {
override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = {
actions.asScala.map { action =>
if (action.operation != AclOperation.ALTER || action.resourcePattern.resourceType != ResourceType.CLUSTER)
AuthorizationResult.ALLOWED
else
AuthorizationResult.DENIED
}.asJava
}
}

1
docs/upgrade.html

@ -32,6 +32,7 @@ @@ -32,6 +32,7 @@
Furthermore, deprecated constants and constructors were removed from <code>SslConfigs</code>, <code>SaslConfigs</code>,
<code>AclBinding</code> and <code>AclBindingFilter</code>.</li>
<li>The deprecated <code>Admin.electedPreferredLeaders()</code> methods were removed. Please use <code>Admin.electLeaders</code> instead.</li>
<li>The deprecated <code>kafka-preferred-replica-election</code> command line tool was removed. Please use <code>kafka-leader-election</code> instead.</li>
<li>The deprecated <code>ConfigEntry</code> constructor was removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12577">KAFKA-12577</a>).
Please use the remaining public constructor instead.</li>
</ul>

Loading…
Cancel
Save