Browse Source

KAFKA-8345: KIP-455 Protocol changes (part 1) (#7114)

Add a new exception, NoReassignmentInProgressException.  Modify LeaderAndIsrRequest to include the AddingRepicas and RemovingReplicas fields.  Add the ListPartitionReassignments and AlterPartitionReassignments RPCs.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Viktor Somogyi <viktorsomogyi@gmail.com>
pull/7133/head
Stanislav Kozlovski 5 years ago committed by Colin Patrick McCabe
parent
commit
81900d0ba0
  1. 31
      clients/src/main/java/org/apache/kafka/common/errors/NoReassignmentInProgressException.java
  2. 10
      clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
  3. 5
      clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
  4. 4
      clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
  5. 4
      clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
  6. 114
      clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java
  7. 84
      clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
  8. 86
      clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
  9. 4
      clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
  10. 104
      clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java
  11. 75
      clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java
  12. 37
      clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
  13. 43
      clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json
  14. 8
      clients/src/main/resources/common/message/LeaderAndIsrRequest.json
  15. 4
      clients/src/main/resources/common/message/LeaderAndIsrResponse.json
  16. 32
      clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
  17. 45
      clients/src/main/resources/common/message/ListPartitionReassignmentsResponse.json
  18. 43
      clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
  19. 67
      clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  20. 30
      core/src/main/scala/kafka/server/KafkaApis.scala
  21. 38
      core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
  22. 14
      core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

31
clients/src/main/java/org/apache/kafka/common/errors/NoReassignmentInProgressException.java

@ -0,0 +1,31 @@ @@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
* Thrown if a reassignment cannot be cancelled because none is in progress.
*/
public class NoReassignmentInProgressException extends ApiException {
public NoReassignmentInProgressException(String message) {
super(message);
}
public NoReassignmentInProgressException(String message, Throwable cause) {
super(message, cause);
}
}

10
clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java

@ -44,6 +44,10 @@ import org.apache.kafka.common.message.LeaveGroupRequestData; @@ -44,6 +44,10 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
@ -193,7 +197,11 @@ public enum ApiKeys { @@ -193,7 +197,11 @@ public enum ApiKeys {
ELECT_LEADERS(43, "ElectLeaders", ElectLeadersRequestData.SCHEMAS,
ElectLeadersResponseData.SCHEMAS),
INCREMENTAL_ALTER_CONFIGS(44, "IncrementalAlterConfigs", IncrementalAlterConfigsRequestData.SCHEMAS,
IncrementalAlterConfigsResponseData.SCHEMAS);
IncrementalAlterConfigsResponseData.SCHEMAS),
ALTER_PARTITION_REASSIGNMENTS(45, "AlterPartitionReassignments", AlterPartitionReassignmentsRequestData.SCHEMAS,
AlterPartitionReassignmentsResponseData.SCHEMAS),
LIST_PARTITION_REASSIGNMENTS(46, "ListPartitionReassignments", ListPartitionReassignmentsRequestData.SCHEMAS,
ListPartitionReassignmentsResponseData.SCHEMAS);
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;

5
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java

@ -65,6 +65,7 @@ import org.apache.kafka.common.errors.MemberIdRequiredException; @@ -65,6 +65,7 @@ import org.apache.kafka.common.errors.MemberIdRequiredException;
import org.apache.kafka.common.errors.ElectionNotNeededException;
import org.apache.kafka.common.errors.EligibleLeadersNotAvailableException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NoReassignmentInProgressException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
@ -309,7 +310,9 @@ public enum Errors { @@ -309,7 +310,9 @@ public enum Errors {
FencedInstanceIdException::new),
ELIGIBLE_LEADERS_NOT_AVAILABLE(83, "Eligible topic partition leaders are not available",
EligibleLeadersNotAvailableException::new),
ELECTION_NOT_NEEDED(84, "Leader election not needed for topic partition", ElectionNotNeededException::new);
ELECTION_NOT_NEEDED(84, "Leader election not needed for topic partition", ElectionNotNeededException::new),
NO_REASSIGNMENT_IN_PROGRESS(85, "No partition reassignment is in progress.",
NoReassignmentInProgressException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);

4
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java

@ -233,6 +233,10 @@ public abstract class AbstractRequest extends AbstractRequestResponse { @@ -233,6 +233,10 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return new ElectLeadersRequest(struct, apiVersion);
case INCREMENTAL_ALTER_CONFIGS:
return new IncrementalAlterConfigsRequest(struct, apiVersion);
case ALTER_PARTITION_REASSIGNMENTS:
return new AlterPartitionReassignmentsRequest(struct, apiVersion);
case LIST_PARTITION_REASSIGNMENTS:
return new ListPartitionReassignmentsRequest(struct, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));

4
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java

@ -160,6 +160,10 @@ public abstract class AbstractResponse extends AbstractRequestResponse { @@ -160,6 +160,10 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return new ElectLeadersResponse(struct, version);
case INCREMENTAL_ALTER_CONFIGS:
return new IncrementalAlterConfigsResponse(struct, version);
case ALTER_PARTITION_REASSIGNMENTS:
return new AlterPartitionReassignmentsResponse(struct, version);
case LIST_PARTITION_REASSIGNMENTS:
return new ListPartitionReassignmentsResponse(struct, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));

114
clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java

@ -0,0 +1,114 @@ @@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class AlterPartitionReassignmentsRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<AlterPartitionReassignmentsRequest> {
private final AlterPartitionReassignmentsRequestData data;
public Builder(AlterPartitionReassignmentsRequestData data) {
super(ApiKeys.ALTER_PARTITION_REASSIGNMENTS);
this.data = data;
}
@Override
public AlterPartitionReassignmentsRequest build(short version) {
return new AlterPartitionReassignmentsRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final AlterPartitionReassignmentsRequestData data;
private final short version;
private AlterPartitionReassignmentsRequest(AlterPartitionReassignmentsRequestData data, short version) {
super(ApiKeys.ALTER_PARTITION_REASSIGNMENTS, version);
this.data = data;
this.version = version;
}
AlterPartitionReassignmentsRequest(Struct struct, short version) {
super(ApiKeys.ALTER_PARTITION_REASSIGNMENTS, version);
this.data = new AlterPartitionReassignmentsRequestData(struct, version);
this.version = version;
}
public static AlterPartitionReassignmentsRequest parse(ByteBuffer buffer, short version) {
return new AlterPartitionReassignmentsRequest(
ApiKeys.ALTER_PARTITION_REASSIGNMENTS.parseRequest(version, buffer),
version
);
}
public AlterPartitionReassignmentsRequestData data() {
return data;
}
/**
* Visible for testing.
*/
@Override
public Struct toStruct() {
return data.toStruct(version);
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
List<ReassignableTopicResponse> topicResponses = new ArrayList<>();
for (ReassignableTopic topic : data.topics()) {
List<ReassignablePartitionResponse> partitionResponses = topic.partitions().stream().map(partition ->
new ReassignablePartitionResponse()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(apiError.error().code())
.setErrorMessage(apiError.message())
).collect(Collectors.toList());
topicResponses.add(
new ReassignableTopicResponse()
.setName(topic.name())
.setPartitions(partitionResponses)
);
}
AlterPartitionReassignmentsResponseData responseData = new AlterPartitionReassignmentsResponseData()
.setResponses(topicResponses)
.setErrorCode(apiError.error().code())
.setErrorMessage(apiError.message())
.setThrottleTimeMs(throttleTimeMs);
return new AlterPartitionReassignmentsResponse(responseData);
}
}

84
clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java

@ -0,0 +1,84 @@ @@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class AlterPartitionReassignmentsResponse extends AbstractResponse {
private final AlterPartitionReassignmentsResponseData data;
public AlterPartitionReassignmentsResponse(Struct struct) {
this(struct, ApiKeys.ALTER_PARTITION_REASSIGNMENTS.latestVersion());
}
AlterPartitionReassignmentsResponse(AlterPartitionReassignmentsResponseData data) {
this.data = data;
}
AlterPartitionReassignmentsResponse(Struct struct, short version) {
this.data = new AlterPartitionReassignmentsResponseData(struct, version);
}
public static AlterPartitionReassignmentsResponse parse(ByteBuffer buffer, short version) {
return new AlterPartitionReassignmentsResponse(ApiKeys.ALTER_PARTITION_REASSIGNMENTS.responseSchema(version).read(buffer), version);
}
public AlterPartitionReassignmentsResponseData data() {
return data;
}
@Override
public boolean shouldClientThrottle(short version) {
return true;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
Errors topLevelErr = Errors.forCode(data.errorCode());
counts.put(topLevelErr, counts.getOrDefault(topLevelErr, 0) + 1);
for (ReassignableTopicResponse topicResponse : data.responses()) {
for (ReassignablePartitionResponse partitionResponse : topicResponse.partitions()) {
Errors error = Errors.forCode(partitionResponse.errorCode());
counts.put(error, counts.getOrDefault(error, 0) + 1);
}
}
return counts;
}
@Override
protected Struct toStruct(short version) {
return data.toStruct(version);
}
}

86
clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java

@ -33,6 +33,7 @@ import java.util.HashSet; @@ -33,6 +33,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Collections;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
@ -49,6 +50,10 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { @@ -49,6 +50,10 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
private static final Field.Array ISR = new Field.Array("isr", INT32, "The in sync replica ids.");
private static final Field.Int32 ZK_VERSION = new Field.Int32("zk_version", "The ZK version.");
private static final Field.Array REPLICAS = new Field.Array("replicas", INT32, "The replica ids.");
private static final Field.Array ADDING_REPLICAS = new Field.Array("adding_replicas", INT32,
"The replica ids we are in the process of adding to the replica set during a reassignment.");
private static final Field.Array REMOVING_REPLICAS = new Field.Array("removing_replicas", INT32,
"The replica ids we are in the process of removing from the replica set during a reassignment.");
private static final Field.Bool IS_NEW = new Field.Bool("is_new", "Whether the replica should have existed on the broker or not");
// live_leaders fields
@ -89,11 +94,28 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { @@ -89,11 +94,28 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
REPLICAS,
IS_NEW);
private static final Field PARTITION_STATES_V3 = PARTITION_STATES.withFields(
PARTITION_ID,
CONTROLLER_EPOCH,
LEADER,
LEADER_EPOCH,
ISR,
ZK_VERSION,
REPLICAS,
ADDING_REPLICAS,
REMOVING_REPLICAS,
IS_NEW);
// TOPIC_STATES_V2 normalizes TOPIC_STATES_V1 to make it more memory efficient
private static final Field TOPIC_STATES_V2 = TOPIC_STATES.withFields(
TOPIC_NAME,
PARTITION_STATES_V2);
// TOPIC_STATES_V3 adds two new fields - adding_replicas and removing_replicas
private static final Field TOPIC_STATES_V3 = TOPIC_STATES.withFields(
TOPIC_NAME,
PARTITION_STATES_V3);
private static final Field LIVE_LEADERS_V0 = LIVE_LEADERS.withFields(
END_POINT_ID,
HOST,
@ -122,8 +144,17 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { @@ -122,8 +144,17 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
TOPIC_STATES_V2,
LIVE_LEADERS_V0);
// LEADER_AND_ISR_REQUEST_V3 added two new fields - adding_replicas and removing_replicas.
// These fields respectively specify the replica IDs we want to add or remove as part of a reassignment
private static final Schema LEADER_AND_ISR_REQUEST_V3 = new Schema(
CONTROLLER_ID,
CONTROLLER_EPOCH,
BROKER_EPOCH,
TOPIC_STATES_V3,
LIVE_LEADERS_V0);
public static Schema[] schemaVersions() {
return new Schema[]{LEADER_AND_ISR_REQUEST_V0, LEADER_AND_ISR_REQUEST_V1, LEADER_AND_ISR_REQUEST_V2};
return new Schema[]{LEADER_AND_ISR_REQUEST_V0, LEADER_AND_ISR_REQUEST_V1, LEADER_AND_ISR_REQUEST_V2, LEADER_AND_ISR_REQUEST_V3};
}
public static class Builder extends AbstractControlRequest.Builder<LeaderAndIsrRequest> {
@ -223,7 +254,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { @@ -223,7 +254,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
for (Map.Entry<Integer, PartitionState> partitionEntry : partitionMap.entrySet()) {
Struct partitionStateData = topicStateData.instance(PARTITION_STATES);
partitionStateData.set(PARTITION_ID, partitionEntry.getKey());
partitionEntry.getValue().setStruct(partitionStateData);
partitionEntry.getValue().setStruct(partitionStateData, version);
partitionStatesData.add(partitionStateData);
}
topicStateData.set(PARTITION_STATES, partitionStatesData.toArray());
@ -237,7 +268,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { @@ -237,7 +268,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
TopicPartition topicPartition = entry.getKey();
partitionStateData.set(TOPIC_NAME, topicPartition.topic());
partitionStateData.set(PARTITION_ID, topicPartition.partition());
entry.getValue().setStruct(partitionStateData);
entry.getValue().setStruct(partitionStateData, version);
partitionStatesData.add(partitionStateData);
}
struct.set(PARTITION_STATES, partitionStatesData.toArray());
@ -269,6 +300,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { @@ -269,6 +300,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
case 0:
case 1:
case 2:
case 3:
return new LeaderAndIsrResponse(error, responses);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
@ -298,6 +330,8 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { @@ -298,6 +330,8 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
public static final class PartitionState {
public final BasePartitionState basePartitionState;
public final List<Integer> addingReplicas;
public final List<Integer> removingReplicas;
public final boolean isNew;
public PartitionState(int controllerEpoch,
@ -307,7 +341,29 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { @@ -307,7 +341,29 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
int zkVersion,
List<Integer> replicas,
boolean isNew) {
this(controllerEpoch,
leader,
leaderEpoch,
isr,
zkVersion,
replicas,
Collections.emptyList(),
Collections.emptyList(),
isNew);
}
public PartitionState(int controllerEpoch,
int leader,
int leaderEpoch,
List<Integer> isr,
int zkVersion,
List<Integer> replicas,
List<Integer> addingReplicas,
List<Integer> removingReplicas,
boolean isNew) {
this.basePartitionState = new BasePartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
this.addingReplicas = addingReplicas;
this.removingReplicas = removingReplicas;
this.isNew = isNew;
}
@ -329,6 +385,21 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { @@ -329,6 +385,21 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
replicas.add((Integer) r);
this.basePartitionState = new BasePartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
List<Integer> addingReplicas = new ArrayList<>();
if (struct.hasField(ADDING_REPLICAS)) {
for (Object r : struct.get(ADDING_REPLICAS))
addingReplicas.add((Integer) r);
}
this.addingReplicas = addingReplicas;
List<Integer> removingReplicas = new ArrayList<>();
if (struct.hasField(REMOVING_REPLICAS)) {
for (Object r : struct.get(REMOVING_REPLICAS))
removingReplicas.add((Integer) r);
}
this.removingReplicas = removingReplicas;
this.isNew = struct.getOrElse(IS_NEW, false);
}
@ -340,18 +411,23 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { @@ -340,18 +411,23 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
", isr=" + Utils.join(basePartitionState.isr, ",") +
", zkVersion=" + basePartitionState.zkVersion +
", replicas=" + Utils.join(basePartitionState.replicas, ",") +
", addingReplicas=" + Utils.join(addingReplicas, ",") +
", removingReplicas=" + Utils.join(removingReplicas, ",") +
", isNew=" + isNew + ")";
}
private void setStruct(Struct struct) {
private void setStruct(Struct struct, short version) {
struct.set(CONTROLLER_EPOCH, basePartitionState.controllerEpoch);
struct.set(LEADER, basePartitionState.leader);
struct.set(LEADER_EPOCH, basePartitionState.leaderEpoch);
struct.set(ISR, basePartitionState.isr.toArray());
struct.set(ZK_VERSION, basePartitionState.zkVersion);
struct.set(REPLICAS, basePartitionState.replicas.toArray());
if (version >= 3) {
struct.set(ADDING_REPLICAS, addingReplicas.toArray());
struct.set(REMOVING_REPLICAS, removingReplicas.toArray());
}
struct.setIfExists(IS_NEW, isNew);
}
}
}

4
clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java

@ -50,8 +50,10 @@ public class LeaderAndIsrResponse extends AbstractResponse { @@ -50,8 +50,10 @@ public class LeaderAndIsrResponse extends AbstractResponse {
private static final Schema LEADER_AND_ISR_RESPONSE_V2 = LEADER_AND_ISR_RESPONSE_V1;
private static final Schema LEADER_AND_ISR_RESPONSE_V3 = LEADER_AND_ISR_RESPONSE_V2;
public static Schema[] schemaVersions() {
return new Schema[]{LEADER_AND_ISR_RESPONSE_V0, LEADER_AND_ISR_RESPONSE_V1, LEADER_AND_ISR_RESPONSE_V2};
return new Schema[]{LEADER_AND_ISR_RESPONSE_V0, LEADER_AND_ISR_RESPONSE_V1, LEADER_AND_ISR_RESPONSE_V2, LEADER_AND_ISR_RESPONSE_V3};
}
/**

104
clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java

@ -0,0 +1,104 @@ @@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
public class ListPartitionReassignmentsRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ListPartitionReassignmentsRequest> {
private final ListPartitionReassignmentsRequestData data;
public Builder(ListPartitionReassignmentsRequestData data) {
super(ApiKeys.LIST_PARTITION_REASSIGNMENTS);
this.data = data;
}
@Override
public ListPartitionReassignmentsRequest build(short version) {
return new ListPartitionReassignmentsRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private ListPartitionReassignmentsRequestData data;
private final short version;
private ListPartitionReassignmentsRequest(ListPartitionReassignmentsRequestData data, short version) {
super(ApiKeys.LIST_PARTITION_REASSIGNMENTS, version);
this.data = data;
this.version = version;
}
ListPartitionReassignmentsRequest(Struct struct, short version) {
super(ApiKeys.LIST_PARTITION_REASSIGNMENTS, version);
this.data = new ListPartitionReassignmentsRequestData(struct, version);
this.version = version;
}
public static ListPartitionReassignmentsRequest parse(ByteBuffer buffer, short version) {
return new ListPartitionReassignmentsRequest(
ApiKeys.LIST_PARTITION_REASSIGNMENTS.parseRequest(version, buffer), version
);
}
public ListPartitionReassignmentsRequestData data() {
return data;
}
/**
* Visible for testing.
*/
@Override
public Struct toStruct() {
return data.toStruct(version);
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
List<OngoingTopicReassignment> ongoingTopicReassignments = data.topics().stream().map(topic ->
new OngoingTopicReassignment()
.setName(topic.name())
.setPartitions(topic.partitionIndexes().stream().map(partitionIndex ->
new OngoingPartitionReassignment().setPartitionIndex(partitionIndex)).collect(Collectors.toList())
)
).collect(Collectors.toList());
ListPartitionReassignmentsResponseData responseData = new ListPartitionReassignmentsResponseData()
.setTopics(ongoingTopicReassignments)
.setErrorCode(apiError.error().code())
.setErrorMessage(apiError.message())
.setThrottleTimeMs(throttleTimeMs);
return new ListPartitionReassignmentsResponse(responseData);
}
}

75
clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java

@ -0,0 +1,75 @@ @@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class ListPartitionReassignmentsResponse extends AbstractResponse {
private final ListPartitionReassignmentsResponseData data;
public ListPartitionReassignmentsResponse(Struct struct) {
this(struct, ApiKeys.LIST_PARTITION_REASSIGNMENTS.latestVersion());
}
ListPartitionReassignmentsResponse(ListPartitionReassignmentsResponseData responseData) {
this.data = responseData;
}
ListPartitionReassignmentsResponse(Struct struct, short version) {
this.data = new ListPartitionReassignmentsResponseData(struct, version);
}
public static ListPartitionReassignmentsResponse parse(ByteBuffer buffer, short version) {
return new ListPartitionReassignmentsResponse(ApiKeys.LIST_PARTITION_REASSIGNMENTS.responseSchema(version).read(buffer), version);
}
public ListPartitionReassignmentsResponseData data() {
return data;
}
@Override
public boolean shouldClientThrottle(short version) {
return true;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
Errors topLevelErr = Errors.forCode(data.errorCode());
counts.put(topLevelErr, 1);
return counts;
}
@Override
protected Struct toStruct(short version) {
return data.toStruct(version);
}
}

37
clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json

@ -0,0 +1,37 @@ @@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 45,
"type": "request",
"name": "AlterPartitionReassignmentsRequest",
"validVersions": "0",
"fields": [
{ "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
"about": "The time in ms to wait for the request to complete." },
{ "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+",
"about": "The topics to reassign.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]ReassignablePartition", "versions": "0+",
"about": "The partitions to reassign.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "Replicas", "type": "[]int32", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The replicas to place the partitions on, or null to cancel a pending reassignment for this partition." }
]}
]}
]
}

43
clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json

@ -0,0 +1,43 @@ @@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 45,
"type": "response",
"name": "AlterPartitionReassignmentsResponse",
"validVersions": "0",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top-level error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The top-level error message, or null if there was no error." },
{ "name": "Responses", "type": "[]ReassignableTopicResponse", "versions": "0+",
"about": "The responses to topics to reassign.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name" },
{ "name": "Partitions", "type": "[]ReassignablePartitionResponse", "versions": "0+",
"about": "The responses to partitions to reassign", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code for this partition, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The error message for this partition, or null if there was no error." }
]}
]}
]
}

8
clients/src/main/resources/common/message/LeaderAndIsrRequest.json

@ -20,7 +20,9 @@ @@ -20,7 +20,9 @@
// Version 1 adds IsNew.
//
// Version 2 adds broker epoch and reorganizes the partitions by topic.
"validVersions": "0-2",
//
// Version 3 adds AddingReplicas and RemovingReplicas
"validVersions": "0-3",
"fields": [
{ "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The current controller ID." },
@ -68,6 +70,10 @@ @@ -68,6 +70,10 @@
"about": "The ZooKeeper version." },
{ "name": "Replicas", "type": "[]int32", "versions": "0+",
"about": "The replica IDs." },
{ "name": "AddingReplicas", "type": "[]int32", "versions": "3+", "ignorable": true,
"about": "The replica IDs that we are adding this partition to, or null if no replicas are being added." },
{ "name": "RemovingReplicas", "type": "[]int32", "versions": "3+", "ignorable": true,
"about": "The replica IDs that we are removing this partition from, or null if no replicas are being removed." },
{ "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
"about": "Whether the replica should have existed on the broker or not." }
]}

