Browse Source

MINOR: Remove explicit version checks in getErrorResponse methods (#7708)

This patch removes the explicit version check pattern we used in `getErrorResponse`, which is a pain to maintain (as seen by KAFKA-9200). We already check that requests have a valid version range in the `AbstractRequest` constructor.

Reviewers: Andrew Choi <andrewchoi5@users.noreply.github.com>, Ismael Juma <ismael@juma.me.uk>
pull/7712/head
Jason Gustafson 5 years ago committed by GitHub
parent
commit
32bf0774e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
  2. 13
      clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
  3. 16
      clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
  4. 11
      clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
  5. 18
      clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
  6. 10
      clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
  7. 11
      clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
  8. 23
      clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
  9. 11
      clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
  10. 39
      clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
  11. 13
      clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
  12. 20
      clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
  13. 28
      clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  14. 17
      clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
  15. 25
      clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
  16. 2
      clients/src/main/resources/common/message/DeleteTopicsResponse.json
  17. 2
      clients/src/main/resources/common/message/MetadataResponse.json
  18. 8
      clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java

18
clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java

@ -191,19 +191,11 @@ public class AlterConfigsRequest extends AbstractRequest { @@ -191,19 +191,11 @@ public class AlterConfigsRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short version = version();
switch (version) {
case 0:
case 1:
ApiError error = ApiError.fromThrowable(e);
Map<ConfigResource, ApiError> errors = new HashMap<>(configs.size());
for (ConfigResource resource : configs.keySet())
errors.put(resource, error);
return new AlterConfigsResponse(throttleTimeMs, errors);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
version, this.getClass().getSimpleName(), ApiKeys.ALTER_CONFIGS.latestVersion()));
}
ApiError error = ApiError.fromThrowable(e);
Map<ConfigResource, ApiError> errors = new HashMap<>(configs.size());
for (ConfigResource resource : configs.keySet())
errors.put(resource, error);
return new AlterConfigsResponse(throttleTimeMs, errors);
}
public static AlterConfigsRequest parse(ByteBuffer buffer, short version) {

13
clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java

@ -144,21 +144,10 @@ public class AlterReplicaLogDirsRequest extends AbstractRequest { @@ -144,21 +144,10 @@ public class AlterReplicaLogDirsRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Map<TopicPartition, Errors> responseMap = new HashMap<>();
for (Map.Entry<TopicPartition, String> entry : partitionDirs.entrySet()) {
responseMap.put(entry.getKey(), Errors.forException(e));
}
short versionId = version();
switch (versionId) {
case 0:
case 1:
return new AlterReplicaLogDirsResponse(throttleTimeMs, responseMap);
default:
throw new IllegalArgumentException(
String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId,
this.getClass().getSimpleName(), ApiKeys.ALTER_REPLICA_LOG_DIRS.latestVersion()));
}
return new AlterReplicaLogDirsResponse(throttleTimeMs, responseMap);
}
public Map<TopicPartition, String> partitionDirs() {

16
clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java

@ -157,18 +157,10 @@ public class CreateAclsRequest extends AbstractRequest { @@ -157,18 +157,10 @@ public class CreateAclsRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) {
short versionId = version();
switch (versionId) {
case 0:
case 1:
List<CreateAclsResponse.AclCreationResponse> responses = new ArrayList<>();
for (int i = 0; i < aclCreations.size(); i++)
responses.add(new CreateAclsResponse.AclCreationResponse(ApiError.fromThrowable(throwable)));
return new CreateAclsResponse(throttleTimeMs, responses);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.CREATE_ACLS.latestVersion()));
}
List<CreateAclsResponse.AclCreationResponse> responses = new ArrayList<>();
for (int i = 0; i < aclCreations.size(); i++)
responses.add(new CreateAclsResponse.AclCreationResponse(ApiError.fromThrowable(throwable)));
return new CreateAclsResponse(throttleTimeMs, responses);
}
public static CreateAclsRequest parse(ByteBuffer buffer, short version) {

11
clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java

@ -231,16 +231,7 @@ public class CreatePartitionsRequest extends AbstractRequest { @@ -231,16 +231,7 @@ public class CreatePartitionsRequest extends AbstractRequest {
for (String topic : newPartitions.keySet()) {
topicErrors.put(topic, ApiError.fromThrowable(e));
}
short versionId = version();
switch (versionId) {
case 0:
case 1:
return new CreatePartitionsResponse(throttleTimeMs, topicErrors);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.CREATE_PARTITIONS.latestVersion()));
}
return new CreatePartitionsResponse(throttleTimeMs, topicErrors);
}
public static CreatePartitionsRequest parse(ByteBuffer buffer, short version) {

18
clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java

@ -133,20 +133,12 @@ public class DeleteAclsRequest extends AbstractRequest { @@ -133,20 +133,12 @@ public class DeleteAclsRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) {
short versionId = version();
switch (versionId) {
case 0:
case 1:
List<DeleteAclsResponse.AclFilterResponse> responses = new ArrayList<>();
for (int i = 0; i < filters.size(); i++) {
responses.add(new DeleteAclsResponse.AclFilterResponse(
ApiError.fromThrowable(throwable), Collections.emptySet()));
}
return new DeleteAclsResponse(throttleTimeMs, responses);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.DELETE_ACLS.latestVersion()));
List<DeleteAclsResponse.AclFilterResponse> responses = new ArrayList<>();
for (int i = 0; i < filters.size(); i++) {
responses.add(new DeleteAclsResponse.AclFilterResponse(
ApiError.fromThrowable(throwable), Collections.emptySet()));
}
return new DeleteAclsResponse(throttleTimeMs, responses);
}
public static DeleteAclsRequest parse(ByteBuffer buffer, short version) {

10
clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java

@ -155,15 +155,7 @@ public class DeleteRecordsRequest extends AbstractRequest { @@ -155,15 +155,7 @@ public class DeleteRecordsRequest extends AbstractRequest {
responseMap.put(entry.getKey(), new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.forException(e)));
}
short versionId = version();
switch (versionId) {
case 0:
case 1:
return new DeleteRecordsResponse(throttleTimeMs, responseMap);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.DELETE_RECORDS.latestVersion()));
}
return new DeleteRecordsResponse(throttleTimeMs, responseMap);
}
public int timeout() {

11
clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java

@ -109,16 +109,7 @@ public class DescribeAclsRequest extends AbstractRequest { @@ -109,16 +109,7 @@ public class DescribeAclsRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) {
short versionId = version();
switch (versionId) {
case 0:
case 1:
return new DescribeAclsResponse(throttleTimeMs, ApiError.fromThrowable(throwable),
Collections.emptySet());
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_ACLS.latestVersion()));
}
return new DescribeAclsResponse(throttleTimeMs, ApiError.fromThrowable(throwable), Collections.emptySet());
}
public static DescribeAclsRequest parse(ByteBuffer buffer, short version) {

23
clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java

@ -164,22 +164,13 @@ public class DescribeConfigsRequest extends AbstractRequest { @@ -164,22 +164,13 @@ public class DescribeConfigsRequest extends AbstractRequest {
@Override
public DescribeConfigsResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short version = version();
switch (version) {
case 0:
case 1:
case 2:
ApiError error = ApiError.fromThrowable(e);
Map<ConfigResource, DescribeConfigsResponse.Config> errors = new HashMap<>(resources().size());
DescribeConfigsResponse.Config config = new DescribeConfigsResponse.Config(error,
Collections.emptyList());
for (ConfigResource resource : resources())
errors.put(resource, config);
return new DescribeConfigsResponse(throttleTimeMs, errors);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
version, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_CONFIGS.latestVersion()));
}
ApiError error = ApiError.fromThrowable(e);
Map<ConfigResource, DescribeConfigsResponse.Config> errors = new HashMap<>(resources().size());
DescribeConfigsResponse.Config config = new DescribeConfigsResponse.Config(error,
Collections.emptyList());
for (ConfigResource resource : resources())
errors.put(resource, config);
return new DescribeConfigsResponse(throttleTimeMs, errors);
}
public static DescribeConfigsRequest parse(ByteBuffer buffer, short version) {

11
clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java

@ -139,16 +139,7 @@ public class DescribeLogDirsRequest extends AbstractRequest { @@ -139,16 +139,7 @@ public class DescribeLogDirsRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
switch (versionId) {
case 0:
case 1:
return new DescribeLogDirsResponse(throttleTimeMs, new HashMap<String, LogDirInfo>());
default:
throw new IllegalArgumentException(
String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId,
this.getClass().getSimpleName(), ApiKeys.DESCRIBE_LOG_DIRS.latestVersion()));
}
return new DescribeLogDirsResponse(throttleTimeMs, new HashMap<String, LogDirInfo>());
}
public boolean isAllTopicPartitions() {

39
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java

@ -117,37 +117,14 @@ public class JoinGroupRequest extends AbstractRequest { @@ -117,37 +117,14 @@ public class JoinGroupRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
switch (versionId) {
case 0:
case 1:
return new JoinGroupResponse(
new JoinGroupResponseData()
.setErrorCode(Errors.forException(e).code())
.setGenerationId(JoinGroupResponse.UNKNOWN_GENERATION_ID)
.setProtocolName(JoinGroupResponse.UNKNOWN_PROTOCOL)
.setLeader(JoinGroupResponse.UNKNOWN_MEMBER_ID)
.setMemberId(JoinGroupResponse.UNKNOWN_MEMBER_ID)
.setMembers(Collections.emptyList())
);
case 2:
case 3:
case 4:
case 5:
return new JoinGroupResponse(
new JoinGroupResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code())
.setGenerationId(JoinGroupResponse.UNKNOWN_GENERATION_ID)
.setProtocolName(JoinGroupResponse.UNKNOWN_PROTOCOL)
.setLeader(JoinGroupResponse.UNKNOWN_MEMBER_ID)
.setMemberId(JoinGroupResponse.UNKNOWN_MEMBER_ID)
.setMembers(Collections.emptyList())
);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.JOIN_GROUP.latestVersion()));
}
return new JoinGroupResponse(new JoinGroupResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code())
.setGenerationId(JoinGroupResponse.UNKNOWN_GENERATION_ID)
.setProtocolName(JoinGroupResponse.UNKNOWN_PROTOCOL)
.setLeader(JoinGroupResponse.UNKNOWN_MEMBER_ID)
.setMemberId(JoinGroupResponse.UNKNOWN_MEMBER_ID)
.setMembers(Collections.emptyList()));
}
public static JoinGroupRequest parse(ByteBuffer buffer, short version) {

13
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java

@ -269,18 +269,7 @@ public class ListOffsetRequest extends AbstractRequest { @@ -269,18 +269,7 @@ public class ListOffsetRequest extends AbstractRequest {
responseData.put(partition, partitionError);
}
switch (versionId) {
case 0:
case 1:
case 2:
case 3:
case 4:
case 5:
return new ListOffsetResponse(throttleTimeMs, responseData);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.LIST_OFFSETS.latestVersion()));
}
return new ListOffsetResponse(throttleTimeMs, responseData);
}
public int replicaId() {

20
clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java

@ -133,24 +133,8 @@ public class MetadataRequest extends AbstractRequest { @@ -133,24 +133,8 @@ public class MetadataRequest extends AbstractRequest {
.setPartitions(Collections.emptyList()));
}
short versionId = version();
switch (versionId) {
case 0:
case 1:
case 2:
return new MetadataResponse(responseData);
case 3:
case 4:
case 5:
case 6:
case 7:
case 8:
responseData.setThrottleTimeMs(throttleTimeMs);
return new MetadataResponse(responseData);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.METADATA.latestVersion()));
}
responseData.setThrottleTimeMs(throttleTimeMs);
return new MetadataResponse(responseData);
}
public boolean isAllTopics() {

28
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java

@ -124,31 +124,9 @@ public class OffsetCommitRequest extends AbstractRequest { @@ -124,31 +124,9 @@ public class OffsetCommitRequest extends AbstractRequest {
public OffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<OffsetCommitResponseTopic>
responseTopicData = getErrorResponseTopics(data.topics(), Errors.forException(e));
short versionId = version();
switch (versionId) {
case 0:
case 1:
case 2:
return new OffsetCommitResponse(
new OffsetCommitResponseData()
.setTopics(responseTopicData)
);
case 3:
case 4:
case 5:
case 6:
case 7:
return new OffsetCommitResponse(
new OffsetCommitResponseData()
.setTopics(responseTopicData)
.setThrottleTimeMs(throttleTimeMs)
);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.OFFSET_COMMIT.latestVersion()));
}
return new OffsetCommitResponse(new OffsetCommitResponseData()
.setTopics(responseTopicData)
.setThrottleTimeMs(throttleTimeMs));
}
public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {

17
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java

@ -333,22 +333,7 @@ public class ProduceRequest extends AbstractRequest { @@ -333,22 +333,7 @@ public class ProduceRequest extends AbstractRequest {
for (TopicPartition tp : partitions())
responseMap.put(tp, partitionResponse);
short versionId = version();
switch (versionId) {
case 0:
case 1:
case 2:
case 3:
case 4:
case 5:
case 6:
case 7:
case 8:
return new ProduceResponse(responseMap, throttleTimeMs);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.PRODUCE.latestVersion()));
}
return new ProduceResponse(responseMap, throttleTimeMs);
}
@Override

