Browse Source

MINOR: Re-implement NewPartitionReassignment#of() (#7592)

Re-implement NewPartitionReassignment#of.  It now takes a list rather than a variable-length list of arguments.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Vikas Singh <vikas@confluent.io>
pull/7596/head
Stanislav Kozlovski 5 years ago committed by Colin Patrick McCabe
parent
commit
28ef7f1d6d
  1. 12
      clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
  2. 12
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
  3. 31
      core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
  4. 2
      core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
  5. 4
      core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala

12
clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java

@ -21,6 +21,7 @@ import java.util.ArrayList; @@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* A new partition reassignment, which can be applied via {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}.
@ -28,7 +29,16 @@ import java.util.Map; @@ -28,7 +29,16 @@ import java.util.Map;
public class NewPartitionReassignment {
private final List<Integer> targetReplicas;
public NewPartitionReassignment(List<Integer> targetReplicas) {
/**
* @throws IllegalArgumentException if no replicas are supplied
*/
public static Optional<NewPartitionReassignment> of(List<Integer> replicas) {
if (replicas == null || replicas.size() == 0)
throw new IllegalArgumentException("Cannot create a new partition reassignment without any replicas");
return Optional.of(new NewPartitionReassignment(replicas));
}
private NewPartitionReassignment(List<Integer> targetReplicas) {
this.targetReplicas = Collections.unmodifiableList(new ArrayList<>(targetReplicas));
}

12
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -2051,7 +2051,7 @@ public class KafkaAdminClientTest { @@ -2051,7 +2051,7 @@ public class KafkaAdminClientTest {
TopicPartition tp2 = new TopicPartition("B", 0);
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
reassignments.put(tp1, Optional.empty());
reassignments.put(tp2, newPartitionReassignment(Arrays.asList(1, 2, 3)));
reassignments.put(tp2, NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
// 1. server returns less responses than number of partitions we sent
AlterPartitionReassignmentsResponseData responseData1 = new AlterPartitionReassignmentsResponseData();
@ -2142,9 +2142,9 @@ public class KafkaAdminClientTest { @@ -2142,9 +2142,9 @@ public class KafkaAdminClientTest {
TopicPartition invalidTopicTP = new TopicPartition("", 0);
TopicPartition invalidPartitionTP = new TopicPartition("ABC", -1);
Map<TopicPartition, Optional<NewPartitionReassignment>> invalidTopicReassignments = new HashMap<>();
invalidTopicReassignments.put(invalidPartitionTP, newPartitionReassignment(Arrays.asList(1, 2, 3)));
invalidTopicReassignments.put(invalidTopicTP, newPartitionReassignment(Arrays.asList(1, 2, 3)));
invalidTopicReassignments.put(tp1, newPartitionReassignment(Arrays.asList(1, 2, 3)));
invalidTopicReassignments.put(invalidPartitionTP, NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
invalidTopicReassignments.put(invalidTopicTP, NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
invalidTopicReassignments.put(tp1, NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
AlterPartitionReassignmentsResponseData singlePartResponseData =
new AlterPartitionReassignmentsResponseData()
@ -2771,8 +2771,4 @@ public class KafkaAdminClientTest { @@ -2771,8 +2771,4 @@ public class KafkaAdminClientTest {
}
}
}
private static Optional<NewPartitionReassignment> newPartitionReassignment(List<Integer> targetReplicas) {
return Optional.of(new NewPartitionReassignment(targetReplicas));
}
}

31
core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala

@ -2018,37 +2018,36 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -2018,37 +2018,36 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val tp1 = new TopicPartition(topic, 0)
val tp2 = new TopicPartition(topic, 1)
val tp3 = new TopicPartition(topic, 2)
val tp4 = new TopicPartition(topic, 3)
createTopic(topic, numPartitions = 4)
val validAssignment = new NewPartitionReassignment((0 until brokerCount).map(_.asInstanceOf[Integer]).asJava)
val validAssignment = NewPartitionReassignment.of(
(0 until brokerCount).map(_.asInstanceOf[Integer]).asJava
)
val nonExistentTp1 = new TopicPartition("topicA", 0)
val nonExistentTp2 = new TopicPartition(topic, 4)
val nonExistentPartitionsResult = client.alterPartitionReassignments(Map(
tp1 -> java.util.Optional.of(validAssignment),
tp2 -> java.util.Optional.of(validAssignment),
tp3 -> java.util.Optional.of(validAssignment),
nonExistentTp1 -> java.util.Optional.of(validAssignment),
nonExistentTp2 -> java.util.Optional.of(validAssignment)
tp1 -> validAssignment,
tp2 -> validAssignment,
tp3 -> validAssignment,
nonExistentTp1 -> validAssignment,
nonExistentTp2 -> validAssignment
).asJava).values()
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp1), classOf[UnknownTopicOrPartitionException])
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp2), classOf[UnknownTopicOrPartitionException])
val extraNonExistentReplica = new NewPartitionReassignment((0 until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava)
val negativeIdReplica = new NewPartitionReassignment(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava)
val duplicateReplica = new NewPartitionReassignment(Seq(0, 1, 1).map(_.asInstanceOf[Integer]).asJava)
val noReplicas = new NewPartitionReassignment(Seq().map(_.asInstanceOf[Integer]).asJava)
val extraNonExistentReplica = NewPartitionReassignment.of((0 until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava)
val negativeIdReplica = NewPartitionReassignment.of(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava)
val duplicateReplica = NewPartitionReassignment.of(Seq(0, 1, 1).map(_.asInstanceOf[Integer]).asJava)
val invalidReplicaResult = client.alterPartitionReassignments(Map(
tp1 -> java.util.Optional.of(extraNonExistentReplica),
tp2 -> java.util.Optional.of(negativeIdReplica),
tp3 -> java.util.Optional.of(duplicateReplica),
tp4 -> java.util.Optional.of(noReplicas)
tp1 -> extraNonExistentReplica,
tp2 -> negativeIdReplica,
tp3 -> duplicateReplica
).asJava).values()
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp1), classOf[InvalidReplicaAssignmentException])
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp2), classOf[InvalidReplicaAssignmentException])
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3), classOf[InvalidReplicaAssignmentException])
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp4), classOf[InvalidReplicaAssignmentException])
}
@Test

