diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NoReassignmentInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/NoReassignmentInProgressException.java new file mode 100644 index 00000000000..9fd8a73c809 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/NoReassignmentInProgressException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.errors; + +/** + * Thrown if a reassignment cannot be cancelled because none is in progress. + */ +public class NoReassignmentInProgressException extends ApiException { + public NoReassignmentInProgressException(String message) { + super(message); + } + + public NoReassignmentInProgressException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 171a3cd412b..18d8fd62e97 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -44,6 +44,10 @@ import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; +import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; +import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; +import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.MetadataRequestData; import org.apache.kafka.common.message.MetadataResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; @@ -193,7 +197,11 @@ public enum ApiKeys { ELECT_LEADERS(43, "ElectLeaders", ElectLeadersRequestData.SCHEMAS, ElectLeadersResponseData.SCHEMAS), INCREMENTAL_ALTER_CONFIGS(44, "IncrementalAlterConfigs", IncrementalAlterConfigsRequestData.SCHEMAS, - IncrementalAlterConfigsResponseData.SCHEMAS); + IncrementalAlterConfigsResponseData.SCHEMAS), + ALTER_PARTITION_REASSIGNMENTS(45, "AlterPartitionReassignments", AlterPartitionReassignmentsRequestData.SCHEMAS, + AlterPartitionReassignmentsResponseData.SCHEMAS), + LIST_PARTITION_REASSIGNMENTS(46, "ListPartitionReassignments", ListPartitionReassignmentsRequestData.SCHEMAS, + ListPartitionReassignmentsResponseData.SCHEMAS); private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY = 0; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 89bc0515d7d..9f11fc89adf 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -65,6 +65,7 @@ import org.apache.kafka.common.errors.MemberIdRequiredException; import org.apache.kafka.common.errors.ElectionNotNeededException; import org.apache.kafka.common.errors.EligibleLeadersNotAvailableException; import org.apache.kafka.common.errors.NetworkException; +import org.apache.kafka.common.errors.NoReassignmentInProgressException; import org.apache.kafka.common.errors.NotControllerException; import org.apache.kafka.common.errors.NotCoordinatorException; import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException; @@ -309,7 +310,9 @@ public enum Errors { FencedInstanceIdException::new), ELIGIBLE_LEADERS_NOT_AVAILABLE(83, "Eligible topic partition leaders are not available", EligibleLeadersNotAvailableException::new), - ELECTION_NOT_NEEDED(84, "Leader election not needed for topic partition", ElectionNotNeededException::new); + ELECTION_NOT_NEEDED(84, "Leader election not needed for topic partition", ElectionNotNeededException::new), + NO_REASSIGNMENT_IN_PROGRESS(85, "No partition reassignment is in progress.", + NoReassignmentInProgressException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index c8ff90d0fde..58bf1283a77 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -233,6 +233,10 @@ public abstract class AbstractRequest extends AbstractRequestResponse { return new ElectLeadersRequest(struct, apiVersion); case INCREMENTAL_ALTER_CONFIGS: return new IncrementalAlterConfigsRequest(struct, apiVersion); + case ALTER_PARTITION_REASSIGNMENTS: + return new AlterPartitionReassignmentsRequest(struct, apiVersion); + case LIST_PARTITION_REASSIGNMENTS: + return new ListPartitionReassignmentsRequest(struct, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 9eddf66b6fb..2e433e8b525 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -160,6 +160,10 @@ public abstract class AbstractResponse extends AbstractRequestResponse { return new ElectLeadersResponse(struct, version); case INCREMENTAL_ALTER_CONFIGS: return new IncrementalAlterConfigsResponse(struct, version); + case ALTER_PARTITION_REASSIGNMENTS: + return new AlterPartitionReassignmentsResponse(struct, version); + case LIST_PARTITION_REASSIGNMENTS: + return new ListPartitionReassignmentsResponse(struct, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java new file mode 100644 index 00000000000..7b2f848f614 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; +import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic; +import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; +import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse; +import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class AlterPartitionReassignmentsRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder { + private final AlterPartitionReassignmentsRequestData data; + + public Builder(AlterPartitionReassignmentsRequestData data) { + super(ApiKeys.ALTER_PARTITION_REASSIGNMENTS); + this.data = data; + } + + @Override + public AlterPartitionReassignmentsRequest build(short version) { + return new AlterPartitionReassignmentsRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final AlterPartitionReassignmentsRequestData data; + private final short version; + + private AlterPartitionReassignmentsRequest(AlterPartitionReassignmentsRequestData data, short version) { + super(ApiKeys.ALTER_PARTITION_REASSIGNMENTS, version); + this.data = data; + this.version = version; + } + + AlterPartitionReassignmentsRequest(Struct struct, short version) { + super(ApiKeys.ALTER_PARTITION_REASSIGNMENTS, version); + this.data = new AlterPartitionReassignmentsRequestData(struct, version); + this.version = version; + } + + public static AlterPartitionReassignmentsRequest parse(ByteBuffer buffer, short version) { + return new AlterPartitionReassignmentsRequest( + ApiKeys.ALTER_PARTITION_REASSIGNMENTS.parseRequest(version, buffer), + version + ); + } + + public AlterPartitionReassignmentsRequestData data() { + return data; + } + + /** + * Visible for testing. + */ + @Override + public Struct toStruct() { + return data.toStruct(version); + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + ApiError apiError = ApiError.fromThrowable(e); + List topicResponses = new ArrayList<>(); + + for (ReassignableTopic topic : data.topics()) { + List partitionResponses = topic.partitions().stream().map(partition -> + new ReassignablePartitionResponse() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(apiError.error().code()) + .setErrorMessage(apiError.message()) + ).collect(Collectors.toList()); + topicResponses.add( + new ReassignableTopicResponse() + .setName(topic.name()) + .setPartitions(partitionResponses) + ); + } + + AlterPartitionReassignmentsResponseData responseData = new AlterPartitionReassignmentsResponseData() + .setResponses(topicResponses) + .setErrorCode(apiError.error().code()) + .setErrorMessage(apiError.message()) + .setThrottleTimeMs(throttleTimeMs); + return new AlterPartitionReassignmentsResponse(responseData); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java new file mode 100644 index 00000000000..db1cfabebfa --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; +import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse; +import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +public class AlterPartitionReassignmentsResponse extends AbstractResponse { + + private final AlterPartitionReassignmentsResponseData data; + + public AlterPartitionReassignmentsResponse(Struct struct) { + this(struct, ApiKeys.ALTER_PARTITION_REASSIGNMENTS.latestVersion()); + } + + AlterPartitionReassignmentsResponse(AlterPartitionReassignmentsResponseData data) { + this.data = data; + } + + AlterPartitionReassignmentsResponse(Struct struct, short version) { + this.data = new AlterPartitionReassignmentsResponseData(struct, version); + } + + public static AlterPartitionReassignmentsResponse parse(ByteBuffer buffer, short version) { + return new AlterPartitionReassignmentsResponse(ApiKeys.ALTER_PARTITION_REASSIGNMENTS.responseSchema(version).read(buffer), version); + } + + public AlterPartitionReassignmentsResponseData data() { + return data; + } + + @Override + public boolean shouldClientThrottle(short version) { + return true; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public Map errorCounts() { + Map counts = new HashMap<>(); + Errors topLevelErr = Errors.forCode(data.errorCode()); + counts.put(topLevelErr, counts.getOrDefault(topLevelErr, 0) + 1); + + for (ReassignableTopicResponse topicResponse : data.responses()) { + for (ReassignablePartitionResponse partitionResponse : topicResponse.partitions()) { + Errors error = Errors.forCode(partitionResponse.errorCode()); + counts.put(error, counts.getOrDefault(error, 0) + 1); + } + } + return counts; + } + + @Override + protected Struct toStruct(short version) { + return data.toStruct(version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index 230c8073440..02c14e3274a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -33,6 +33,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Collections; import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; @@ -49,6 +50,10 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { private static final Field.Array ISR = new Field.Array("isr", INT32, "The in sync replica ids."); private static final Field.Int32 ZK_VERSION = new Field.Int32("zk_version", "The ZK version."); private static final Field.Array REPLICAS = new Field.Array("replicas", INT32, "The replica ids."); + private static final Field.Array ADDING_REPLICAS = new Field.Array("adding_replicas", INT32, + "The replica ids we are in the process of adding to the replica set during a reassignment."); + private static final Field.Array REMOVING_REPLICAS = new Field.Array("removing_replicas", INT32, + "The replica ids we are in the process of removing from the replica set during a reassignment."); private static final Field.Bool IS_NEW = new Field.Bool("is_new", "Whether the replica should have existed on the broker or not"); // live_leaders fields @@ -89,11 +94,28 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { REPLICAS, IS_NEW); + private static final Field PARTITION_STATES_V3 = PARTITION_STATES.withFields( + PARTITION_ID, + CONTROLLER_EPOCH, + LEADER, + LEADER_EPOCH, + ISR, + ZK_VERSION, + REPLICAS, + ADDING_REPLICAS, + REMOVING_REPLICAS, + IS_NEW); + // TOPIC_STATES_V2 normalizes TOPIC_STATES_V1 to make it more memory efficient private static final Field TOPIC_STATES_V2 = TOPIC_STATES.withFields( TOPIC_NAME, PARTITION_STATES_V2); + // TOPIC_STATES_V3 adds two new fields - adding_replicas and removing_replicas + private static final Field TOPIC_STATES_V3 = TOPIC_STATES.withFields( + TOPIC_NAME, + PARTITION_STATES_V3); + private static final Field LIVE_LEADERS_V0 = LIVE_LEADERS.withFields( END_POINT_ID, HOST, @@ -122,8 +144,17 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { TOPIC_STATES_V2, LIVE_LEADERS_V0); + // LEADER_AND_ISR_REQUEST_V3 added two new fields - adding_replicas and removing_replicas. + // These fields respectively specify the replica IDs we want to add or remove as part of a reassignment + private static final Schema LEADER_AND_ISR_REQUEST_V3 = new Schema( + CONTROLLER_ID, + CONTROLLER_EPOCH, + BROKER_EPOCH, + TOPIC_STATES_V3, + LIVE_LEADERS_V0); + public static Schema[] schemaVersions() { - return new Schema[]{LEADER_AND_ISR_REQUEST_V0, LEADER_AND_ISR_REQUEST_V1, LEADER_AND_ISR_REQUEST_V2}; + return new Schema[]{LEADER_AND_ISR_REQUEST_V0, LEADER_AND_ISR_REQUEST_V1, LEADER_AND_ISR_REQUEST_V2, LEADER_AND_ISR_REQUEST_V3}; } public static class Builder extends AbstractControlRequest.Builder { @@ -223,7 +254,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { for (Map.Entry partitionEntry : partitionMap.entrySet()) { Struct partitionStateData = topicStateData.instance(PARTITION_STATES); partitionStateData.set(PARTITION_ID, partitionEntry.getKey()); - partitionEntry.getValue().setStruct(partitionStateData); + partitionEntry.getValue().setStruct(partitionStateData, version); partitionStatesData.add(partitionStateData); } topicStateData.set(PARTITION_STATES, partitionStatesData.toArray()); @@ -237,7 +268,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { TopicPartition topicPartition = entry.getKey(); partitionStateData.set(TOPIC_NAME, topicPartition.topic()); partitionStateData.set(PARTITION_ID, topicPartition.partition()); - entry.getValue().setStruct(partitionStateData); + entry.getValue().setStruct(partitionStateData, version); partitionStatesData.add(partitionStateData); } struct.set(PARTITION_STATES, partitionStatesData.toArray()); @@ -269,6 +300,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { case 0: case 1: case 2: + case 3: return new LeaderAndIsrResponse(error, responses); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", @@ -298,6 +330,8 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { public static final class PartitionState { public final BasePartitionState basePartitionState; + public final List addingReplicas; + public final List removingReplicas; public final boolean isNew; public PartitionState(int controllerEpoch, @@ -307,7 +341,29 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { int zkVersion, List replicas, boolean isNew) { + this(controllerEpoch, + leader, + leaderEpoch, + isr, + zkVersion, + replicas, + Collections.emptyList(), + Collections.emptyList(), + isNew); + } + + public PartitionState(int controllerEpoch, + int leader, + int leaderEpoch, + List isr, + int zkVersion, + List replicas, + List addingReplicas, + List removingReplicas, + boolean isNew) { this.basePartitionState = new BasePartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas); + this.addingReplicas = addingReplicas; + this.removingReplicas = removingReplicas; this.isNew = isNew; } @@ -329,6 +385,21 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { replicas.add((Integer) r); this.basePartitionState = new BasePartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas); + + List addingReplicas = new ArrayList<>(); + if (struct.hasField(ADDING_REPLICAS)) { + for (Object r : struct.get(ADDING_REPLICAS)) + addingReplicas.add((Integer) r); + } + this.addingReplicas = addingReplicas; + + List removingReplicas = new ArrayList<>(); + if (struct.hasField(REMOVING_REPLICAS)) { + for (Object r : struct.get(REMOVING_REPLICAS)) + removingReplicas.add((Integer) r); + } + this.removingReplicas = removingReplicas; + this.isNew = struct.getOrElse(IS_NEW, false); } @@ -340,18 +411,23 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { ", isr=" + Utils.join(basePartitionState.isr, ",") + ", zkVersion=" + basePartitionState.zkVersion + ", replicas=" + Utils.join(basePartitionState.replicas, ",") + + ", addingReplicas=" + Utils.join(addingReplicas, ",") + + ", removingReplicas=" + Utils.join(removingReplicas, ",") + ", isNew=" + isNew + ")"; } - private void setStruct(Struct struct) { + private void setStruct(Struct struct, short version) { struct.set(CONTROLLER_EPOCH, basePartitionState.controllerEpoch); struct.set(LEADER, basePartitionState.leader); struct.set(LEADER_EPOCH, basePartitionState.leaderEpoch); struct.set(ISR, basePartitionState.isr.toArray()); struct.set(ZK_VERSION, basePartitionState.zkVersion); struct.set(REPLICAS, basePartitionState.replicas.toArray()); + if (version >= 3) { + struct.set(ADDING_REPLICAS, addingReplicas.toArray()); + struct.set(REMOVING_REPLICAS, removingReplicas.toArray()); + } struct.setIfExists(IS_NEW, isNew); } } - } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java index 3ab9bf79de4..3b80222eff0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java @@ -50,8 +50,10 @@ public class LeaderAndIsrResponse extends AbstractResponse { private static final Schema LEADER_AND_ISR_RESPONSE_V2 = LEADER_AND_ISR_RESPONSE_V1; + private static final Schema LEADER_AND_ISR_RESPONSE_V3 = LEADER_AND_ISR_RESPONSE_V2; + public static Schema[] schemaVersions() { - return new Schema[]{LEADER_AND_ISR_RESPONSE_V0, LEADER_AND_ISR_RESPONSE_V1, LEADER_AND_ISR_RESPONSE_V2}; + return new Schema[]{LEADER_AND_ISR_RESPONSE_V0, LEADER_AND_ISR_RESPONSE_V1, LEADER_AND_ISR_RESPONSE_V2, LEADER_AND_ISR_RESPONSE_V3}; } /** diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java new file mode 100644 index 00000000000..471147bdb2a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; +import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; +import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment; +import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Collectors; + + +public class ListPartitionReassignmentsRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder { + private final ListPartitionReassignmentsRequestData data; + + public Builder(ListPartitionReassignmentsRequestData data) { + super(ApiKeys.LIST_PARTITION_REASSIGNMENTS); + this.data = data; + } + + @Override + public ListPartitionReassignmentsRequest build(short version) { + return new ListPartitionReassignmentsRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private ListPartitionReassignmentsRequestData data; + private final short version; + + private ListPartitionReassignmentsRequest(ListPartitionReassignmentsRequestData data, short version) { + super(ApiKeys.LIST_PARTITION_REASSIGNMENTS, version); + this.data = data; + this.version = version; + } + + ListPartitionReassignmentsRequest(Struct struct, short version) { + super(ApiKeys.LIST_PARTITION_REASSIGNMENTS, version); + this.data = new ListPartitionReassignmentsRequestData(struct, version); + this.version = version; + } + + public static ListPartitionReassignmentsRequest parse(ByteBuffer buffer, short version) { + return new ListPartitionReassignmentsRequest( + ApiKeys.LIST_PARTITION_REASSIGNMENTS.parseRequest(version, buffer), version + ); + } + + public ListPartitionReassignmentsRequestData data() { + return data; + } + + /** + * Visible for testing. + */ + @Override + public Struct toStruct() { + return data.toStruct(version); + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + ApiError apiError = ApiError.fromThrowable(e); + + List ongoingTopicReassignments = data.topics().stream().map(topic -> + new OngoingTopicReassignment() + .setName(topic.name()) + .setPartitions(topic.partitionIndexes().stream().map(partitionIndex -> + new OngoingPartitionReassignment().setPartitionIndex(partitionIndex)).collect(Collectors.toList()) + ) + ).collect(Collectors.toList()); + + ListPartitionReassignmentsResponseData responseData = new ListPartitionReassignmentsResponseData() + .setTopics(ongoingTopicReassignments) + .setErrorCode(apiError.error().code()) + .setErrorMessage(apiError.message()) + .setThrottleTimeMs(throttleTimeMs); + return new ListPartitionReassignmentsResponse(responseData); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java new file mode 100644 index 00000000000..9513e883030 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +public class ListPartitionReassignmentsResponse extends AbstractResponse { + + private final ListPartitionReassignmentsResponseData data; + + public ListPartitionReassignmentsResponse(Struct struct) { + this(struct, ApiKeys.LIST_PARTITION_REASSIGNMENTS.latestVersion()); + } + + ListPartitionReassignmentsResponse(ListPartitionReassignmentsResponseData responseData) { + this.data = responseData; + } + + ListPartitionReassignmentsResponse(Struct struct, short version) { + this.data = new ListPartitionReassignmentsResponseData(struct, version); + } + + public static ListPartitionReassignmentsResponse parse(ByteBuffer buffer, short version) { + return new ListPartitionReassignmentsResponse(ApiKeys.LIST_PARTITION_REASSIGNMENTS.responseSchema(version).read(buffer), version); + } + + public ListPartitionReassignmentsResponseData data() { + return data; + } + + @Override + public boolean shouldClientThrottle(short version) { + return true; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public Map errorCounts() { + Map counts = new HashMap<>(); + Errors topLevelErr = Errors.forCode(data.errorCode()); + counts.put(topLevelErr, 1); + + return counts; + } + + @Override + protected Struct toStruct(short version) { + return data.toStruct(version); + } +} diff --git a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json new file mode 100644 index 00000000000..f962e1e65db --- /dev/null +++ b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 45, + "type": "request", + "name": "AlterPartitionReassignmentsRequest", + "validVersions": "0", + "fields": [ + { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000", + "about": "The time in ms to wait for the request to complete." }, + { "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+", + "about": "The topics to reassign.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + "about": "The topic name." }, + { "name": "Partitions", "type": "[]ReassignablePartition", "versions": "0+", + "about": "The partitions to reassign.", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." }, + { "name": "Replicas", "type": "[]int32", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The replicas to place the partitions on, or null to cancel a pending reassignment for this partition." } + ]} + ]} + ] +} diff --git a/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json b/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json new file mode 100644 index 00000000000..d04959678f6 --- /dev/null +++ b/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 45, + "type": "response", + "name": "AlterPartitionReassignmentsResponse", + "validVersions": "0", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The top-level error message, or null if there was no error." }, + { "name": "Responses", "type": "[]ReassignableTopicResponse", "versions": "0+", + "about": "The responses to topics to reassign.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + "about": "The topic name" }, + { "name": "Partitions", "type": "[]ReassignablePartitionResponse", "versions": "0+", + "about": "The responses to partitions to reassign", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code for this partition, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The error message for this partition, or null if there was no error." } + ]} + ]} + ] +} diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json index a449b869ad8..c43d2f4b163 100644 --- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json +++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json @@ -20,7 +20,9 @@ // Version 1 adds IsNew. // // Version 2 adds broker epoch and reorganizes the partitions by topic. - "validVersions": "0-2", + // + // Version 3 adds AddingReplicas and RemovingReplicas + "validVersions": "0-3", "fields": [ { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The current controller ID." }, @@ -68,6 +70,10 @@ "about": "The ZooKeeper version." }, { "name": "Replicas", "type": "[]int32", "versions": "0+", "about": "The replica IDs." }, + { "name": "AddingReplicas", "type": "[]int32", "versions": "3+", "ignorable": true, + "about": "The replica IDs that we are adding this partition to, or null if no replicas are being added." }, + { "name": "RemovingReplicas", "type": "[]int32", "versions": "3+", "ignorable": true, + "about": "The replica IDs that we are removing this partition from, or null if no replicas are being removed." }, { "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable": true, "about": "Whether the replica should have existed on the broker or not." } ]} diff --git a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json index 8f4bf635c79..06bb088e179 100644 --- a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json +++ b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json @@ -20,7 +20,9 @@ // Version 1 adds KAFKA_STORAGE_ERROR as a valid error code. // // Version 2 is the same as version 1. - "validVersions": "0-2", + // + // Version 3 is the same as version 2. + "validVersions": "0-3", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, diff --git a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json new file mode 100644 index 00000000000..d0ebf8bd7d6 --- /dev/null +++ b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 46, + "type": "request", + "name": "ListPartitionReassignmentsRequest", + "validVersions": "0", + "fields": [ + { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000", + "about": "The time in ms to wait for the request to complete." }, + { "name": "Topics", "type": "[]ListPartitionReassignmentsTopics", "versions": "0+", "nullableVersions": "0+", + "about": "The topics to list partition reassignments for, or null to list everything.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + "about": "The topic name" }, + { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+", + "about": "The partitions to list partition reassignments for." } + ]} + ] +} diff --git a/clients/src/main/resources/common/message/ListPartitionReassignmentsResponse.json b/clients/src/main/resources/common/message/ListPartitionReassignmentsResponse.json new file mode 100644 index 00000000000..b79e0523b32 --- /dev/null +++ b/clients/src/main/resources/common/message/ListPartitionReassignmentsResponse.json @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 46, + "type": "response", + "name": "ListPartitionReassignmentsResponse", + "validVersions": "0", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error" }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The top-level error message, or null if there was no error." }, + { "name": "Topics", "type": "[]OngoingTopicReassignment", "versions": "0+", + "about": "The ongoing reassignments for each topic.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + "about": "The topic name." }, + { "name": "Partitions", "type": "[]OngoingPartitionReassignment", "versions": "0+", + "about": "The ongoing reassignments for each partition.", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The index of the partition." }, + { "name": "Replicas", "type": "[]int32", "versions": "0+", + "about": "The current replica set." }, + { "name": "AddingReplicas", "type": "[]int32", "versions": "0+", + "about": "The set of replicas we are currently adding." }, + { "name": "RemovingReplicas", "type": "[]int32", "versions": "0+", + "about": "The set of replicas we are currently removing." } + ]} + ]} + ] +} diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index 58c650a011c..bdfce3f32f1 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -233,12 +233,53 @@ public final class MessageTest { } + @Test + public void testLeaderAndIsrVersions() throws Exception { + // Version 3 adds two new fields - AddingReplicas and RemovingReplicas + LeaderAndIsrRequestData.LeaderAndIsrRequestTopicState partitionStateNoAddingRemovingReplicas = + new LeaderAndIsrRequestData.LeaderAndIsrRequestTopicState() + .setName("topic") + .setPartitionStatesV0( + Collections.singletonList( + new LeaderAndIsrRequestData.LeaderAndIsrRequestPartition() + .setPartitionIndex(0) + .setReplicas(Collections.singletonList(0)) + ) + ); + LeaderAndIsrRequestData.LeaderAndIsrRequestTopicState partitionStateWithAddingRemovingReplicas = + new LeaderAndIsrRequestData.LeaderAndIsrRequestTopicState() + .setName("topic") + .setPartitionStatesV0( + Collections.singletonList( + new LeaderAndIsrRequestData.LeaderAndIsrRequestPartition() + .setPartitionIndex(0) + .setReplicas(Collections.singletonList(0)) + .setAddingReplicas(Collections.singletonList(1)) + .setRemovingReplicas(Collections.singletonList(1)) + ) + ); + testAllMessageRoundTripsBetweenVersions( + (short) 2, + (short) 3, + new LeaderAndIsrRequestData().setTopicStates(Collections.singletonList(partitionStateWithAddingRemovingReplicas)), + new LeaderAndIsrRequestData().setTopicStates(Collections.singletonList(partitionStateNoAddingRemovingReplicas))); + testAllMessageRoundTripsFromVersion((short) 3, new LeaderAndIsrRequestData().setTopicStates(Collections.singletonList(partitionStateWithAddingRemovingReplicas))); + } + private void testAllMessageRoundTrips(Message message) throws Exception { testAllMessageRoundTripsFromVersion(message.lowestSupportedVersion(), message); } private void testAllMessageRoundTripsBeforeVersion(short beforeVersion, Message message, Message expected) throws Exception { - for (short version = 0; version < beforeVersion; version++) { + testAllMessageRoundTripsBetweenVersions((short) 0, beforeVersion, message, expected); + } + + /** + * @param startVersion - the version we want to start at, inclusive + * @param endVersion - the version we want to end at, exclusive + */ + private void testAllMessageRoundTripsBetweenVersions(short startVersion, short endVersion, Message message, Message expected) throws Exception { + for (short version = startVersion; version < endVersion; version++) { testMessageRoundTrip(version, message, expected); } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 98552108a1e..70adcca9f8d 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -66,6 +66,10 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.Altera import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; +import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; +import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; +import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; +import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.InitProducerIdRequestData; import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; @@ -362,6 +366,12 @@ public class RequestResponseTest { checkRequest(createIncrementalAlterConfigsRequest(), true); checkErrorResponse(createIncrementalAlterConfigsRequest(), new UnknownServerException(), true); checkResponse(createIncrementalAlterConfigsResponse(), 0, true); + checkRequest(createAlterPartitionReassignmentsRequest(), true); + checkErrorResponse(createAlterPartitionReassignmentsRequest(), new UnknownServerException(), true); + checkResponse(createAlterPartitionReassignmentsResponse(), 0, true); + checkRequest(createListPartitionReassignmentsRequest(), true); + checkErrorResponse(createListPartitionReassignmentsRequest(), new UnknownServerException(), true); + checkResponse(createListPartitionReassignmentsResponse(), 0, true); } @Test @@ -1630,4 +1640,61 @@ public class RequestResponseTest { .setErrorMessage("Duplicate Keys")); return new IncrementalAlterConfigsResponse(data); } + + private AlterPartitionReassignmentsRequest createAlterPartitionReassignmentsRequest() { + AlterPartitionReassignmentsRequestData data = new AlterPartitionReassignmentsRequestData(); + data.topics().add( + new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("topic").setPartitions( + Collections.singletonList( + new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null) + ) + ) + ); + return new AlterPartitionReassignmentsRequest.Builder(data).build((short) 0); + } + + private AlterPartitionReassignmentsResponse createAlterPartitionReassignmentsResponse() { + AlterPartitionReassignmentsResponseData data = new AlterPartitionReassignmentsResponseData(); + data.responses().add( + new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse() + .setName("topic") + .setPartitions(Collections.singletonList( + new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse() + .setPartitionIndex(0) + .setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code()) + .setErrorMessage("No reassignment is in progress for topic topic partition 0") + ) + ) + ); + return new AlterPartitionReassignmentsResponse(data); + } + + private ListPartitionReassignmentsRequest createListPartitionReassignmentsRequest() { + ListPartitionReassignmentsRequestData data = new ListPartitionReassignmentsRequestData(); + data.setTopics( + Collections.singletonList( + new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics() + .setName("topic") + .setPartitionIndexes(Collections.singletonList(1)) + ) + ); + return new ListPartitionReassignmentsRequest.Builder(data).build((short) 0); + } + + private ListPartitionReassignmentsResponse createListPartitionReassignmentsResponse() { + ListPartitionReassignmentsResponseData data = new ListPartitionReassignmentsResponseData(); + data.topics().add( + new ListPartitionReassignmentsResponseData.OngoingTopicReassignment() + .setName("topic") + .setPartitions(Collections.singletonList( + new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment() + .setPartitionIndex(0) + .setReplicas(Arrays.asList(1, 2)) + .setAddingReplicas(Collections.singletonList(2)) + .setRemovingReplicas(Collections.singletonList(1)) + ) + ) + ); + return new ListPartitionReassignmentsResponse(data); + } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 2a0fbd35d04..3ec6b2314d1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -52,6 +52,8 @@ import org.apache.kafka.common.message.CreateTopicsResponseData import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection} import org.apache.kafka.common.message.DeleteGroupsResponseData import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection} +import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData +import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData import org.apache.kafka.common.message.DeleteTopicsResponseData import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection} import org.apache.kafka.common.message.DescribeGroupsResponseData @@ -178,6 +180,8 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request) case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request) case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request) + case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignmentsRequest(request) + case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request) } } catch { case e: FatalExitError => throw e @@ -2299,6 +2303,32 @@ class KafkaApis(val requestChannel: RequestChannel, new AlterConfigsResponse(requestThrottleMs, (authorizedResult ++ unauthorizedResult).asJava)) } + def handleAlterPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = { + authorizeClusterAlter(request) + val alterPartitionReassignmentsRequest = request.body[AlterPartitionReassignmentsRequest] + + sendResponseMaybeThrottle(request, requestThrottleMs => + new AlterPartitionReassignmentsResponse( + new AlterPartitionReassignmentsResponseData().setThrottleTimeMs(requestThrottleMs) + .setErrorCode(Errors.UNSUPPORTED_VERSION.code()).setErrorMessage(Errors.UNSUPPORTED_VERSION.message()) + .toStruct(0) + ) + ) + } + + def handleListPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = { + authorizeClusterDescribe(request) + val listPartitionReassignmentsRequest = request.body[ListPartitionReassignmentsRequest] + + sendResponseMaybeThrottle(request, requestThrottleMs => + new ListPartitionReassignmentsResponse( + new ListPartitionReassignmentsResponseData().setThrottleTimeMs(requestThrottleMs) + .setErrorCode(Errors.UNSUPPORTED_VERSION.code()).setErrorMessage(Errors.UNSUPPORTED_VERSION.message()) + .toStruct(0) + ) + ) + } + private def configsAuthorizationApiError(session: RequestChannel.Session, resource: ConfigResource): ApiError = { val error = resource.`type` match { case ConfigResource.Type.BROKER => Errors.CLUSTER_AUTHORIZATION_FAILED diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index cb5f1aadb04..ef2e6fd5b13 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -44,6 +44,8 @@ import org.apache.kafka.common.message.FindCoordinatorRequestData import org.apache.kafka.common.message.HeartbeatRequestData import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigCollection} +import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData +import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData import org.apache.kafka.common.message.JoinGroupRequestData import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity @@ -164,7 +166,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse], ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse], ApiKeys.ELECT_LEADERS -> classOf[ElectLeadersResponse], - ApiKeys.INCREMENTAL_ALTER_CONFIGS -> classOf[IncrementalAlterConfigsResponse] + ApiKeys.INCREMENTAL_ALTER_CONFIGS -> classOf[IncrementalAlterConfigsResponse], + ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> classOf[AlterPartitionReassignmentsResponse], + ApiKeys.LIST_PARTITION_REASSIGNMENTS -> classOf[ListPartitionReassignmentsResponse] ) val requestKeyToError = Map[ApiKeys, Nothing => Errors]( @@ -212,7 +216,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error), ApiKeys.ELECT_LEADERS -> ((resp: ElectLeadersResponse) => Errors.forCode(resp.data().errorCode())), ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) => - IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error) + IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error), + ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> ((resp: AlterPartitionReassignmentsResponse) => Errors.forCode(resp.data().errorCode())), + ApiKeys.LIST_PARTITION_REASSIGNMENTS -> ((resp: ListPartitionReassignmentsResponse) => Errors.forCode(resp.data().errorCode())) ) val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]]( @@ -252,7 +258,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl, ApiKeys.CREATE_PARTITIONS -> topicAlterAcl, ApiKeys.ELECT_LEADERS -> clusterAlterAcl, - ApiKeys.INCREMENTAL_ALTER_CONFIGS -> topicAlterConfigsAcl + ApiKeys.INCREMENTAL_ALTER_CONFIGS -> topicAlterConfigsAcl, + ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> clusterAlterAcl, + ApiKeys.LIST_PARTITION_REASSIGNMENTS -> clusterDescribeAcl ) @Before @@ -485,6 +493,26 @@ class AuthorizerIntegrationTest extends BaseRequestTest { 10000 ).build() + private def alterPartitionReassignmentsRequest = new AlterPartitionReassignmentsRequest.Builder( + new AlterPartitionReassignmentsRequestData().setTopics( + List(new AlterPartitionReassignmentsRequestData.ReassignableTopic() + .setName(topic) + .setPartitions( + List(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(tp.partition())).asJava + )).asJava + ) + ).build() + + private def listPartitionReassignmentsRequest = new ListPartitionReassignmentsRequest.Builder( + new ListPartitionReassignmentsRequestData().setTopics( + List(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics() + .setName(topic) + .setPartitionIndexes( + List(new Integer(tp.partition)).asJava + )).asJava + ) + ).build() + @Test def testAuthorizationWithTopicExisting() { val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( @@ -520,7 +548,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { // Check StopReplica last since some APIs depend on replica availability ApiKeys.STOP_REPLICA -> stopReplicaRequest, ApiKeys.ELECT_LEADERS -> electLeadersRequest, - ApiKeys.INCREMENTAL_ALTER_CONFIGS -> incrementalAlterConfigsRequest + ApiKeys.INCREMENTAL_ALTER_CONFIGS -> incrementalAlterConfigsRequest, + ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> alterPartitionReassignmentsRequest, + ApiKeys.LIST_PARTITION_REASSIGNMENTS -> listPartitionReassignmentsRequest ) for ((key, request) <- requestKeyToRequest) { diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 242ab219686..b09fc02846f 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -41,6 +41,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.ListGroupsRequestData +import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData +import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData import org.apache.kafka.common.message.OffsetCommitRequestData import org.apache.kafka.common.message.SaslAuthenticateRequestData import org.apache.kafka.common.message.SaslHandshakeRequestData @@ -465,6 +467,16 @@ class RequestQuotaTest extends BaseRequestTest { new IncrementalAlterConfigsRequest.Builder( new IncrementalAlterConfigsRequestData()) + case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => + new AlterPartitionReassignmentsRequest.Builder( + new AlterPartitionReassignmentsRequestData() + ) + + case ApiKeys.LIST_PARTITION_REASSIGNMENTS => + new ListPartitionReassignmentsRequest.Builder( + new ListPartitionReassignmentsRequestData() + ) + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) } @@ -568,6 +580,8 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.ELECT_LEADERS => new ElectLeadersResponse(response).throttleTimeMs case ApiKeys.INCREMENTAL_ALTER_CONFIGS => new IncrementalAlterConfigsResponse(response, ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion()).throttleTimeMs + case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => new AlterPartitionReassignmentsResponse(response).throttleTimeMs + case ApiKeys.LIST_PARTITION_REASSIGNMENTS => new ListPartitionReassignmentsResponse(response).throttleTimeMs case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId") } }