Browse Source

KAFKA-7922: Return authorized operations in describe consumer group responses (KIP-430 Part-1)

-  Use automatic RPC generation in DescribeGroups Request/Response classes

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #6322 from omkreddy/KIP-430-Return-Ops
pull/6355/head
Manikumar Reddy 6 years ago committed by Manikumar Reddy
parent
commit
f11fa5ef40
  1. 31
      clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
  2. 10
      clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
  3. 51
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  4. 8
      clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
  5. 2
      clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
  6. 74
      clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
  7. 290
      clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
  8. 24
      clients/src/main/java/org/apache/kafka/common/utils/Utils.java
  9. 7
      clients/src/main/resources/common/message/DescribeGroupsRequest.json
  10. 7
      clients/src/main/resources/common/message/DescribeGroupsResponse.json
  11. 30
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
  12. 19
      clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  13. 27
      clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
  14. 10
      core/src/main/scala/kafka/admin/AclCommand.scala
  15. 24
      core/src/main/scala/kafka/admin/AdminClient.scala
  16. 7
      core/src/main/scala/kafka/security/auth/ResourceType.scala
  17. 64
      core/src/main/scala/kafka/server/KafkaApis.scala
  18. 13
      core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
  19. 10
      core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
  20. 121
      core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
  21. 8
      core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

31
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java

@ -19,12 +19,14 @@ package org.apache.kafka.clients.admin; @@ -19,12 +19,14 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.utils.Utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
/**
* A detailed description of a single consumer group in the cluster.
@ -36,13 +38,15 @@ public class ConsumerGroupDescription { @@ -36,13 +38,15 @@ public class ConsumerGroupDescription {
private final String partitionAssignor;
private final ConsumerGroupState state;
private final Node coordinator;
private Set<AclOperation> authorizedOperations;
public ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
String partitionAssignor,
ConsumerGroupState state,
Node coordinator) {
Node coordinator,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.members = members == null ? Collections.emptyList() :
@ -50,23 +54,26 @@ public class ConsumerGroupDescription { @@ -50,23 +54,26 @@ public class ConsumerGroupDescription {
this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
this.state = state;
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
}
@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConsumerGroupDescription that = (ConsumerGroupDescription) o;
final ConsumerGroupDescription that = (ConsumerGroupDescription) o;
return isSimpleConsumerGroup == that.isSimpleConsumerGroup &&
groupId.equals(that.groupId) &&
members.equals(that.members) &&
partitionAssignor.equals(that.partitionAssignor) &&
state.equals(that.state);
Objects.equals(groupId, that.groupId) &&
Objects.equals(members, that.members) &&
Objects.equals(partitionAssignor, that.partitionAssignor) &&
state == that.state &&
Objects.equals(coordinator, that.coordinator) &&
Objects.equals(authorizedOperations, that.authorizedOperations);
}
@Override
public int hashCode() {
return Objects.hash(isSimpleConsumerGroup, groupId, members, partitionAssignor, state);
return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, state, coordinator, authorizedOperations);
}
/**
@ -111,6 +118,13 @@ public class ConsumerGroupDescription { @@ -111,6 +118,13 @@ public class ConsumerGroupDescription {
return coordinator;
}
/**
* authorizedOperations for this group
*/
public Set<AclOperation> authorizedOperations() {
return authorizedOperations;
}
@Override
public String toString() {
return "(groupId=" + groupId +
@ -119,6 +133,7 @@ public class ConsumerGroupDescription { @@ -119,6 +133,7 @@ public class ConsumerGroupDescription {
", partitionAssignor=" + partitionAssignor +
", state=" + state +
", coordinator=" + coordinator +
", authorizedOperations=" + authorizedOperations +
")";
}
}

10
clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java

@ -28,4 +28,14 @@ import java.util.Collection; @@ -28,4 +28,14 @@ import java.util.Collection;
*/
@InterfaceStability.Evolving
public class DescribeConsumerGroupsOptions extends AbstractOptions<DescribeConsumerGroupsOptions> {
private boolean includeAuthorizedOperations;
public DescribeConsumerGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
this.includeAuthorizedOperations = includeAuthorizedOperations;
return this;
}
public boolean includeAuthorizedOperations() {
return includeAuthorizedOperations;
}
}