2
core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala

@ -1140,7 +1140,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { @@ -1140,7 +1140,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
}
def reassignmentEntry(tp: TopicPartition, replicas: Seq[Int]): (TopicPartition, java.util.Optional[NewPartitionReassignment]) =
tp -> java.util.Optional.of(new NewPartitionReassignment(replicas.map(_.asInstanceOf[Integer]).asJava))
tp -> NewPartitionReassignment.of(replicas.map(_.asInstanceOf[Integer]).asJava)
def cancelReassignmentEntry(tp: TopicPartition): (TopicPartition, java.util.Optional[NewPartitionReassignment]) =
tp -> java.util.Optional.empty()

4
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala

@ -16,7 +16,7 @@ @@ -16,7 +16,7 @@
*/
package kafka.admin
import java.util.{Collections, Optional, Properties}
import java.util.{Collections, Properties}
import kafka.admin.TopicCommand.{AdminClientTopicService, TopicCommandOptions}
import kafka.common.AdminCommandFailedException
@ -674,7 +674,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin @@ -674,7 +674,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
val targetReplica = brokerIds.diff(replicasOfFirstPartition).head
adminClient.alterPartitionReassignments(Collections.singletonMap(firstTopicPartition,
Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica)))))
NewPartitionReassignment.of(Collections.singletonList(targetReplica))))
// let's wait until the LAIR is propagated
TestUtils.waitUntilTrue(() => {

Loading…
Cancel
Save