Browse Source

KAFKA-9329; KafkaController::replicasAreValid should return error message (#7865)

The `KafkaController::replicasAreValid` method currently returns a
boolean indicating if replicas are valid or not. But the failure
condition loses any context on why replicas are not valid. This change
updates the method to return the error condition if validation fails. This
allows the caller to report the error to the client.

The change also renames the `replicasAreValid` method to
`validateReplicas` to reflect updated semantics.

Reviewers: Sean Li <seanli-rallyhealth@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
pull/7964/head
Vikas Singh 5 years ago committed by Jason Gustafson
parent
commit
9b312c401c
  1. 53
      core/src/main/scala/kafka/controller/KafkaController.scala
  2. 1
      core/src/main/scala/kafka/utils/json/DecodeJson.scala
  3. 140
      core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
  4. 5
      core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
  5. 16
      core/src/test/scala/unit/kafka/utils/TestUtils.scala

53
core/src/main/scala/kafka/controller/KafkaController.scala

@ -1569,13 +1569,15 @@ class KafkaController(val config: KafkaConfig, @@ -1569,13 +1569,15 @@ class KafkaController(val config: KafkaConfig,
val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]
reassignments.foreach { case (tp, targetReplicas) =>
if (replicasAreValid(tp, targetReplicas)) {
maybeBuildReassignment(tp, targetReplicas) match {
case Some(context) => partitionsToReassign.put(tp, context)
case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
}
} else {
reassignmentResults.put(tp, new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT))
val maybeApiError = targetReplicas.flatMap(validateReplicas(tp, _))
maybeApiError match {
case None =>
maybeBuildReassignment(tp, targetReplicas) match {
case Some(context) => partitionsToReassign.put(tp, context)
case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
}
case Some(err) =>
reassignmentResults.put(tp, err)
}
}
@ -1588,22 +1590,27 @@ class KafkaController(val config: KafkaConfig, @@ -1588,22 +1590,27 @@ class KafkaController(val config: KafkaConfig,
}
}
private def replicasAreValid(topicPartition: TopicPartition, replicasOpt: Option[Seq[Int]]): Boolean = {
replicasOpt match {
case Some(replicas) =>
val replicaSet = replicas.toSet
if (replicas.isEmpty || replicas.size != replicaSet.size)
false
else if (replicas.exists(_ < 0))
false
else {
// Ensure that any new replicas are among the live brokers
val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
val newAssignment = currentAssignment.reassignTo(replicas)
newAssignment.addingReplicas.toSet.subsetOf(controllerContext.liveBrokerIds)
}
case None => true
/**
 * Validates a proposed replica list for a partition reassignment.
 *
 * @param topicPartition the partition whose assignment is being changed
 * @param replicas the proposed target replica list
 * @return None when the list is acceptable, otherwise an ApiError carrying
 *         INVALID_REPLICA_ASSIGNMENT and a message describing the problem
 */
private def validateReplicas(topicPartition: TopicPartition, replicas: Seq[Int]): Option[ApiError] = {
  // All rejections share the same error code; only the message differs.
  def invalid(message: String): Option[ApiError] =
    Some(new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, message))

  if (replicas.isEmpty)
    invalid("Empty replica list specified in partition reassignment.")
  else if (replicas.distinct.size != replicas.size)
    invalid(s"Duplicate replica ids in partition reassignment replica list: $replicas")
  else if (replicas.exists(_ < 0))
    invalid(s"Invalid broker id in replica list: $replicas")
  else {
    // Any replica being *added* by this reassignment must be a live broker;
    // replicas already assigned are allowed to be offline.
    val proposedAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition).reassignTo(replicas)
    val allNewReplicasAlive = proposedAssignment.addingReplicas.toSet.subsetOf(controllerContext.liveBrokerIds)
    if (allNewReplicasAlive)
      None
    else
      invalid(s"Replica assignment has brokers that are not alive. Replica list: " +
        s"${proposedAssignment.addingReplicas}, live broker list: ${controllerContext.liveBrokerIds}")
  }
}

1
core/src/main/scala/kafka/utils/json/DecodeJson.scala

@ -19,7 +19,6 @@ package kafka.utils.json @@ -19,7 +19,6 @@ package kafka.utils.json
import scala.collection.{Map, Seq}
import scala.collection.compat._
import scala.language.higherKinds
import scala.collection.JavaConverters._
import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode}

140
core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala

@ -12,18 +12,64 @@ @@ -12,18 +12,64 @@
*/
package kafka.admin
import java.util.Optional
import kafka.admin.TopicCommand.ZookeeperTopicService
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.junit.Test
import org.apache.kafka.clients.admin.{AdminClientConfig, NewPartitionReassignment, NewTopic, AdminClient => JAdminClient}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
import scala.collection.Seq
class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAwareTest {
import ReassignPartitionsIntegrationTest._
var servers: Seq[KafkaServer] = Seq()
val broker1 = 0
val broker2 = 1
val broker3 = 2
val broker4 = 3
val broker5 = 4
val broker6 = 5
val rack = Map(
broker1 -> "rack1",
broker2 -> "rack2",
broker3 -> "rack2",
broker4 -> "rack1",
broker5 -> "rack3",
broker6 -> "rack3"
)
@Before
override def setUp(): Unit = {
super.setUp()
// Start a 6-broker cluster spread over the racks declared in `rack`.
// Auto leader rebalance is disabled so reassignment tests control
// leadership explicitly; short shutdown/lag timeouts keep the test fast.
val brokerConfigs = TestUtils.createBrokerConfigs(6, zkConnect, enableControlledShutdown = true)
servers = brokerConfigs.map { config =>
// Assign each broker the rack mapped to its broker id.
config.setProperty(KafkaConfig.RackProp, rack(config.getProperty(KafkaConfig.BrokerIdProp).toInt))
config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
config.setProperty(KafkaConfig.ControlledShutdownMaxRetriesProp, "1")
config.setProperty(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "1000")
config.setProperty(KafkaConfig.ReplicaLagTimeMaxMsProp, "1000")
TestUtils.createServer(KafkaConfig.fromProps(config))
}
}
@After
override def tearDown(): Unit = {
// Shut down all brokers before tearing down the ZooKeeper harness.
TestUtils.shutdownServers(servers)
super.tearDown()
}
@Test
def testRackAwareReassign(): Unit = {
val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkClient)
val numPartitions = 18
val replicationFactor = 3
@ -37,11 +83,93 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAw @@ -37,11 +83,93 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAw
val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkClient,
rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false)
rack.keys.toSeq.sorted, topicJson, disableRackAware = false)
val assignment = proposedAssignment map { case (topicPartition, replicas) =>
(topicPartition.partition, replicas)
}
checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
checkReplicaDistribution(assignment, rack, rack.size, numPartitions, replicationFactor)
}
@Test
def testReassignPartition(): Unit = {
// End-to-end reassignment: create a single-partition topic on brokers 1 and 2,
// move it to brokers 3 and 4, then verify metadata and ISR reflect the move.
TestUtils.resource(JAdminClient.create(createConfig(servers).asJava)) { client =>
val topic = "test-topic"
val partition = 0: Integer
val partitionAssignment = Map(partition -> Seq(broker1: Integer, broker2:Integer).asJava).asJava
val newTopic = new NewTopic(topic, partitionAssignment)
client.createTopics(Seq(newTopic).asJava).all().get()
val topicPartition = new TopicPartition(topic, partition)
// All sync replicas are in the ISR
TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker1, broker2))
// Reassign replicas to different brokers
client.alterPartitionReassignments(
Map(topicPartition -> reassignmentEntry(Seq(broker3, broker4))).asJava
).all().get()
// Block until the controller reports no reassignment in flight.
waitForAllReassignmentsToComplete(client)
// Metadata info is eventually consistent wait for update
TestUtils.waitForReplicasAssigned(client, topicPartition, Seq(broker3, broker4))
// All sync replicas are in the ISR
TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker3, broker4))
}
}
@Test
def testInvalidReplicaIds(): Unit = {
  // Verifies that alterPartitionReassignments rejects malformed replica lists
  // with InvalidReplicaAssignmentException instead of accepting them.
  TestUtils.resource(JAdminClient.create(createConfig(servers).asJava)) { client =>
    val topic = "test-topic"
    val partition = 0: Integer
    val partitionAssignment = Map(partition -> Seq(broker1: Integer, broker2: Integer).asJava).asJava
    val newTopic = new NewTopic(topic, partitionAssignment)
    client.createTopics(Seq(newTopic).asJava).all().get()
    val topicPartition = new TopicPartition(topic, partition)
    // All sync replicas are in the ISR
    TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker1, broker2))

    // Submits a reassignment and asserts it fails with INVALID_REPLICA_ASSIGNMENT.
    def expectInvalidAssignment(replicas: Seq[Int]): Unit = {
      val future = client.alterPartitionReassignments(
        Map(topicPartition -> reassignmentEntry(replicas)).asJava
      ).all()
      JTestUtils.assertFutureThrows(future, classOf[InvalidReplicaAssignmentException])
    }

    // Test reassignment with duplicate broker ids
    expectInvalidAssignment(Seq(broker4, broker5, broker5))
    // Test reassignment with invalid broker ids
    expectInvalidAssignment(Seq(-1, broker3))
    // Test reassignment with extra broker ids
    expectInvalidAssignment(Seq(6, broker2, broker3))
  }
}
}
object ReassignPartitionsIntegrationTest {

