Browse Source

KAFKA-5091; ReassignPartitionsCommand should protect against empty replica list assignment

ReassignPartitionsCommand should protect against empty replica list assignment.

Author: amethystic <huxi_2b@hotmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #2904 from amethystic/kafka-5901_ReassignPartitionsCommand_protect_against_empty_replica_list
pull/2923/merge
amethystic 8 years ago committed by Jason Gustafson
parent
commit
c96656efb3
  1. 3
      core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
  2. 11
      core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala

3
core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala

@ -186,6 +186,9 @@ object ReassignPartitionsCommand extends Logging { @@ -186,6 +186,9 @@ object ReassignPartitionsCommand extends Logging {
if (partitionsToBeReassigned.isEmpty)
throw new AdminCommandFailedException("Partition reassignment data file is empty")
if (partitionsToBeReassigned.exists(_._2.isEmpty)) {
throw new AdminCommandFailedException("Partition replica list cannot be empty")
}
val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map { case (tp, _) => tp })
if (duplicateReassignedPartitions.nonEmpty)
throw new AdminCommandFailedException("Partition reassignment contains duplicate topic partitions: %s".format(duplicateReassignedPartitions.mkString(",")))

11
core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala

@ -290,6 +290,17 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { @@ -290,6 +290,17 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
}
@Test(expected = classOf[AdminCommandFailedException])
def shouldFailIfProposedHasEmptyReplicaList() {
//Given a single replica on server 100
startBrokers(Seq(100, 101))
createTopic(zkUtils, topicName, Map(0 -> Seq(100)), servers = servers)
//When we execute an assignment that specifies an empty replica list (0: empty list in this case)
val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[]}]}"""
ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
}
@Test
def shouldPerformThrottledReassignmentOverVariousTopics() {
val throttle = Throttle(1000L)

Loading…
Cancel
Save