diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 0dd2ec793d6..d45fea85882 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1716,7 +1716,7 @@ class KafkaController(val config: KafkaConfig, case Some(replicas) => val replicaSet = replicas.toSet - if (replicas.size != replicaSet.size) + if (replicas.isEmpty || replicas.size != replicaSet.size) false else if (replicas.exists(_ < 0)) false diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 6ff4f0e677e..eb15529f607 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -2018,12 +2018,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val tp1 = new TopicPartition(topic, 0) val tp2 = new TopicPartition(topic, 1) val tp3 = new TopicPartition(topic, 2) - createTopic(topic, numPartitions = 3) + val tp4 = new TopicPartition(topic, 3) + createTopic(topic, numPartitions = 4) val validAssignment = new NewPartitionReassignment((0 until brokerCount).map(_.asInstanceOf[Integer]).asJava) val nonExistentTp1 = new TopicPartition("topicA", 0) - val nonExistentTp2 = new TopicPartition(topic, 3) + val nonExistentTp2 = new TopicPartition(topic, 4) val nonExistentPartitionsResult = client.alterPartitionReassignments(Map( tp1 -> java.util.Optional.of(validAssignment), tp2 -> java.util.Optional.of(validAssignment), @@ -2037,14 +2038,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val extraNonExistentReplica = new NewPartitionReassignment((0 until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava) val negativeIdReplica = new NewPartitionReassignment(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava) val duplicateReplica = new NewPartitionReassignment(Seq(0, 1, 1).map(_.asInstanceOf[Integer]).asJava) + val noReplicas = new NewPartitionReassignment(Seq().map(_.asInstanceOf[Integer]).asJava) val invalidReplicaResult = client.alterPartitionReassignments(Map( tp1 -> java.util.Optional.of(extraNonExistentReplica), tp2 -> java.util.Optional.of(negativeIdReplica), - tp3 -> java.util.Optional.of(duplicateReplica) + tp3 -> java.util.Optional.of(duplicateReplica), + tp4 -> java.util.Optional.of(noReplicas) ).asJava).values() assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp1), classOf[InvalidReplicaAssignmentException]) assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp2), classOf[InvalidReplicaAssignmentException]) assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3), classOf[InvalidReplicaAssignmentException]) + assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp4), classOf[InvalidReplicaAssignmentException]) } @Test