4
clients/src/main/resources/common/message/LeaderAndIsrResponse.json

@ -20,7 +20,9 @@ @@ -20,7 +20,9 @@
// Version 1 adds KAFKA_STORAGE_ERROR as a valid error code.
//
// Version 2 is the same as version 1.
"validVersions": "0-2",
//
// Version 3 is the same as version 2.
"validVersions": "0-3",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },

32
clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json

@ -0,0 +1,32 @@ @@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 46,
"type": "request",
"name": "ListPartitionReassignmentsRequest",
"validVersions": "0",
"fields": [
{ "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
"about": "The time in ms to wait for the request to complete." },
{ "name": "Topics", "type": "[]ListPartitionReassignmentsTopics", "versions": "0+", "nullableVersions": "0+",
"about": "The topics to list partition reassignments for, or null to list everything.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name" },
{ "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
"about": "The partitions to list partition reassignments for." }
]}
]
}

45
clients/src/main/resources/common/message/ListPartitionReassignmentsResponse.json

@ -0,0 +1,45 @@ @@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 46,
"type": "response",
"name": "ListPartitionReassignmentsResponse",
"validVersions": "0",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top-level error code, or 0 if there was no error" },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The top-level error message, or null if there was no error." },
{ "name": "Topics", "type": "[]OngoingTopicReassignment", "versions": "0+",
"about": "The ongoing reassignments for each topic.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]OngoingPartitionReassignment", "versions": "0+",
"about": "The ongoing reassignments for each partition.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The index of the partition." },
{ "name": "Replicas", "type": "[]int32", "versions": "0+",
"about": "The current replica set." },
{ "name": "AddingReplicas", "type": "[]int32", "versions": "0+",
"about": "The set of replicas we are currently adding." },
{ "name": "RemovingReplicas", "type": "[]int32", "versions": "0+",
"about": "The set of replicas we are currently removing." }
]}
]}
]
}