51
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -45,6 +45,7 @@ import org.apache.kafka.common.TopicPartitionInfo; @@ -45,6 +45,7 @@ import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
@ -62,6 +63,9 @@ import org.apache.kafka.common.internals.KafkaFutureImpl; @@ -62,6 +63,9 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@ -128,9 +132,11 @@ import org.apache.kafka.common.utils.AppInfoParser; @@ -128,9 +132,11 @@ import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@ -2430,7 +2436,10 @@ public class KafkaAdminClient extends AdminClient { @@ -2430,7 +2436,10 @@ public class KafkaAdminClient extends AdminClient {
runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
return new DescribeGroupsRequest.Builder(
new DescribeGroupsRequestData()
.setGroups(Collections.singletonList(groupId))
.setIncludeAuthorizedOperations(options.includeAuthorizedOperations()));
}
@Override
@ -2438,23 +2447,27 @@ public class KafkaAdminClient extends AdminClient { @@ -2438,23 +2447,27 @@ public class KafkaAdminClient extends AdminClient {
final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId);
final DescribedGroup describedGroup = response.data()
.groups()
.stream()
.filter(group -> groupId.equals(group.groupId()))
.findFirst().get();
final Errors groupError = groupMetadata.error();
final Errors groupError = Errors.forCode(describedGroup.errorCode());
if (groupError != Errors.NONE) {
// TODO: KAFKA-6789, we can retry based on the error code
future.completeExceptionally(groupError.exception());
} else {
final String protocolType = groupMetadata.protocolType();
final String protocolType = describedGroup.protocolType();
if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
final List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
final List<DescribedGroupMember> members = describedGroup.members();
final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size());
for (DescribeGroupsResponse.GroupMember groupMember : members) {
final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
for (DescribedGroupMember groupMember : members) {
Set<TopicPartition> partitions = Collections.emptySet();
if (groupMember.memberAssignment().remaining() > 0) {
if (groupMember.memberAssignment().length > 0) {
final PartitionAssignor.Assignment assignment = ConsumerProtocol.
deserializeAssignment(groupMember.memberAssignment().duplicate());
deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
partitions = new HashSet<>(assignment.partitions());
}
final MemberDescription memberDescription =
@ -2465,12 +2478,12 @@ public class KafkaAdminClient extends AdminClient { @@ -2465,12 +2478,12 @@ public class KafkaAdminClient extends AdminClient {
memberDescriptions.add(memberDescription);
}
final ConsumerGroupDescription consumerGroupDescription =
new ConsumerGroupDescription(groupId,
protocolType.isEmpty(),
new ConsumerGroupDescription(groupId, protocolType.isEmpty(),
memberDescriptions,
groupMetadata.protocol(),
ConsumerGroupState.parse(groupMetadata.state()),
fcResponse.node());
describedGroup.protocolData(),
ConsumerGroupState.parse(describedGroup.groupState()),
fcResponse.node(),
authorizedOperations);
future.complete(consumerGroupDescription);
}
}
@ -2495,6 +2508,16 @@ public class KafkaAdminClient extends AdminClient { @@ -2495,6 +2508,16 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeConsumerGroupsResult(new HashMap<>(futures));
}
private Set<AclOperation> validAclOperations(final int authorizedOperations) {
return Utils.from32BitField(authorizedOperations)
.stream()
.map(AclOperation::fromCode)
.filter(operation -> operation != AclOperation.UNKNOWN
&& operation != AclOperation.ALL
&& operation != AclOperation.ANY)
.collect(Collectors.toSet());
}
private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) {
Errors error = response.error();
if (error.exception() instanceof RetriableException) {

8
clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java

@ -18,6 +18,8 @@ package org.apache.kafka.common.protocol; @@ -18,6 +18,8 @@ package org.apache.kafka.common.protocol;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
@ -61,8 +63,6 @@ import org.apache.kafka.common.requests.DescribeConfigsRequest; @@ -61,8 +63,6 @@ import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.DescribeLogDirsRequest;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
@ -139,8 +139,8 @@ public enum ApiKeys { @@ -139,8 +139,8 @@ public enum ApiKeys {
HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()),
LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS),
SYNC_GROUP(14, "SyncGroup", SyncGroupRequest.schemaVersions(), SyncGroupResponse.schemaVersions()),
DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequest.schemaVersions(),
DescribeGroupsResponse.schemaVersions()),
DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequestData.SCHEMAS,
DescribeGroupsResponseData.SCHEMAS),
LIST_GROUPS(16, "ListGroups", ListGroupsRequest.schemaVersions(), ListGroupsResponse.schemaVersions()),
SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequestData.SCHEMAS, SaslHandshakeResponseData.SCHEMAS),
API_VERSIONS(18, "ApiVersions", ApiVersionsRequest.schemaVersions(), ApiVersionsResponse.schemaVersions()) {

2
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java

@ -101,7 +101,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { @@ -101,7 +101,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case LEADER_AND_ISR:
return new LeaderAndIsrResponse(struct);
case DESCRIBE_GROUPS:
return new DescribeGroupsResponse(struct);
return new DescribeGroupsResponse(struct, version);
case LIST_GROUPS:
return new ListGroupsResponse(struct);
case SASL_HANDSHAKE:

74
clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java

@ -16,97 +16,65 @@ @@ -16,97 +16,65 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import static org.apache.kafka.common.protocol.types.Type.STRING;
import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
public class DescribeGroupsRequest extends AbstractRequest {
private static final String GROUP_IDS_KEY_NAME = "group_ids";
/* Describe group api */
private static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(
new Field(GROUP_IDS_KEY_NAME, new ArrayOf(STRING), "List of groupIds to request metadata for (an " +
"empty groupId array will return empty group metadata)."));
/* v1 request is the same as v0. Throttle time has been added to response */
private static final Schema DESCRIBE_GROUPS_REQUEST_V1 = DESCRIBE_GROUPS_REQUEST_V0;
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema DESCRIBE_GROUPS_REQUEST_V2 = DESCRIBE_GROUPS_REQUEST_V1;
public static Schema[] schemaVersions() {
return new Schema[]{DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1, DESCRIBE_GROUPS_REQUEST_V2};
}
public static class Builder extends AbstractRequest.Builder<DescribeGroupsRequest> {
private final List<String> groupIds;
private final DescribeGroupsRequestData data;
public Builder(List<String> groupIds) {
public Builder(DescribeGroupsRequestData data) {
super(ApiKeys.DESCRIBE_GROUPS);
this.groupIds = groupIds;
this.data = data;
}
@Override
public DescribeGroupsRequest build(short version) {
return new DescribeGroupsRequest(this.groupIds, version);
return new DescribeGroupsRequest(data, version);
}
@Override
public String toString() {
return "(type=DescribeGroupsRequest, groupIds=(" + Utils.join(groupIds, ",") + "))";
return data.toString();
}
}
private final List<String> groupIds;
private final DescribeGroupsRequestData data;
private final short version;
private DescribeGroupsRequest(List<String> groupIds, short version) {
private DescribeGroupsRequest(DescribeGroupsRequestData data, short version) {
super(ApiKeys.DESCRIBE_GROUPS, version);
this.groupIds = groupIds;
this.data = data;
this.version = version;
}
public DescribeGroupsRequest(Struct struct, short version) {
super(ApiKeys.DESCRIBE_GROUPS, version);
this.groupIds = new ArrayList<>();
for (Object groupId : struct.getArray(GROUP_IDS_KEY_NAME))
this.groupIds.add((String) groupId);
this.data = new DescribeGroupsRequestData(struct, version);
this.version = version;
}
public List<String> groupIds() {
return groupIds;
public DescribeGroupsRequestData data() {
return data;
}
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.DESCRIBE_GROUPS.requestSchema(version()));
struct.set(GROUP_IDS_KEY_NAME, groupIds.toArray());
return struct;
return data.toStruct(version);
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short version = version();
switch (version) {
case 0:
return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds);
case 1:
case 2:
return DescribeGroupsResponse.fromError(throttleTimeMs, Errors.forException(e), groupIds);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
version, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_GROUPS.latestVersion()));
if (version == 0) {
return DescribeGroupsResponse.fromError(DEFAULT_THROTTLE_TIME, Errors.forException(e), data.groups());
} else {
return DescribeGroupsResponse.fromError(throttleTimeMs, Errors.forException(e), data.groups());
}
}

290
clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java

@ -16,80 +16,23 @@ @@ -16,80 +16,23 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
import static org.apache.kafka.common.protocol.types.Type.BYTES;
import static org.apache.kafka.common.protocol.types.Type.STRING;
import java.util.Set;
public class DescribeGroupsResponse extends AbstractResponse {
private static final String GROUPS_KEY_NAME = "groups";
private static final String GROUP_STATE_KEY_NAME = "state";
private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
private static final String PROTOCOL_KEY_NAME = "protocol";
private static final String MEMBERS_KEY_NAME = "members";
private static final String CLIENT_ID_KEY_NAME = "client_id";
private static final String CLIENT_HOST_KEY_NAME = "client_host";
private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
private static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(
MEMBER_ID,
new Field(CLIENT_ID_KEY_NAME, STRING, "The client id used in the member's latest join group request"),
new Field(CLIENT_HOST_KEY_NAME, STRING, "The client host used in the request session corresponding to the " +
"member's join group."),
new Field(MEMBER_METADATA_KEY_NAME, BYTES, "The metadata corresponding to the current group protocol in " +
"use (will only be present if the group is stable)."),
new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES, "The current assignment provided by the group leader " +
"(will only be present if the group is stable)."));
private static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema(
ERROR_CODE,
GROUP_ID,
new Field(GROUP_STATE_KEY_NAME, STRING, "The current state of the group (one of: Dead, Stable, CompletingRebalance, " +
"PreparingRebalance, or empty if there is no active group)"),
new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "The current group protocol type (will be empty if there is no active group)"),
new Field(PROTOCOL_KEY_NAME, STRING, "The current group protocol (only provided if the group is Stable)"),
new Field(MEMBERS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0), "Current group members " +
"(only provided if the group is not Dead)"));
private static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(
new Field(GROUPS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
private static final Schema DESCRIBE_GROUPS_RESPONSE_V1 = new Schema(
THROTTLE_TIME_MS,
new Field(GROUPS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema DESCRIBE_GROUPS_RESPONSE_V2 = DESCRIBE_GROUPS_RESPONSE_V1;
public static Schema[] schemaVersions() {
return new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1, DESCRIBE_GROUPS_RESPONSE_V2};
}
public static final String UNKNOWN_STATE = "";
public static final String UNKNOWN_PROTOCOL_TYPE = "";
public static final String UNKNOWN_PROTOCOL = "";
/**
* Possible per-group error codes:
*
@ -99,197 +42,92 @@ public class DescribeGroupsResponse extends AbstractResponse { @@ -99,197 +42,92 @@ public class DescribeGroupsResponse extends AbstractResponse {
* AUTHORIZATION_FAILED (29)
*/
private final Map<String, GroupMetadata> groups;
private final int throttleTimeMs;
private DescribeGroupsResponseData data;
public DescribeGroupsResponse(Map<String, GroupMetadata> groups) {
this(DEFAULT_THROTTLE_TIME, groups);
public DescribeGroupsResponse(DescribeGroupsResponseData data) {
this.data = data;
}
public DescribeGroupsResponse(int throttleTimeMs, Map<String, GroupMetadata> groups) {
this.throttleTimeMs = throttleTimeMs;
this.groups = groups;
public DescribeGroupsResponse(Struct struct, short version) {
this.data = new DescribeGroupsResponseData(struct, version);
}
public DescribeGroupsResponse(Struct struct) {
this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
this.groups = new HashMap<>();
for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
Struct groupStruct = (Struct) groupObj;
public static DescribedGroupMember groupMember(
final String memberId,
final String clientId,
final String clientHost,
final byte[] assignment,
final byte[] metadata) {
return new DescribedGroupMember()
.setMemberId(memberId)
.setClientId(clientId)
.setClientHost(clientHost)
.setMemberAssignment(assignment)
.setMemberMetadata(metadata);
}
String groupId = groupStruct.get(GROUP_ID);
Errors error = Errors.forCode(groupStruct.get(ERROR_CODE));
String state = groupStruct.getString(GROUP_STATE_KEY_NAME);
String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME);
String protocol = groupStruct.getString(PROTOCOL_KEY_NAME);
public static DescribedGroup groupMetadata(
final String groupId,
final Errors error,
final String state,
final String protocolType,
final String protocol,
final List<DescribedGroupMember> members,
final Set<Byte> authorizedOperations) {
DescribedGroup groupMetada = new DescribedGroup();
groupMetada.setGroupId(groupId)
.setErrorCode(error.code())
.setGroupState(state)
.setProtocolType(protocolType)
.setProtocolData(protocol)
.setMembers(members)
.setAuthorizedOperations(Utils.to32BitField(authorizedOperations));
return groupMetada;
}
List<GroupMember> members = new ArrayList<>();
for (Object memberObj : groupStruct.getArray(MEMBERS_KEY_NAME)) {
Struct memberStruct = (Struct) memberObj;
String memberId = memberStruct.get(MEMBER_ID);
String clientId = memberStruct.getString(CLIENT_ID_KEY_NAME);
String clientHost = memberStruct.getString(CLIENT_HOST_KEY_NAME);
ByteBuffer memberMetadata = memberStruct.getBytes(MEMBER_METADATA_KEY_NAME);
ByteBuffer memberAssignment = memberStruct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
members.add(new GroupMember(memberId, clientId, clientHost,
memberMetadata, memberAssignment));
}
this.groups.put(groupId, new GroupMetadata(error, state, protocolType, protocol, members));
}
public DescribeGroupsResponseData data() {
return data;
}
@Override
public int throttleTimeMs() {
return throttleTimeMs;
protected Struct toStruct(short version) {
return data.toStruct(version);
}
public Map<String, GroupMetadata> groups() {
return groups;
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
public static final String UNKNOWN_STATE = "";
public static final String UNKNOWN_PROTOCOL_TYPE = "";
public static final String UNKNOWN_PROTOCOL = "";
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
for (GroupMetadata response : groups.values())
updateErrorCounts(errorCounts, response.error);
data.groups().forEach(describedGroup -> {
updateErrorCounts(errorCounts, Errors.forCode(describedGroup.errorCode()));
});
return errorCounts;
}
public static class GroupMetadata {
private final Errors error;
private final String state;
private final String protocolType;
private final String protocol;
private final List<GroupMember> members;
public GroupMetadata(Errors error,
String state,
String protocolType,
String protocol,
List<GroupMember> members) {
this.error = error;
this.state = state;
this.protocolType = protocolType;
this.protocol = protocol;
this.members = members;
}
public Errors error() {
return error;
}
public String state() {
return state;
}
public String protocolType() {
return protocolType;
}
public String protocol() {
return protocol;
}
public List<GroupMember> members() {
return members;
}
public static GroupMetadata forError(Errors error) {
return new DescribeGroupsResponse.GroupMetadata(
error,
DescribeGroupsResponse.UNKNOWN_STATE,
DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
DescribeGroupsResponse.UNKNOWN_PROTOCOL,
Collections.emptyList());
}
}
public static class GroupMember {
private final String memberId;
private final String clientId;
private final String clientHost;
private final ByteBuffer memberMetadata;
private final ByteBuffer memberAssignment;
public GroupMember(String memberId,
String clientId,
String clientHost,
ByteBuffer memberMetadata,
ByteBuffer memberAssignment) {
this.memberId = memberId;
this.clientId = clientId;
this.clientHost = clientHost;
this.memberMetadata = memberMetadata;
this.memberAssignment = memberAssignment;
}
public String memberId() {
return memberId;
}
public String clientId() {
return clientId;
}
public String clientHost() {
return clientHost;
}
public ByteBuffer memberMetadata() {
return memberMetadata;
}
public ByteBuffer memberAssignment() {
return memberAssignment;
}
}
public static DescribeGroupsResponse fromError(Errors error, List<String> groupIds) {
return fromError(DEFAULT_THROTTLE_TIME, error, groupIds);
public static DescribedGroup forError(String groupId, Errors error) {
return groupMetadata(groupId, error, DescribeGroupsResponse.UNKNOWN_STATE, DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
DescribeGroupsResponse.UNKNOWN_PROTOCOL, Collections.emptyList(), Collections.emptySet());
}
public static DescribeGroupsResponse fromError(int throttleTimeMs, Errors error, List<String> groupIds) {
GroupMetadata errorMetadata = GroupMetadata.forError(error);
Map<String, GroupMetadata> groups = new HashMap<>();
DescribeGroupsResponseData describeGroupsResponseData = new DescribeGroupsResponseData();
describeGroupsResponseData.setThrottleTimeMs(throttleTimeMs);
for (String groupId : groupIds)
groups.put(groupId, errorMetadata);
return new DescribeGroupsResponse(throttleTimeMs, groups);
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.DESCRIBE_GROUPS.responseSchema(version));
struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
List<Struct> groupStructs = new ArrayList<>();
for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) {
Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
GroupMetadata group = groupEntry.getValue();
groupStruct.set(GROUP_ID, groupEntry.getKey());
groupStruct.set(ERROR_CODE, group.error.code());
groupStruct.set(GROUP_STATE_KEY_NAME, group.state);
groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
groupStruct.set(PROTOCOL_KEY_NAME, group.protocol);
List<Struct> membersList = new ArrayList<>();
for (GroupMember member : group.members) {
Struct memberStruct = groupStruct.instance(MEMBERS_KEY_NAME);
memberStruct.set(MEMBER_ID, member.memberId);
memberStruct.set(CLIENT_ID_KEY_NAME, member.clientId);
memberStruct.set(CLIENT_HOST_KEY_NAME, member.clientHost);
memberStruct.set(MEMBER_METADATA_KEY_NAME, member.memberMetadata);
memberStruct.set(MEMBER_ASSIGNMENT_KEY_NAME, member.memberAssignment);
membersList.add(memberStruct);
}
groupStruct.set(MEMBERS_KEY_NAME, membersList.toArray());
groupStructs.add(groupStruct);
}
struct.set(GROUPS_KEY_NAME, groupStructs.toArray());
return struct;
describeGroupsResponseData.groups().add(DescribeGroupsResponse.forError(groupId, error));
return new DescribeGroupsResponse(describeGroupsResponseData);
}
public static DescribeGroupsResponse parse(ByteBuffer buffer, short version) {
return new DescribeGroupsResponse(ApiKeys.DESCRIBE_GROUPS.parseResponse(version, buffer));
return new DescribeGroupsResponse(
ApiKeys.DESCRIBE_GROUPS.responseSchema(version).read(buffer), version);
}
@Override

24
clients/src/main/java/org/apache/kafka/common/utils/Utils.java

@ -1005,4 +1005,28 @@ public final class Utils { @@ -1005,4 +1005,28 @@ public final class Utils {
.collect(Collectors.collectingAndThen(Collectors.toList(), finisher));
}
public static int to32BitField(final Set<Byte> bytes) {
int value = 0;
for (final byte b : bytes)
value |= 1 << checkRange(b);
return value;
}
private static byte checkRange(final byte i) {
if (i > 31)
throw new IllegalArgumentException("out of range: i>31, i = " + i);
if (i < 0)
throw new IllegalArgumentException("out of range: i<0, i = " + i);
return i;
}
public static Set<Byte> from32BitField(final int intValue) {
Set<Byte> result = new HashSet<>();
for (int itr = intValue, count = 0; itr != 0; itr >>>= 1) {
if ((itr & 1) != 0)
result.add((byte) count);
count++;
}
return result;
}
}

7
clients/src/main/resources/common/message/DescribeGroupsRequest.json

@ -18,9 +18,12 @@ @@ -18,9 +18,12 @@
"type": "request",
"name": "DescribeGroupsRequest",
// Versions 1 and 2 are the same as version 0.
"validVersions": "0-2",
// Starting in version 3, authorized operations can be requested.
"validVersions": "0-3",
"fields": [
{ "name": "Groups", "type": "[]string", "versions": "0+",
"about": "The names of the groups to describe" }
"about": "The names of the groups to describe" },
{ "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "3+",
"about": "Whether to include authorized operations." }
]
}

7
clients/src/main/resources/common/message/DescribeGroupsResponse.json

@ -19,7 +19,8 @@ @@ -19,7 +19,8 @@
"name": "DescribeGroupsResponse",
// Version 1 added throttle time.
// Starting in version 2, on quota violation, brokers send out responses before throttling.
"validVersions": "0-2",
// Starting in version 3, brokers can send authorized operations.
"validVersions": "0-3",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
@ -51,7 +52,9 @@ @@ -51,7 +52,9 @@
// This is currently only provided if the group is in the Stable state.
{ "name": "MemberAssignment", "type": "bytes", "versions": "0+",
"about": "The current assignment provided by the group leader." }
]}
]},
{ "name": "AuthorizedOperations", "type": "int32", "versions": "3+",
"about": "32-bit bitfield to represent authorized operations for this group." }
]}
]
}

