From 28ef7f1d6d7f4cec7e81b8d0641debebefec9104 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 24 Oct 2019 23:23:54 +0100 Subject: [PATCH] MINOR: Re-implement NewPartitionReassignment#of() (#7592) Re-implement NewPartitionReassignment#of. It now takes a list rather than a variable-length list of arguments. Reviewers: Colin P. McCabe , Vikas Singh --- .../admin/NewPartitionReassignment.java | 12 ++++++- .../clients/admin/KafkaAdminClientTest.java | 12 +++---- .../api/AdminClientIntegrationTest.scala | 31 +++++++++---------- .../admin/ReassignPartitionsClusterTest.scala | 2 +- .../TopicCommandWithAdminClientTest.scala | 4 +-- 5 files changed, 33 insertions(+), 28 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java index 0f7a61ccf0a..111514a6563 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; /** * A new partition reassignment, which can be applied via {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}. @@ -28,7 +29,16 @@ import java.util.Map; public class NewPartitionReassignment { private final List targetReplicas; - public NewPartitionReassignment(List targetReplicas) { + /** + * @throws IllegalArgumentException if no replicas are supplied + */ + public static Optional of(List replicas) { + if (replicas == null || replicas.size() == 0) + throw new IllegalArgumentException("Cannot create a new partition reassignment without any replicas"); + return Optional.of(new NewPartitionReassignment(replicas)); + } + + private NewPartitionReassignment(List targetReplicas) { this.targetReplicas = Collections.unmodifiableList(new ArrayList<>(targetReplicas)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index da6c0d2efaf..ceea291429f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2051,7 +2051,7 @@ public class KafkaAdminClientTest { TopicPartition tp2 = new TopicPartition("B", 0); Map> reassignments = new HashMap<>(); reassignments.put(tp1, Optional.empty()); - reassignments.put(tp2, newPartitionReassignment(Arrays.asList(1, 2, 3))); + reassignments.put(tp2, NewPartitionReassignment.of(Arrays.asList(1, 2, 3))); // 1. server returns less responses than number of partitions we sent AlterPartitionReassignmentsResponseData responseData1 = new AlterPartitionReassignmentsResponseData(); @@ -2142,9 +2142,9 @@ public class KafkaAdminClientTest { TopicPartition invalidTopicTP = new TopicPartition("", 0); TopicPartition invalidPartitionTP = new TopicPartition("ABC", -1); Map> invalidTopicReassignments = new HashMap<>(); - invalidTopicReassignments.put(invalidPartitionTP, newPartitionReassignment(Arrays.asList(1, 2, 3))); - invalidTopicReassignments.put(invalidTopicTP, newPartitionReassignment(Arrays.asList(1, 2, 3))); - invalidTopicReassignments.put(tp1, newPartitionReassignment(Arrays.asList(1, 2, 3))); + invalidTopicReassignments.put(invalidPartitionTP, NewPartitionReassignment.of(Arrays.asList(1, 2, 3))); + invalidTopicReassignments.put(invalidTopicTP, NewPartitionReassignment.of(Arrays.asList(1, 2, 3))); + invalidTopicReassignments.put(tp1, NewPartitionReassignment.of(Arrays.asList(1, 2, 3))); AlterPartitionReassignmentsResponseData singlePartResponseData = new AlterPartitionReassignmentsResponseData() @@ -2771,8 +2771,4 @@ public class KafkaAdminClientTest { } } } - - private static Optional newPartitionReassignment(List targetReplicas) { - return Optional.of(new NewPartitionReassignment(targetReplicas)); - } } diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index eb15529f607..bed69004fb1 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -2018,37 +2018,36 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val tp1 = new TopicPartition(topic, 0) val tp2 = new TopicPartition(topic, 1) val tp3 = new TopicPartition(topic, 2) - val tp4 = new TopicPartition(topic, 3) createTopic(topic, numPartitions = 4) - val validAssignment = new NewPartitionReassignment((0 until brokerCount).map(_.asInstanceOf[Integer]).asJava) + + val validAssignment = NewPartitionReassignment.of( + (0 until brokerCount).map(_.asInstanceOf[Integer]).asJava + ) val nonExistentTp1 = new TopicPartition("topicA", 0) val nonExistentTp2 = new TopicPartition(topic, 4) val nonExistentPartitionsResult = client.alterPartitionReassignments(Map( - tp1 -> java.util.Optional.of(validAssignment), - tp2 -> java.util.Optional.of(validAssignment), - tp3 -> java.util.Optional.of(validAssignment), - nonExistentTp1 -> java.util.Optional.of(validAssignment), - nonExistentTp2 -> java.util.Optional.of(validAssignment) + tp1 -> validAssignment, + tp2 -> validAssignment, + tp3 -> validAssignment, + nonExistentTp1 -> validAssignment, + nonExistentTp2 -> validAssignment ).asJava).values() assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp1), classOf[UnknownTopicOrPartitionException]) assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp2), classOf[UnknownTopicOrPartitionException]) - val extraNonExistentReplica = new NewPartitionReassignment((0 until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava) - val negativeIdReplica = new NewPartitionReassignment(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava) - val duplicateReplica = new NewPartitionReassignment(Seq(0, 1, 1).map(_.asInstanceOf[Integer]).asJava) - val noReplicas = new NewPartitionReassignment(Seq().map(_.asInstanceOf[Integer]).asJava) + val extraNonExistentReplica = NewPartitionReassignment.of((0 until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava) + val negativeIdReplica = NewPartitionReassignment.of(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava) + val duplicateReplica = NewPartitionReassignment.of(Seq(0, 1, 1).map(_.asInstanceOf[Integer]).asJava) val invalidReplicaResult = client.alterPartitionReassignments(Map( - tp1 -> java.util.Optional.of(extraNonExistentReplica), - tp2 -> java.util.Optional.of(negativeIdReplica), - tp3 -> java.util.Optional.of(duplicateReplica), - tp4 -> java.util.Optional.of(noReplicas) + tp1 -> extraNonExistentReplica, + tp2 -> negativeIdReplica, + tp3 -> duplicateReplica ).asJava).values() assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp1), classOf[InvalidReplicaAssignmentException]) assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp2), classOf[InvalidReplicaAssignmentException]) assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3), classOf[InvalidReplicaAssignmentException]) - assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp4), classOf[InvalidReplicaAssignmentException]) } @Test diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index 0d6ae87515c..0e9a7d8f93b 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -1140,7 +1140,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { } def reassignmentEntry(tp: TopicPartition, replicas: Seq[Int]): (TopicPartition, java.util.Optional[NewPartitionReassignment]) = - tp -> java.util.Optional.of(new NewPartitionReassignment(replicas.map(_.asInstanceOf[Integer]).asJava)) + tp -> NewPartitionReassignment.of(replicas.map(_.asInstanceOf[Integer]).asJava) def cancelReassignmentEntry(tp: TopicPartition): (TopicPartition, java.util.Optional[NewPartitionReassignment]) = tp -> java.util.Optional.empty() diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala index 4c5742a089c..01dbae7292c 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala @@ -16,7 +16,7 @@ */ package kafka.admin -import java.util.{Collections, Optional, Properties} +import java.util.{Collections, Properties} import kafka.admin.TopicCommand.{AdminClientTopicService, TopicCommandOptions} import kafka.common.AdminCommandFailedException @@ -674,7 +674,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin val targetReplica = brokerIds.diff(replicasOfFirstPartition).head adminClient.alterPartitionReassignments(Collections.singletonMap(firstTopicPartition, - Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica))))) + NewPartitionReassignment.of(Collections.singletonList(targetReplica)))) // let's wait until the LAIR is propagated TestUtils.waitUntilTrue(() => {