43
clients/src/test/java/org/apache/kafka/common/message/MessageTest.java

@ -233,12 +233,53 @@ public final class MessageTest { @@ -233,12 +233,53 @@ public final class MessageTest {
}
@Test
public void testLeaderAndIsrVersions() throws Exception {
// Version 3 adds two new fields - AddingReplicas and RemovingReplicas
LeaderAndIsrRequestData.LeaderAndIsrRequestTopicState partitionStateNoAddingRemovingReplicas =
new LeaderAndIsrRequestData.LeaderAndIsrRequestTopicState()
.setName("topic")
.setPartitionStatesV0(
Collections.singletonList(
new LeaderAndIsrRequestData.LeaderAndIsrRequestPartition()
.setPartitionIndex(0)
.setReplicas(Collections.singletonList(0))
)
);
LeaderAndIsrRequestData.LeaderAndIsrRequestTopicState partitionStateWithAddingRemovingReplicas =
new LeaderAndIsrRequestData.LeaderAndIsrRequestTopicState()
.setName("topic")
.setPartitionStatesV0(
Collections.singletonList(
new LeaderAndIsrRequestData.LeaderAndIsrRequestPartition()
.setPartitionIndex(0)
.setReplicas(Collections.singletonList(0))
.setAddingReplicas(Collections.singletonList(1))
.setRemovingReplicas(Collections.singletonList(1))
)
);
testAllMessageRoundTripsBetweenVersions(
(short) 2,
(short) 3,
new LeaderAndIsrRequestData().setTopicStates(Collections.singletonList(partitionStateWithAddingRemovingReplicas)),
new LeaderAndIsrRequestData().setTopicStates(Collections.singletonList(partitionStateNoAddingRemovingReplicas)));
testAllMessageRoundTripsFromVersion((short) 3, new LeaderAndIsrRequestData().setTopicStates(Collections.singletonList(partitionStateWithAddingRemovingReplicas)));
}
private void testAllMessageRoundTrips(Message message) throws Exception {
testAllMessageRoundTripsFromVersion(message.lowestSupportedVersion(), message);
}
private void testAllMessageRoundTripsBeforeVersion(short beforeVersion, Message message, Message expected) throws Exception {
for (short version = 0; version < beforeVersion; version++) {
testAllMessageRoundTripsBetweenVersions((short) 0, beforeVersion, message, expected);
}
/**
* @param startVersion - the version we want to start at, inclusive
* @param endVersion - the version we want to end at, exclusive
*/
private void testAllMessageRoundTripsBetweenVersions(short startVersion, short endVersion, Message message, Message expected) throws Exception {
for (short version = startVersion; version < endVersion; version++) {
testMessageRoundTrip(version, message, expected);
}
}

67
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

@ -66,6 +66,10 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.Altera @@ -66,6 +66,10 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.Altera
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@ -362,6 +366,12 @@ public class RequestResponseTest { @@ -362,6 +366,12 @@ public class RequestResponseTest {
checkRequest(createIncrementalAlterConfigsRequest(), true);
checkErrorResponse(createIncrementalAlterConfigsRequest(), new UnknownServerException(), true);
checkResponse(createIncrementalAlterConfigsResponse(), 0, true);
checkRequest(createAlterPartitionReassignmentsRequest(), true);
checkErrorResponse(createAlterPartitionReassignmentsRequest(), new UnknownServerException(), true);
checkResponse(createAlterPartitionReassignmentsResponse(), 0, true);
checkRequest(createListPartitionReassignmentsRequest(), true);
checkErrorResponse(createListPartitionReassignmentsRequest(), new UnknownServerException(), true);
checkResponse(createListPartitionReassignmentsResponse(), 0, true);
}
@Test
@ -1630,4 +1640,61 @@ public class RequestResponseTest { @@ -1630,4 +1640,61 @@ public class RequestResponseTest {
.setErrorMessage("Duplicate Keys"));
return new IncrementalAlterConfigsResponse(data);
}
private AlterPartitionReassignmentsRequest createAlterPartitionReassignmentsRequest() {
AlterPartitionReassignmentsRequestData data = new AlterPartitionReassignmentsRequestData();
data.topics().add(
new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("topic").setPartitions(
Collections.singletonList(
new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null)
)
)
);
return new AlterPartitionReassignmentsRequest.Builder(data).build((short) 0);
}
private AlterPartitionReassignmentsResponse createAlterPartitionReassignmentsResponse() {
AlterPartitionReassignmentsResponseData data = new AlterPartitionReassignmentsResponseData();
data.responses().add(
new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse()
.setName("topic")
.setPartitions(Collections.singletonList(
new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse()
.setPartitionIndex(0)
.setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code())
.setErrorMessage("No reassignment is in progress for topic topic partition 0")
)
)
);
return new AlterPartitionReassignmentsResponse(data);
}
private ListPartitionReassignmentsRequest createListPartitionReassignmentsRequest() {
ListPartitionReassignmentsRequestData data = new ListPartitionReassignmentsRequestData();
data.setTopics(
Collections.singletonList(
new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics()
.setName("topic")
.setPartitionIndexes(Collections.singletonList(1))
)
);
return new ListPartitionReassignmentsRequest.Builder(data).build((short) 0);
}
private ListPartitionReassignmentsResponse createListPartitionReassignmentsResponse() {
ListPartitionReassignmentsResponseData data = new ListPartitionReassignmentsResponseData();
data.topics().add(
new ListPartitionReassignmentsResponseData.OngoingTopicReassignment()
.setName("topic")
.setPartitions(Collections.singletonList(
new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment()
.setPartitionIndex(0)
.setReplicas(Arrays.asList(1, 2))
.setAddingReplicas(Collections.singletonList(2))
.setRemovingReplicas(Collections.singletonList(1))
)
)
);
return new ListPartitionReassignmentsResponse(data);
}
}