30
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -53,6 +53,7 @@ import org.apache.kafka.common.errors.UnknownServerException; @@ -53,6 +53,7 @@ import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
@ -1055,7 +1056,7 @@ public class KafkaAdminClientTest { @@ -1055,7 +1056,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
final Map<String, DescribeGroupsResponse.GroupMetadata> groupMetadataMap = new HashMap<>();
DescribeGroupsResponseData data = new DescribeGroupsResponseData();
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
@ -1066,29 +1067,34 @@ public class KafkaAdminClientTest { @@ -1066,29 +1067,34 @@ public class KafkaAdminClientTest {
topicPartitions.add(2, myTopicPartition2);
final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions));
byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
memberAssignment.get(memberAssignmentBytes);
groupMetadataMap.put(
"group-0",
new DescribeGroupsResponse.GroupMetadata(
data.groups().add(DescribeGroupsResponse.groupMetadata(
"group-0",
Errors.NONE,
"",
ConsumerProtocol.PROTOCOL_TYPE,
"",
asList(
new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment),
new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment))));
groupMetadataMap.put(
"group-connect-0",
new DescribeGroupsResponse.GroupMetadata(
DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
),
Collections.emptySet()));
data.groups().add(DescribeGroupsResponse.groupMetadata(
"group-connect-0",
Errors.NONE,
"",
"connect",
"",
asList(
new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment),
new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment))));
DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
),
Collections.emptySet()));
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupMetadataMap));
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0"));
final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get();

