Browse Source

MINOR: Check against empty replicas in AlterPartitionReassignments (#7574)

Do not allow an empty replica set to be passed into the reassignment API.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@gmail.com>
pull/7273/head
Stanislav Kozlovski 5 years ago committed by Colin Patrick McCabe
parent
commit
2efee34b74
  1. 2
      core/src/main/scala/kafka/controller/KafkaController.scala
  2. 10
      core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala

2
core/src/main/scala/kafka/controller/KafkaController.scala

@ -1716,7 +1716,7 @@ class KafkaController(val config: KafkaConfig, @@ -1716,7 +1716,7 @@ class KafkaController(val config: KafkaConfig,
case Some(replicas) =>
val replicaSet = replicas.toSet
if (replicas.size != replicaSet.size)
if (replicas.isEmpty || replicas.size != replicaSet.size)
false
else if (replicas.exists(_ < 0))
false

10
core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala

@ -2018,12 +2018,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -2018,12 +2018,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val tp1 = new TopicPartition(topic, 0)
val tp2 = new TopicPartition(topic, 1)
val tp3 = new TopicPartition(topic, 2)
createTopic(topic, numPartitions = 3)
val tp4 = new TopicPartition(topic, 3)
createTopic(topic, numPartitions = 4)
val validAssignment = new NewPartitionReassignment((0 until brokerCount).map(_.asInstanceOf[Integer]).asJava)
val nonExistentTp1 = new TopicPartition("topicA", 0)
val nonExistentTp2 = new TopicPartition(topic, 3)
val nonExistentTp2 = new TopicPartition(topic, 4)
val nonExistentPartitionsResult = client.alterPartitionReassignments(Map(
tp1 -> java.util.Optional.of(validAssignment),
tp2 -> java.util.Optional.of(validAssignment),
@ -2037,14 +2038,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -2037,14 +2038,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val extraNonExistentReplica = new NewPartitionReassignment((0 until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava)
val negativeIdReplica = new NewPartitionReassignment(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava)
val duplicateReplica = new NewPartitionReassignment(Seq(0, 1, 1).map(_.asInstanceOf[Integer]).asJava)
val noReplicas = new NewPartitionReassignment(Seq().map(_.asInstanceOf[Integer]).asJava)
val invalidReplicaResult = client.alterPartitionReassignments(Map(
tp1 -> java.util.Optional.of(extraNonExistentReplica),
tp2 -> java.util.Optional.of(negativeIdReplica),
tp3 -> java.util.Optional.of(duplicateReplica)
tp3 -> java.util.Optional.of(duplicateReplica),
tp4 -> java.util.Optional.of(noReplicas)
).asJava).values()
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp1), classOf[InvalidReplicaAssignmentException])
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp2), classOf[InvalidReplicaAssignmentException])
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3), classOf[InvalidReplicaAssignmentException])
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp4), classOf[InvalidReplicaAssignmentException])
}
@Test

Loading…
Cancel
Save