From 2efee34b74d4895b504ab541b716edab72b320d1 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Tue, 22 Oct 2019 23:26:50 +0100 Subject: [PATCH] MINOR: Check against empty replicas in AlterPartitionReassignments (#7574) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Do not allow an empty replica set to be passed into the reassignment API. Reviewers: Colin P. McCabe , José Armando García Sancio --- .../main/scala/kafka/controller/KafkaController.scala | 2 +- .../kafka/api/AdminClientIntegrationTest.scala | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 0dd2ec793d6..d45fea85882 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1716,7 +1716,7 @@ class KafkaController(val config: KafkaConfig, case Some(replicas) => val replicaSet = replicas.toSet - if (replicas.size != replicaSet.size) + if (replicas.isEmpty || replicas.size != replicaSet.size) false else if (replicas.exists(_ < 0)) false diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 6ff4f0e677e..eb15529f607 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -2018,12 +2018,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val tp1 = new TopicPartition(topic, 0) val tp2 = new TopicPartition(topic, 1) val tp3 = new TopicPartition(topic, 2) - createTopic(topic, numPartitions = 3) + val tp4 = new TopicPartition(topic, 3) + createTopic(topic, numPartitions = 4) val validAssignment = new NewPartitionReassignment((0 until brokerCount).map(_.asInstanceOf[Integer]).asJava) val nonExistentTp1 = new TopicPartition("topicA", 0) - val nonExistentTp2 = new TopicPartition(topic, 3) + val nonExistentTp2 = new TopicPartition(topic, 4) val nonExistentPartitionsResult = client.alterPartitionReassignments(Map( tp1 -> java.util.Optional.of(validAssignment), tp2 -> java.util.Optional.of(validAssignment), @@ -2037,14 +2038,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val extraNonExistentReplica = new NewPartitionReassignment((0 until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava) val negativeIdReplica = new NewPartitionReassignment(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava) val duplicateReplica = new NewPartitionReassignment(Seq(0, 1, 1).map(_.asInstanceOf[Integer]).asJava) + val noReplicas = new NewPartitionReassignment(Seq().map(_.asInstanceOf[Integer]).asJava) val invalidReplicaResult = client.alterPartitionReassignments(Map( tp1 -> java.util.Optional.of(extraNonExistentReplica), tp2 -> java.util.Optional.of(negativeIdReplica), - tp3 -> java.util.Optional.of(duplicateReplica) + tp3 -> java.util.Optional.of(duplicateReplica), + tp4 -> java.util.Optional.of(noReplicas) ).asJava).values() assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp1), classOf[InvalidReplicaAssignmentException]) assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp2), classOf[InvalidReplicaAssignmentException]) assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3), classOf[InvalidReplicaAssignmentException]) + assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp4), classOf[InvalidReplicaAssignmentException]) } @Test