19
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

@ -38,6 +38,8 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; @@ -38,6 +38,8 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
@ -767,18 +769,21 @@ public class RequestResponseTest { @@ -767,18 +769,21 @@ public class RequestResponseTest {
}
private DescribeGroupsRequest createDescribeGroupRequest() {
return new DescribeGroupsRequest.Builder(singletonList("test-group")).build();
return new DescribeGroupsRequest.Builder(
new DescribeGroupsRequestData().
setGroups(Collections.singletonList("test-group"))).build();
}
private DescribeGroupsResponse createDescribeGroupResponse() {
String clientId = "consumer-1";
String clientHost = "localhost";
ByteBuffer empty = ByteBuffer.allocate(0);
DescribeGroupsResponse.GroupMember member = new DescribeGroupsResponse.GroupMember("memberId",
clientId, clientHost, empty, empty);
DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(Errors.NONE,
"STABLE", "consumer", "roundrobin", asList(member));
return new DescribeGroupsResponse(Collections.singletonMap("test-group", metadata));
DescribeGroupsResponseData describeGroupsResponseData = new DescribeGroupsResponseData();
DescribeGroupsResponseData.DescribedGroupMember member = DescribeGroupsResponse.groupMember("memberId",
clientId, clientHost, new byte[0], new byte[0]);
DescribeGroupsResponseData.DescribedGroup metadata = DescribeGroupsResponse.groupMetadata("test-group", Errors.NONE,
"STABLE", "consumer", "roundrobin", asList(member), Collections.emptySet());
describeGroupsResponseData.groups().add(metadata);
return new DescribeGroupsResponse(describeGroupsResponseData);
}
private LeaveGroupRequest createLeaveGroupRequest() {

27
clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java

@ -30,16 +30,19 @@ import java.nio.channels.FileChannel; @@ -30,16 +30,19 @@ import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.formatAddress;
import static org.apache.kafka.common.utils.Utils.formatBytes;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.utils.Utils.validHostPattern;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@ -110,8 +113,8 @@ public class UtilsTest { @@ -110,8 +113,8 @@ public class UtilsTest {
@Test
public void testJoin() {
assertEquals("", Utils.join(Collections.emptyList(), ","));
assertEquals("1", Utils.join(Arrays.asList("1"), ","));
assertEquals("1,2,3", Utils.join(Arrays.asList(1, 2, 3), ","));
assertEquals("1", Utils.join(asList("1"), ","));
assertEquals("1,2,3", Utils.join(asList(1, 2, 3), ","));
}
@Test
@ -466,4 +469,22 @@ public class UtilsTest { @@ -466,4 +469,22 @@ public class UtilsTest {
Utils.delete(tempDir);
assertFalse(Files.exists(tempDir.toPath()));
}
@Test
public void testConvertTo32BitField() {
Set<Byte> bytes = mkSet((byte) 0, (byte) 1, (byte) 5, (byte) 10, (byte) 31);
int bitField = Utils.to32BitField(bytes);
assertEquals(bytes, Utils.from32BitField(bitField));
bytes = new HashSet<>();
bitField = Utils.to32BitField(bytes);
assertEquals(bytes, Utils.from32BitField(bitField));
bytes = mkSet((byte) 0, (byte) 11, (byte) 32);
try {
Utils.to32BitField(bytes);
fail("Expected exception not thrown");
} catch (IllegalArgumentException e) {
}
}
}

10
core/src/main/scala/kafka/admin/AclCommand.scala

@ -41,14 +41,6 @@ object AclCommand extends Logging { @@ -41,14 +41,6 @@ object AclCommand extends Logging {
private val Newline = scala.util.Properties.lineSeparator
val ResourceTypeToValidOperations: Map[JResourceType, Set[Operation]] = Map[JResourceType, Set[Operation]](
JResourceType.TOPIC -> Set(Read, Write, Create, Describe, Delete, Alter, DescribeConfigs, AlterConfigs, All),
JResourceType.GROUP -> Set(Read, Describe, Delete, All),
JResourceType.CLUSTER -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All),
JResourceType.TRANSACTIONAL_ID -> Set(Describe, Write, All),
JResourceType.DELEGATION_TOKEN -> Set(Describe, All)
)
def main(args: Array[String]) {
val opts = new AclCommandOptions(args)
@ -454,7 +446,7 @@ object AclCommand extends Logging { @@ -454,7 +446,7 @@ object AclCommand extends Logging {
private def validateOperation(opts: AclCommandOptions, resourceToAcls: Map[ResourcePatternFilter, Set[Acl]]): Unit = {
for ((resource, acls) <- resourceToAcls) {
val validOps = ResourceTypeToValidOperations(resource.resourceType)
val validOps = ResourceType.fromJava(resource.resourceType).supportedOperations + All
if ((acls.map(_.operation) -- validOps).nonEmpty)
CommandLineUtils.printUsageAndDie(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.mkString(",")}")
}

24
core/src/main/scala/kafka/admin/AdminClient.scala

@ -27,15 +27,15 @@ import org.apache.kafka.common.config.ConfigDef.ValidString._ @@ -27,15 +27,15 @@ import org.apache.kafka.common.config.ConfigDef.ValidString._
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException}
import org.apache.kafka.common.message.{DescribeGroupsRequestData, DescribeGroupsResponseData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.Selector
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
import org.apache.kafka.common.utils.{KafkaThread, Time}
import org.apache.kafka.common.{Node, TopicPartition}
import scala.collection.JavaConverters._
@ -233,20 +233,20 @@ class AdminClient(val time: Time, @@ -233,20 +233,20 @@ class AdminClient(val time: Time,
consumers: Option[List[ConsumerSummary]],
coordinator: Node)
def describeConsumerGroupHandler(coordinator: Node, groupId: String): GroupMetadata = {
def describeConsumerGroupHandler(coordinator: Node, groupId: String): DescribeGroupsResponseData.DescribedGroup = {
val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS,
new DescribeGroupsRequest.Builder(Collections.singletonList(groupId)))
new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(Collections.singletonList(groupId))))
val response = responseBody.asInstanceOf[DescribeGroupsResponse]
val metadata = response.groups.get(groupId)
if (metadata == null)
throw new KafkaException(s"Response from broker contained no metadata for group $groupId")
val metadata = response.data().groups().asScala.find(group => groupId.equals(group.groupId()))
.getOrElse(throw new KafkaException(s"Response from broker contained no metadata for group $groupId"))
metadata
}
def describeConsumerGroup(groupId: String, timeoutMs: Long = 0): ConsumerGroupSummary = {
def isValidConsumerGroupResponse(metadata: DescribeGroupsResponse.GroupMetadata): Boolean =
metadata.error == Errors.NONE && (metadata.state == "Dead" || metadata.state == "Empty" || metadata.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
def isValidConsumerGroupResponse(metadata: DescribeGroupsResponseData.DescribedGroup): Boolean =
metadata.errorCode() == Errors.NONE.code() && (metadata.groupState() == "Dead" ||
metadata.groupState() == "Empty" || metadata.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
val startTime = time.milliseconds
val coordinator = findCoordinator(groupId, timeoutMs)
@ -262,16 +262,16 @@ class AdminClient(val time: Time, @@ -262,16 +262,16 @@ class AdminClient(val time: Time,
throw new TimeoutException("The consumer group command timed out while waiting for group to initialize")
val consumers = metadata.members.asScala.map { consumer =>
ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match {
ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.groupState() match {
case "Stable" =>
val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment)))
val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(consumer.memberAssignment))
assignment.partitions.asScala.toList
case _ =>
List()
})
}.toList
ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator)
ConsumerGroupSummary(metadata.groupState(), metadata.protocolData(), Some(consumers), coordinator)
}
def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = {

7
core/src/main/scala/kafka/security/auth/ResourceType.scala

@ -23,6 +23,8 @@ import org.apache.kafka.common.resource.{ResourceType => JResourceType} @@ -23,6 +23,8 @@ import org.apache.kafka.common.resource.{ResourceType => JResourceType}
sealed trait ResourceType extends BaseEnum with Ordered[ ResourceType ] {
def error: Errors
def toJava: JResourceType
// this method output will not include "All" Operation type
def supportedOperations: Set[Operation]
override def compare(that: ResourceType): Int = this.name compare that.name
}
@ -31,30 +33,35 @@ case object Topic extends ResourceType { @@ -31,30 +33,35 @@ case object Topic extends ResourceType {
val name = "Topic"
val error = Errors.TOPIC_AUTHORIZATION_FAILED
val toJava = JResourceType.TOPIC
val supportedOperations = Set(Read, Write, Create, Describe, Delete, Alter, DescribeConfigs, AlterConfigs)
}
case object Group extends ResourceType {
val name = "Group"
val error = Errors.GROUP_AUTHORIZATION_FAILED
val toJava = JResourceType.GROUP
val supportedOperations = Set(Read, Describe, Delete)
}
case object Cluster extends ResourceType {
val name = "Cluster"
val error = Errors.CLUSTER_AUTHORIZATION_FAILED
val toJava = JResourceType.CLUSTER
val supportedOperations = Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe)
}
case object TransactionalId extends ResourceType {
val name = "TransactionalId"
val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
val toJava = JResourceType.TRANSACTIONAL_ID
val supportedOperations = Set(Describe, Write)
}
case object DelegationToken extends ResourceType {
val name = "DelegationToken"
val error = Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED
val toJava = JResourceType.DELEGATION_TOKEN
val supportedOperations : Set[Operation] = Set(Describe)
}
object ResourceType {

64
core/src/main/scala/kafka/server/KafkaApis.scala

@ -18,6 +18,7 @@ @@ -18,6 +18,7 @@
package kafka.server
import java.lang.{Long => JLong}
import java.lang.{Byte => JByte}
import java.nio.ByteBuffer
import java.util
import java.util.concurrent.ConcurrentHashMap
@ -44,7 +45,7 @@ import org.apache.kafka.common.errors._ @@ -44,7 +45,7 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData
import org.apache.kafka.common.message.{CreateTopicsResponseData, DescribeGroupsResponseData}
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
import org.apache.kafka.common.message.LeaveGroupResponseData
@ -1186,24 +1187,65 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -1186,24 +1187,65 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleDescribeGroupRequest(request: RequestChannel.Request) {
def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
new DescribeGroupsResponse(describeGroupsResponseData)
}
sendResponseMaybeThrottle(request, createResponse)
}
val describeRequest = request.body[DescribeGroupsRequest]
val describeGroupsResponseData = new DescribeGroupsResponseData()
val groups = describeRequest.groupIds.asScala.map { groupId =>
if (!authorize(request.session, Describe, Resource(Group, groupId, LITERAL))) {
groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
describeRequest.data().groups().asScala.foreach { groupId =>
val resource = Resource(Group, groupId, LITERAL)
if (!authorize(request.session, Describe, resource)) {
describeGroupsResponseData.groups().add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
} else {
val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
val members = summary.members.map { member =>
val metadata = ByteBuffer.wrap(member.metadata)
val assignment = ByteBuffer.wrap(member.assignment)
new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment)
new DescribeGroupsResponseData.DescribedGroupMember()
.setMemberId(member.memberId)
.setClientId(member.clientId)
.setClientHost(member.clientHost)
.setMemberAssignment(member.assignment)
.setMemberMetadata(member.assignment)
}
groupId -> new DescribeGroupsResponse.GroupMetadata(error, summary.state, summary.protocolType,
summary.protocol, members.asJava)
val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
.setErrorCode(error.code())
.setGroupId(groupId)
.setGroupState(summary.state)
.setProtocolType(summary.protocolType)
.setProtocolData(summary.protocol)
.setMembers(members.asJava)
if (request.header.apiVersion >= 3) {
if (error == Errors.NONE && describeRequest.data().includeAuthorizedOperations()) {
describedGroup.setAuthorizedOperations(authorizedOperations(request.session, resource))
} else {
describedGroup.setAuthorizedOperations(0)
}
}
describeGroupsResponseData.groups().add(describedGroup)
}
}.toMap
}
sendResponseCallback(describeGroupsResponseData)
}
private def authorizedOperations(session: RequestChannel.Session, resource: Resource): Int = {
val authorizedOps = authorizer match {
case None => resource.resourceType.supportedOperations
case Some(auth) => resource.resourceType.supportedOperations
.filter(operation => authorize(session, operation, resource))
}
sendResponseMaybeThrottle(request, requestThrottleMs => new DescribeGroupsResponse(requestThrottleMs, groups.asJava))
Utils.to32BitField(authorizedOps.map(operation => operation.toJava.code().asInstanceOf[JByte]).asJava)
}
def handleListGroupsRequest(request: RequestChannel.Request) {

13
core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala

@ -47,14 +47,14 @@ import org.junit.Assert._ @@ -47,14 +47,14 @@ import org.junit.Assert._
import scala.util.Random
import scala.collection.JavaConverters._
import kafka.zk.KafkaZkClient
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import java.lang.{Long => JLong}
import kafka.security.auth.Group
/**
* An integration test of the KafkaAdminClient.
*
@ -1142,12 +1142,14 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1142,12 +1142,14 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
!matching.isEmpty
}, s"Expected to be able to list $testGroupId")
val result = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava)
val result = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava,
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(2, result.describedGroups().size())
// Test that we can get information about the test consumer group.
assertTrue(result.describedGroups().containsKey(testGroupId))
val testGroupDescription = result.describedGroups().get(testGroupId).get()
assertEquals(testGroupId, testGroupDescription.groupId())
assertFalse(testGroupDescription.isSimpleConsumerGroup())
assertEquals(1, testGroupDescription.members().size())
@ -1157,14 +1159,19 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1157,14 +1159,19 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(testNumPartitions, topicPartitions.size())
assertEquals(testNumPartitions, topicPartitions.asScala.
count(tp => tp.topic().equals(testTopicName)))
val expectedOperations = Group.supportedOperations
.map(operation => operation.toJava).asJava
assertEquals(expectedOperations, testGroupDescription.authorizedOperations())
// Test that the fake group is listed as dead.
assertTrue(result.describedGroups().containsKey(fakeGroupId))
val fakeGroupDescription = result.describedGroups().get(fakeGroupId).get()
assertEquals(fakeGroupId, fakeGroupDescription.groupId())
assertEquals(0, fakeGroupDescription.members().size())
assertEquals("", fakeGroupDescription.partitionAssignor())
assertEquals(ConsumerGroupState.DEAD, fakeGroupDescription.state())
assertEquals(expectedOperations, fakeGroupDescription.authorizedOperations())
// Test that all() returns 2 results
assertEquals(2, result.all().get().size())

10
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

@ -33,9 +33,8 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter @@ -33,9 +33,8 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, LeaveGroupRequestData}
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
import org.apache.kafka.common.message.LeaveGroupRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
@ -163,7 +162,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -163,7 +162,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.UPDATE_METADATA -> ((resp: requests.UpdateMetadataResponse) => resp.error),
ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error),
ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error),
ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => resp.groups.get(group).error),
ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => {
val errorCode = resp.data().groups().asScala.find(g => group.equals(g.groupId())).head.errorCode()
Errors.forCode(errorCode)
}),
ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error),
ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
ApiKeys.DELETE_GROUPS -> ((resp: DeleteGroupsResponse) => resp.get(group)),
@ -318,7 +320,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -318,7 +320,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def createDescribeGroupsRequest = {
new DescribeGroupsRequest.Builder(List(group).asJava).build()
new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(List(group).asJava)).build()
}
private def createOffsetCommitRequest = {

121
core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala

@ -0,0 +1,121 @@ @@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package kafka.api
import java.io.File
import java.util
import java.util.Properties
import kafka.security.auth.{Allow, Alter, Authorizer, ClusterAction, Group, Operation, PermissionType, SimpleAclAuthorizer, Acl => AuthAcl, Resource => AuthResource}
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, DescribeConsumerGroupsOptions}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
import org.junit.Assert.assertEquals
import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslSetup {
override val serverCount = 1
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
var client: AdminClient = _
val group1 = "group1"
val group2 = "group2"
val group3 = "group3"
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
override def configureSecurityBeforeServersStart() {
val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName)
try {
authorizer.configure(this.configs.head.originals())
authorizer.addAcls(Set(clusterAcl(JaasTestUtils.KafkaServerPrincipalUnqualifiedName, Allow, ClusterAction),
clusterAcl(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, Allow, Alter)),
AuthResource.ClusterResource)
} finally {
authorizer.close()
}
}
@Before
override def setUp(): Unit = {
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName))
super.setUp()
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
}
private def clusterAcl(userName: String, permissionType: PermissionType, operation: Operation): AuthAcl = {
new AuthAcl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, userName), permissionType,
AuthAcl.WildCardHost, operation)
}
@After
override def tearDown(): Unit = {
if (client != null)
Utils.closeQuietly(client, "AdminClient")
super.tearDown()
closeSasl()
}
val group1Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group1, PatternType.LITERAL),
new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.ALL, AclPermissionType.ALLOW))
val group2Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group2, PatternType.LITERAL),
new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
val group3Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group3, PatternType.LITERAL),
new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DELETE, AclPermissionType.ALLOW))
def createConfig(): Properties = {
val adminClientConfig = new Properties()
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
adminClientConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
val securityProps: util.Map[Object, Object] =
TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
securityProps.asScala.foreach { case (key, value) => adminClientConfig.put(key.asInstanceOf[String], value) }
adminClientConfig
}
@Test
def testConsumerGroupAuthorizedOperations(): Unit = {
client = AdminClient.create(createConfig())
val results = client.createAcls(List(group1Acl, group2Acl, group3Acl).asJava)
assertEquals(Set(group1Acl, group2Acl, group3Acl), results.values.keySet.asScala)
results.all.get
val describeConsumerGroupsResult = client.describeConsumerGroups(Seq(group1, group2, group3).asJava,
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(3, describeConsumerGroupsResult.describedGroups().size())
val expectedOperations = Group.supportedOperations
.map(operation => operation.toJava).asJava
val group1Description = describeConsumerGroupsResult.describedGroups().get(group1).get
assertEquals(expectedOperations, group1Description.authorizedOperations())
val group2Description = describeConsumerGroupsResult.describedGroups().get(group2).get
assertEquals(Set(AclOperation.DESCRIBE), group2Description.authorizedOperations().asScala.toSet)
val group3Description = describeConsumerGroupsResult.describedGroups().get(group3).get
assertEquals(Set(AclOperation.DESCRIBE, AclOperation.DELETE), group3Description.authorizedOperations().asScala.toSet)
}
}

8
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

@ -24,10 +24,9 @@ import kafka.security.auth._ @@ -24,10 +24,9 @@ import kafka.security.auth._
import kafka.utils.TestUtils
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.{ElectPreferredLeadersRequestData, LeaveGroupRequestData}
import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
@ -268,7 +267,7 @@ class RequestQuotaTest extends BaseRequestTest { @@ -268,7 +267,7 @@ class RequestQuotaTest extends BaseRequestTest {
new SyncGroupRequest.Builder("test-sync-group", 1, "", Map[String, ByteBuffer]().asJava)
case ApiKeys.DESCRIBE_GROUPS =>
new DescribeGroupsRequest.Builder(List("test-group").asJava)
new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(List("test-group").asJava))
case ApiKeys.LIST_GROUPS =>
new ListGroupsRequest.Builder()
@ -443,7 +442,8 @@ class RequestQuotaTest extends BaseRequestTest { @@ -443,7 +442,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.HEARTBEAT => new HeartbeatResponse(response).throttleTimeMs
case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs
case ApiKeys.SYNC_GROUP => new SyncGroupResponse(response).throttleTimeMs
case ApiKeys.DESCRIBE_GROUPS => new DescribeGroupsResponse(response).throttleTimeMs
case ApiKeys.DESCRIBE_GROUPS =>
new DescribeGroupsResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion()).throttleTimeMs
case ApiKeys.LIST_GROUPS => new ListGroupsResponse(response).throttleTimeMs
case ApiKeys.API_VERSIONS => new ApiVersionsResponse(response).throttleTimeMs
case ApiKeys.CREATE_TOPICS =>

Loading…
Cancel
Save