  /** Builds admin-client configuration pointing at the cluster's PLAINTEXT listeners. */
  def createConfig(servers: Seq[KafkaServer]): Map[String, Object] = {
    val bootstrap = TestUtils.bootstrapServers(servers, new ListenerName("PLAINTEXT"))
    Map(
      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrap,
      AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "20000"
    )
  }

  /** Wraps a target replica list in the Optional[NewPartitionReassignment] the admin API expects. */
  def reassignmentEntry(replicas: Seq[Int]): Optional[NewPartitionReassignment] = {
    val boxedReplicas = replicas.map(r => r: Integer).asJava
    Optional.of(new NewPartitionReassignment(boxedReplicas))
  }

  /** Blocks until the controller reports no partition reassignment in progress. */
  def waitForAllReassignmentsToComplete(client: JAdminClient): Unit = {
    TestUtils.waitUntilTrue(
      () => client.listPartitionReassignments().reassignments().get().isEmpty,
      s"There still are ongoing reassignments",
      pause = 100L
    )
  }
}

5
core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala

@ -314,10 +314,7 @@ object LeaderElectionCommandTest { @@ -314,10 +314,7 @@ object LeaderElectionCommandTest {
}
def bootstrapServers(servers: Seq[KafkaServer]): String = {
servers.map { server =>
val port = server.socketServer.boundPort(ListenerName.normalised("PLAINTEXT"))
s"localhost:$port"
}.headOption.mkString(",")
TestUtils.bootstrapServers(servers, new ListenerName("PLAINTEXT"))
}
def tempTopicPartitionFile(partitions: Set[TopicPartition]): Path = {

16
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -1553,6 +1553,22 @@ object TestUtils extends Logging { @@ -1553,6 +1553,22 @@ object TestUtils extends Logging {
)
}
/**
 * Waits until the replica list the admin client reports for `partition`
 * equals `brokerIds` (order-sensitive). Topic metadata propagates
 * asynchronously, so callers use this after a reassignment completes to
 * wait for the metadata to catch up.
 *
 * @param client admin client used to describe the topic
 * @param partition the topic partition whose replica list is checked
 * @param brokerIds the expected replica list, in replica order
 */
def waitForReplicasAssigned(client: Admin, partition: TopicPartition, brokerIds: Seq[Int]): Unit = {
TestUtils.waitUntilTrue(
() => {
val description = client.describeTopics(Set(partition.topic).asJava).all.get.asScala
val replicas = description
.values
// Only consider the requested partition. Previously the replicas of
// every partition of the topic were concatenated and compared against
// brokerIds, which is wrong for any topic with more than one partition.
.flatMap(_.partitions.asScala.filter(_.partition == partition.partition).flatMap(_.replicas.asScala))
.map(_.id)
.toSeq
brokerIds == replicas
},
s"Expected brokers $brokerIds to be the replicas for $partition"
)
}
/**
* Capture the console output during the execution of the provided function.
*/

Loading…
Cancel
Save