30
core/src/main/scala/kafka/server/KafkaApis.scala

@ -52,6 +52,8 @@ import org.apache.kafka.common.message.CreateTopicsResponseData @@ -52,6 +52,8 @@ import org.apache.kafka.common.message.CreateTopicsResponseData
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
import org.apache.kafka.common.message.DeleteGroupsResponseData
import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData
import org.apache.kafka.common.message.DeleteTopicsResponseData
import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
import org.apache.kafka.common.message.DescribeGroupsResponseData
@ -178,6 +180,8 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -178,6 +180,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignmentsRequest(request)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
}
} catch {
case e: FatalExitError => throw e
@ -2299,6 +2303,32 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -2299,6 +2303,32 @@ class KafkaApis(val requestChannel: RequestChannel,
new AlterConfigsResponse(requestThrottleMs, (authorizedResult ++ unauthorizedResult).asJava))
}
def handleAlterPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = {
authorizeClusterAlter(request)
val alterPartitionReassignmentsRequest = request.body[AlterPartitionReassignmentsRequest]
sendResponseMaybeThrottle(request, requestThrottleMs =>
new AlterPartitionReassignmentsResponse(
new AlterPartitionReassignmentsResponseData().setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.UNSUPPORTED_VERSION.code()).setErrorMessage(Errors.UNSUPPORTED_VERSION.message())
.toStruct(0)
)
)
}
def handleListPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = {
authorizeClusterDescribe(request)
val listPartitionReassignmentsRequest = request.body[ListPartitionReassignmentsRequest]
sendResponseMaybeThrottle(request, requestThrottleMs =>
new ListPartitionReassignmentsResponse(
new ListPartitionReassignmentsResponseData().setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.UNSUPPORTED_VERSION.code()).setErrorMessage(Errors.UNSUPPORTED_VERSION.message())
.toStruct(0)
)
)
}
private def configsAuthorizationApiError(session: RequestChannel.Session, resource: ConfigResource): ApiError = {
val error = resource.`type` match {
case ConfigResource.Type.BROKER => Errors.CLUSTER_AUTHORIZATION_FAILED

38
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

@ -44,6 +44,8 @@ import org.apache.kafka.common.message.FindCoordinatorRequestData @@ -44,6 +44,8 @@ import org.apache.kafka.common.message.FindCoordinatorRequestData
import org.apache.kafka.common.message.HeartbeatRequestData
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigCollection}
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData
import org.apache.kafka.common.message.JoinGroupRequestData
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
@ -164,7 +166,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -164,7 +166,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse],
ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse],
ApiKeys.ELECT_LEADERS -> classOf[ElectLeadersResponse],
ApiKeys.INCREMENTAL_ALTER_CONFIGS -> classOf[IncrementalAlterConfigsResponse]
ApiKeys.INCREMENTAL_ALTER_CONFIGS -> classOf[IncrementalAlterConfigsResponse],
ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> classOf[AlterPartitionReassignmentsResponse],
ApiKeys.LIST_PARTITION_REASSIGNMENTS -> classOf[ListPartitionReassignmentsResponse]
)
val requestKeyToError = Map[ApiKeys, Nothing => Errors](
@ -212,7 +216,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -212,7 +216,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error),
ApiKeys.ELECT_LEADERS -> ((resp: ElectLeadersResponse) => Errors.forCode(resp.data().errorCode())),
ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) =>
IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error)
IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error),
ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> ((resp: AlterPartitionReassignmentsResponse) => Errors.forCode(resp.data().errorCode())),
ApiKeys.LIST_PARTITION_REASSIGNMENTS -> ((resp: ListPartitionReassignmentsResponse) => Errors.forCode(resp.data().errorCode()))
)
val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@ -252,7 +258,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -252,7 +258,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl,
ApiKeys.CREATE_PARTITIONS -> topicAlterAcl,
ApiKeys.ELECT_LEADERS -> clusterAlterAcl,
ApiKeys.INCREMENTAL_ALTER_CONFIGS -> topicAlterConfigsAcl
ApiKeys.INCREMENTAL_ALTER_CONFIGS -> topicAlterConfigsAcl,
ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> clusterAlterAcl,
ApiKeys.LIST_PARTITION_REASSIGNMENTS -> clusterDescribeAcl
)
@Before
@ -485,6 +493,26 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -485,6 +493,26 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
10000
).build()
private def alterPartitionReassignmentsRequest = new AlterPartitionReassignmentsRequest.Builder(
new AlterPartitionReassignmentsRequestData().setTopics(
List(new AlterPartitionReassignmentsRequestData.ReassignableTopic()
.setName(topic)
.setPartitions(
List(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(tp.partition())).asJava
)).asJava
)
).build()
private def listPartitionReassignmentsRequest = new ListPartitionReassignmentsRequest.Builder(
new ListPartitionReassignmentsRequestData().setTopics(
List(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics()
.setName(topic)
.setPartitionIndexes(
List(new Integer(tp.partition)).asJava
)).asJava
)
).build()
@Test
def testAuthorizationWithTopicExisting() {
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
@ -520,7 +548,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -520,7 +548,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
// Check StopReplica last since some APIs depend on replica availability
ApiKeys.STOP_REPLICA -> stopReplicaRequest,
ApiKeys.ELECT_LEADERS -> electLeadersRequest,
ApiKeys.INCREMENTAL_ALTER_CONFIGS -> incrementalAlterConfigsRequest
ApiKeys.INCREMENTAL_ALTER_CONFIGS -> incrementalAlterConfigsRequest,
ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> alterPartitionReassignmentsRequest,
ApiKeys.LIST_PARTITION_REASSIGNMENTS -> listPartitionReassignmentsRequest
)
for ((key, request) <- requestKeyToRequest) {

14
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

@ -41,6 +41,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData @@ -41,6 +41,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListGroupsRequestData
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData
import org.apache.kafka.common.message.OffsetCommitRequestData
import org.apache.kafka.common.message.SaslAuthenticateRequestData
import org.apache.kafka.common.message.SaslHandshakeRequestData
@ -465,6 +467,16 @@ class RequestQuotaTest extends BaseRequestTest { @@ -465,6 +467,16 @@ class RequestQuotaTest extends BaseRequestTest {
new IncrementalAlterConfigsRequest.Builder(
new IncrementalAlterConfigsRequestData())
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS =>
new AlterPartitionReassignmentsRequest.Builder(
new AlterPartitionReassignmentsRequestData()
)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS =>
new ListPartitionReassignmentsRequest.Builder(
new ListPartitionReassignmentsRequestData()
)
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}
@ -568,6 +580,8 @@ class RequestQuotaTest extends BaseRequestTest { @@ -568,6 +580,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.ELECT_LEADERS => new ElectLeadersResponse(response).throttleTimeMs
case ApiKeys.INCREMENTAL_ALTER_CONFIGS =>
new IncrementalAlterConfigsResponse(response, ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion()).throttleTimeMs
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => new AlterPartitionReassignmentsResponse(response).throttleTimeMs
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => new ListPartitionReassignmentsResponse(response).throttleTimeMs
case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
}
}

Loading…
Cancel
Save