25
clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java

@ -67,27 +67,10 @@ public class SyncGroupRequest extends AbstractRequest { @@ -67,27 +67,10 @@ public class SyncGroupRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
switch (versionId) {
case 0:
return new SyncGroupResponse(
new SyncGroupResponseData()
.setErrorCode(Errors.forException(e).code())
.setAssignment(new byte[0])
);
case 1:
case 2:
case 3:
return new SyncGroupResponse(
new SyncGroupResponseData()
.setErrorCode(Errors.forException(e).code())
.setAssignment(new byte[0])
.setThrottleTimeMs(throttleTimeMs)
);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.SYNC_GROUP.latestVersion()));
}
return new SyncGroupResponse(new SyncGroupResponseData()
.setErrorCode(Errors.forException(e).code())
.setAssignment(new byte[0])
.setThrottleTimeMs(throttleTimeMs));
}
public Map<String, ByteBuffer> groupAssignments() {

2
clients/src/main/resources/common/message/DeleteTopicsResponse.json

@ -27,7 +27,7 @@ @@ -27,7 +27,7 @@
"validVersions": "0-4",
"flexibleVersions": "4+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+",
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Responses", "type": "[]DeletableTopicResult", "versions": "0+",
"about": "The results for each topic we tried to delete.", "fields": [

2
clients/src/main/resources/common/message/MetadataResponse.json

@ -39,7 +39,7 @@ @@ -39,7 +39,7 @@
"validVersions": "0-9",
"flexibleVersions": "9+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+",
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Brokers", "type": "[]MetadataResponseBroker", "versions": "0+",
"about": "Each broker in the response.", "fields": [

8
clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java

@ -34,7 +34,6 @@ import java.util.HashMap; @@ -34,7 +34,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
import static org.apache.kafka.common.requests.OffsetCommitRequest.getErrorResponseTopics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
@ -97,12 +96,7 @@ public class OffsetCommitRequestTest { @@ -97,12 +96,7 @@ public class OffsetCommitRequestTest {
OffsetCommitResponse response = request.getErrorResponse(throttleTimeMs, Errors.NOT_COORDINATOR.exception());
assertEquals(Collections.singletonMap(Errors.NOT_COORDINATOR, 2), response.errorCounts());
if (version >= 3) {
assertEquals(throttleTimeMs, response.throttleTimeMs());
} else {
assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
}
assertEquals(throttleTimeMs, response.throttleTimeMs());
}
}

Loading…
Cancel
Save