Browse Source

MINOR: Small refactor in DescribeGroupsResponse (#12970)

This patch does a few cleanups:
* It removes `DescribeGroupsResponse.fromError` and pushes its logic to `DescribeGroupsRequest.getErrorResponse` to be consistent with how we implemented the other requests/responses.
* It renames `DescribedGroup.forError` to `DescribedGroup.groupError`.

The patch relies on existing tests.

Reviewers: Mickael Maison <mickael.maison@gmail.com>
pull/13005/head
David Jacot 2 years ago committed by GitHub
parent
commit
f9a09fdd29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
  2. 26
      clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
  3. 2
      core/src/main/scala/kafka/server/KafkaApis.scala

18
clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java

@ -17,14 +17,13 @@ @@ -17,14 +17,13 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
public class DescribeGroupsRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<DescribeGroupsRequest> {
private final DescribeGroupsRequestData data;
@ -59,11 +58,18 @@ public class DescribeGroupsRequest extends AbstractRequest { @@ -59,11 +58,18 @@ public class DescribeGroupsRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
if (version() == 0) {
return DescribeGroupsResponse.fromError(DEFAULT_THROTTLE_TIME, Errors.forException(e), data.groups());
} else {
return DescribeGroupsResponse.fromError(throttleTimeMs, Errors.forException(e), data.groups());
Errors error = Errors.forException(e);
DescribeGroupsResponseData describeGroupsResponseData = new DescribeGroupsResponseData();
data.groups().forEach(groupId ->
describeGroupsResponseData.groups().add(DescribeGroupsResponse.groupError(groupId, error))
);
if (version() >= 1) {
describeGroupsResponseData.setThrottleTimeMs(throttleTimeMs);
}
return new DescribeGroupsResponse(describeGroupsResponseData);
}
public static DescribeGroupsRequest parse(ByteBuffer buffer, short version) {

26
clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java

@ -93,16 +93,21 @@ public class DescribeGroupsResponse extends AbstractResponse { @@ -93,16 +93,21 @@ public class DescribeGroupsResponse extends AbstractResponse {
final String protocolType,
final String protocol,
final List<DescribedGroupMember> members,
final int authorizedOperations) {
DescribedGroup groupMetadata = new DescribedGroup();
groupMetadata.setGroupId(groupId)
final int authorizedOperations
) {
return new DescribedGroup()
.setGroupId(groupId)
.setErrorCode(error.code())
.setGroupState(state)
.setProtocolType(protocolType)
.setProtocolData(protocol)
.setMembers(members)
.setAuthorizedOperations(authorizedOperations);
return groupMetadata;
}
public static DescribedGroup groupError(String groupId, Errors error) {
return groupMetadata(groupId, error, DescribeGroupsResponse.UNKNOWN_STATE, DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
DescribeGroupsResponse.UNKNOWN_PROTOCOL, Collections.emptyList(), AUTHORIZED_OPERATIONS_OMITTED);
}
@Override
@ -132,19 +137,6 @@ public class DescribeGroupsResponse extends AbstractResponse { @@ -132,19 +137,6 @@ public class DescribeGroupsResponse extends AbstractResponse {
return errorCounts;
}
public static DescribedGroup forError(String groupId, Errors error) {
return groupMetadata(groupId, error, DescribeGroupsResponse.UNKNOWN_STATE, DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
DescribeGroupsResponse.UNKNOWN_PROTOCOL, Collections.emptyList(), AUTHORIZED_OPERATIONS_OMITTED);
}
public static DescribeGroupsResponse fromError(int throttleTimeMs, Errors error, List<String> groupIds) {
DescribeGroupsResponseData describeGroupsResponseData = new DescribeGroupsResponseData();
describeGroupsResponseData.setThrottleTimeMs(throttleTimeMs);
for (String groupId : groupIds)
describeGroupsResponseData.groups().add(DescribeGroupsResponse.forError(groupId, error));
return new DescribeGroupsResponse(describeGroupsResponseData);
}
public static DescribeGroupsResponse parse(ByteBuffer buffer, short version) {
return new DescribeGroupsResponse(new DescribeGroupsResponseData(new ByteBufferAccessor(buffer), version));
}

2
core/src/main/scala/kafka/server/KafkaApis.scala

@ -1586,7 +1586,7 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -1586,7 +1586,7 @@ class KafkaApis(val requestChannel: RequestChannel,
describeRequest.data.groups.forEach { groupId =>
if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
describeGroupsResponseData.groups.add(DescribeGroupsResponse.groupError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
} else {
val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
val members = summary.members.map { member =>

Loading…
Cancel
Save