From b8605c9bc1331b876f9d134ebc8fcf9bfc684c8e Mon Sep 17 00:00:00 2001 From: Viktor Somogyi-Vass Date: Fri, 16 Aug 2019 23:05:54 +0530 Subject: [PATCH] KAFKA-8600: Use RPC generation for DescribeDelegationTokens protocol Refactors the DescribeDelegationToken to use the generated RPC classes. Author: Viktor Somogyi-Vass Author: Viktor Somogyi Reviewers: Mickael Maison , Manikumar Reddy Closes #7154 from viktorsomogyi/refactor-describe-dt --- .../apache/kafka/common/protocol/ApiKeys.java | 6 +- .../common/requests/AbstractResponse.java | 2 +- .../DescribeDelegationTokenRequest.java | 103 +++-------- .../DescribeDelegationTokenResponse.java | 175 +++++------------- .../main/scala/kafka/server/KafkaApis.scala | 5 +- .../unit/kafka/server/RequestQuotaTest.scala | 32 +--- 6 files changed, 82 insertions(+), 241 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 912e26bc99b..3b45e5882f4 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -26,6 +26,8 @@ import org.apache.kafka.common.message.DeleteGroupsRequestData; import org.apache.kafka.common.message.DeleteGroupsResponseData; import org.apache.kafka.common.message.DeleteTopicsRequestData; import org.apache.kafka.common.message.DeleteTopicsResponseData; +import org.apache.kafka.common.message.DescribeDelegationTokenRequestData; +import org.apache.kafka.common.message.DescribeDelegationTokenResponseData; import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.ElectLeadersRequestData; @@ -91,8 +93,6 @@ import org.apache.kafka.common.requests.DescribeAclsRequest; import org.apache.kafka.common.requests.DescribeAclsResponse; import org.apache.kafka.common.requests.DescribeConfigsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; -import org.apache.kafka.common.requests.DescribeDelegationTokenRequest; -import org.apache.kafka.common.requests.DescribeDelegationTokenResponse; import org.apache.kafka.common.requests.DescribeLogDirsRequest; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.apache.kafka.common.requests.EndTxnRequest; @@ -192,7 +192,7 @@ public enum ApiKeys { CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequestData.SCHEMAS, CreateDelegationTokenResponseData.SCHEMAS), RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequestData.SCHEMAS, RenewDelegationTokenResponseData.SCHEMAS), EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequestData.SCHEMAS, ExpireDelegationTokenResponseData.SCHEMAS), - DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()), + DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequestData.SCHEMAS, DescribeDelegationTokenResponseData.SCHEMAS), DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequestData.SCHEMAS, DeleteGroupsResponseData.SCHEMAS), ELECT_LEADERS(43, "ElectLeaders", ElectLeadersRequestData.SCHEMAS, ElectLeadersResponseData.SCHEMAS), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 13d18a2d724..b84a2c03d1e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -153,7 +153,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { case EXPIRE_DELEGATION_TOKEN: return new ExpireDelegationTokenResponse(struct, version); case DESCRIBE_DELEGATION_TOKEN: - return new DescribeDelegationTokenResponse(struct); + return new DescribeDelegationTokenResponse(struct, version); case DELETE_GROUPS: return new DeleteGroupsResponse(struct, version); case ELECT_LEADERS: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenRequest.java index 21e14608c36..7db3209d621 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenRequest.java @@ -16,122 +16,69 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.DescribeDelegationTokenRequestData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.security.auth.KafkaPrincipal; -import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; - -import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME; -import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE; +import java.util.stream.Collectors; public class DescribeDelegationTokenRequest extends AbstractRequest { - private static final String OWNER_KEY_NAME = "owners"; - - private final List owners; - - public static final Schema TOKEN_DESCRIBE_REQUEST_V0 = new Schema( - new Field(OWNER_KEY_NAME, ArrayOf.nullable(new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME)), "An array of token owners.")); - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - public static final Schema TOKEN_DESCRIBE_REQUEST_V1 = TOKEN_DESCRIBE_REQUEST_V0; + private final DescribeDelegationTokenRequestData data; public static class Builder extends AbstractRequest.Builder { - // describe token for the given list of owners, or null if we want to describe all tokens. - private final List owners; + private final DescribeDelegationTokenRequestData data; public Builder(List owners) { super(ApiKeys.DESCRIBE_DELEGATION_TOKEN); - this.owners = owners; + this.data = new DescribeDelegationTokenRequestData() + .setOwners(owners == null ? null : owners + .stream() + .map(owner -> new DescribeDelegationTokenRequestData.DescribeDelegationTokenOwner() + .setPrincipalName(owner.getName()) + .setPrincipalType(owner.getPrincipalType())) + .collect(Collectors.toList())); } @Override public DescribeDelegationTokenRequest build(short version) { - return new DescribeDelegationTokenRequest(version, owners); + return new DescribeDelegationTokenRequest(data, version); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type: DescribeDelegationTokenRequest"). - append(", owners=").append(owners). - append(")"); - return bld.toString(); + return data.toString(); } } - private DescribeDelegationTokenRequest(short version, List owners) { + public DescribeDelegationTokenRequest(Struct struct, short version) { super(ApiKeys.DESCRIBE_DELEGATION_TOKEN, version); - this.owners = owners; + this.data = new DescribeDelegationTokenRequestData(struct, version); } - public DescribeDelegationTokenRequest(Struct struct, short versionId) { - super(ApiKeys.DESCRIBE_DELEGATION_TOKEN, versionId); - - Object[] ownerArray = struct.getArray(OWNER_KEY_NAME); - - if (ownerArray != null) { - owners = new ArrayList<>(); - for (Object ownerObj : ownerArray) { - Struct ownerObjStruct = (Struct) ownerObj; - String principalType = ownerObjStruct.get(PRINCIPAL_TYPE); - String principalName = ownerObjStruct.get(PRINCIPAL_NAME); - owners.add(new KafkaPrincipal(principalType, principalName)); - } - } else - owners = null; - } - - public static Schema[] schemaVersions() { - return new Schema[]{TOKEN_DESCRIBE_REQUEST_V0, TOKEN_DESCRIBE_REQUEST_V1}; + public DescribeDelegationTokenRequest(DescribeDelegationTokenRequestData data, short version) { + super(ApiKeys.DESCRIBE_DELEGATION_TOKEN, version); + this.data = data; } @Override protected Struct toStruct() { - short version = version(); - Struct struct = new Struct(ApiKeys.DESCRIBE_DELEGATION_TOKEN.requestSchema(version)); - - if (owners == null) { - struct.set(OWNER_KEY_NAME, null); - } else { - Object[] ownersArray = new Object[owners.size()]; - - int i = 0; - for (KafkaPrincipal principal: owners) { - Struct ownerStruct = struct.instance(OWNER_KEY_NAME); - ownerStruct.set(PRINCIPAL_TYPE, principal.getPrincipalType()); - ownerStruct.set(PRINCIPAL_NAME, principal.getName()); - ownersArray[i++] = ownerStruct; - } - - struct.set(OWNER_KEY_NAME, ownersArray); - } - - return struct; - } - - @Override - public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - return new DescribeDelegationTokenResponse(throttleTimeMs, Errors.forException(e)); + return data.toStruct(version()); } - public List owners() { - return owners; + public DescribeDelegationTokenRequestData data() { + return data; } public boolean ownersListEmpty() { - return owners != null && owners.isEmpty(); + return data.owners() != null && data.owners().isEmpty(); } - public static DescribeDelegationTokenRequest parse(ByteBuffer buffer, short version) { - return new DescribeDelegationTokenRequest(ApiKeys.DESCRIBE_DELEGATION_TOKEN.parseRequest(version, buffer), version); + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return new DescribeDelegationTokenResponse(throttleTimeMs, Errors.forException(e)); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java index 64ed27bfd00..91e7db0fccb 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java @@ -16,11 +16,11 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.DescribeDelegationTokenResponseData; +import org.apache.kafka.common.message.DescribeDelegationTokenResponseData.DescribedDelegationToken; +import org.apache.kafka.common.message.DescribeDelegationTokenResponseData.DescribedDelegationTokenRenewer; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.token.delegation.DelegationToken; @@ -30,169 +30,82 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; - -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME; -import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.types.Type.BYTES; -import static org.apache.kafka.common.protocol.types.Type.INT64; -import static org.apache.kafka.common.protocol.types.Type.STRING; +import java.util.stream.Collectors; public class DescribeDelegationTokenResponse extends AbstractResponse { - private static final String TOKEN_DETAILS_KEY_NAME = "token_details"; - private static final String ISSUE_TIMESTAMP_KEY_NAME = "issue_timestamp"; - private static final String EXPIRY_TIMESTAMP_NAME = "expiry_timestamp"; - private static final String MAX_TIMESTAMP_NAME = "max_timestamp"; - private static final String TOKEN_ID_KEY_NAME = "token_id"; - private static final String HMAC_KEY_NAME = "hmac"; - private static final String OWNER_KEY_NAME = "owner"; - private static final String RENEWERS_KEY_NAME = "renewers"; - - private final Errors error; - private final List tokens; - private final int throttleTimeMs; - - private static final Schema TOKEN_DETAILS_V0 = new Schema( - new Field(OWNER_KEY_NAME, new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME), "token owner."), - new Field(ISSUE_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) when this token was generated."), - new Field(EXPIRY_TIMESTAMP_NAME, INT64, "timestamp (in msec) at which this token expires."), - new Field(MAX_TIMESTAMP_NAME, INT64, "max life time of this token."), - new Field(TOKEN_ID_KEY_NAME, STRING, "UUID to ensure uniqueness."), - new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token to be expired."), - new Field(RENEWERS_KEY_NAME, new ArrayOf(new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME)), - "An array of token renewers. Renewer is an Kafka PrincipalType and name string," + - " who is allowed to renew this token before the max lifetime expires.")); - - private static final Schema TOKEN_DESCRIBE_RESPONSE_V0 = new Schema( - ERROR_CODE, - new Field(TOKEN_DETAILS_KEY_NAME, new ArrayOf(TOKEN_DETAILS_V0)), - THROTTLE_TIME_MS); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema TOKEN_DESCRIBE_RESPONSE_V1 = TOKEN_DESCRIBE_RESPONSE_V0; + private final DescribeDelegationTokenResponseData data; public DescribeDelegationTokenResponse(int throttleTimeMs, Errors error, List tokens) { - this.throttleTimeMs = throttleTimeMs; - this.error = error; - this.tokens = tokens; + List describedDelegationTokenList = tokens + .stream() + .map(dt -> new DescribedDelegationToken() + .setTokenId(dt.tokenInfo().tokenId()) + .setPrincipalType(dt.tokenInfo().owner().getPrincipalType()) + .setPrincipalName(dt.tokenInfo().owner().getName()) + .setIssueTimestamp(dt.tokenInfo().issueTimestamp()) + .setMaxTimestamp(dt.tokenInfo().maxTimestamp()) + .setExpiryTimestamp(dt.tokenInfo().expiryTimestamp()) + .setHmac(dt.hmac()) + .setRenewers(dt.tokenInfo().renewers() + .stream() + .map(r -> new DescribedDelegationTokenRenewer().setPrincipalName(r.getName()).setPrincipalType(r.getPrincipalType())) + .collect(Collectors.toList()))) + .collect(Collectors.toList()); + + this.data = new DescribeDelegationTokenResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(error.code()) + .setTokens(describedDelegationTokenList); } public DescribeDelegationTokenResponse(int throttleTimeMs, Errors error) { this(throttleTimeMs, error, new ArrayList<>()); } - public DescribeDelegationTokenResponse(Struct struct) { - Object[] requestStructs = struct.getArray(TOKEN_DETAILS_KEY_NAME); - List tokens = new ArrayList<>(); - - for (Object requestStructObj : requestStructs) { - Struct singleRequestStruct = (Struct) requestStructObj; - - Struct ownerStruct = (Struct) singleRequestStruct.get(OWNER_KEY_NAME); - KafkaPrincipal owner = new KafkaPrincipal(ownerStruct.get(PRINCIPAL_TYPE), ownerStruct.get(PRINCIPAL_NAME)); - long issueTimestamp = singleRequestStruct.getLong(ISSUE_TIMESTAMP_KEY_NAME); - long expiryTimestamp = singleRequestStruct.getLong(EXPIRY_TIMESTAMP_NAME); - long maxTimestamp = singleRequestStruct.getLong(MAX_TIMESTAMP_NAME); - String tokenId = singleRequestStruct.getString(TOKEN_ID_KEY_NAME); - ByteBuffer hmac = singleRequestStruct.getBytes(HMAC_KEY_NAME); - - Object[] renewerArray = singleRequestStruct.getArray(RENEWERS_KEY_NAME); - List renewers = new ArrayList<>(); - if (renewerArray != null) { - for (Object renewerObj : renewerArray) { - Struct renewerObjStruct = (Struct) renewerObj; - String principalType = renewerObjStruct.get(PRINCIPAL_TYPE); - String principalName = renewerObjStruct.get(PRINCIPAL_NAME); - renewers.add(new KafkaPrincipal(principalType, principalName)); - } - } - - TokenInformation tokenInfo = new TokenInformation(tokenId, owner, renewers, issueTimestamp, maxTimestamp, expiryTimestamp); - - byte[] hmacBytes = new byte[hmac.remaining()]; - hmac.get(hmacBytes); - - DelegationToken tokenDetails = new DelegationToken(tokenInfo, hmacBytes); - tokens.add(tokenDetails); - } - - this.tokens = tokens; - error = Errors.forCode(struct.get(ERROR_CODE)); - this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); + public DescribeDelegationTokenResponse(Struct struct, short version) { + this.data = new DescribeDelegationTokenResponseData(struct, version); } public static DescribeDelegationTokenResponse parse(ByteBuffer buffer, short version) { - return new DescribeDelegationTokenResponse(ApiKeys.DESCRIBE_DELEGATION_TOKEN.responseSchema(version).read(buffer)); + return new DescribeDelegationTokenResponse(ApiKeys.DESCRIBE_DELEGATION_TOKEN.responseSchema(version).read(buffer), version); } @Override public Map errorCounts() { - return errorCounts(error); + return errorCounts(error()); } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.DESCRIBE_DELEGATION_TOKEN.responseSchema(version)); - List tokenDetailsStructs = new ArrayList<>(tokens.size()); - - struct.set(ERROR_CODE, error.code()); - - for (DelegationToken token : tokens) { - TokenInformation tokenInfo = token.tokenInfo(); - Struct singleRequestStruct = struct.instance(TOKEN_DETAILS_KEY_NAME); - Struct ownerStruct = singleRequestStruct.instance(OWNER_KEY_NAME); - ownerStruct.set(PRINCIPAL_TYPE, tokenInfo.owner().getPrincipalType()); - ownerStruct.set(PRINCIPAL_NAME, tokenInfo.owner().getName()); - singleRequestStruct.set(OWNER_KEY_NAME, ownerStruct); - singleRequestStruct.set(ISSUE_TIMESTAMP_KEY_NAME, tokenInfo.issueTimestamp()); - singleRequestStruct.set(EXPIRY_TIMESTAMP_NAME, tokenInfo.expiryTimestamp()); - singleRequestStruct.set(MAX_TIMESTAMP_NAME, tokenInfo.maxTimestamp()); - singleRequestStruct.set(TOKEN_ID_KEY_NAME, tokenInfo.tokenId()); - singleRequestStruct.set(HMAC_KEY_NAME, ByteBuffer.wrap(token.hmac())); - - Object[] renewersArray = new Object[tokenInfo.renewers().size()]; - - int i = 0; - for (KafkaPrincipal principal: tokenInfo.renewers()) { - Struct renewerStruct = singleRequestStruct.instance(RENEWERS_KEY_NAME); - renewerStruct.set(PRINCIPAL_TYPE, principal.getPrincipalType()); - renewerStruct.set(PRINCIPAL_NAME, principal.getName()); - renewersArray[i++] = renewerStruct; - } - - singleRequestStruct.set(RENEWERS_KEY_NAME, renewersArray); - - tokenDetailsStructs.add(singleRequestStruct); - } - struct.set(TOKEN_DETAILS_KEY_NAME, tokenDetailsStructs.toArray()); - struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); - - return struct; - } - - public static Schema[] schemaVersions() { - return new Schema[]{TOKEN_DESCRIBE_RESPONSE_V0, TOKEN_DESCRIBE_RESPONSE_V1}; + return data.toStruct(version); } @Override public int throttleTimeMs() { - return throttleTimeMs; + return data.throttleTimeMs(); } public Errors error() { - return error; + return Errors.forCode(data.errorCode()); } public List tokens() { - return tokens; + return data.tokens() + .stream() + .map(ddt -> new DelegationToken(new TokenInformation( + ddt.tokenId(), + new KafkaPrincipal(ddt.principalType(), ddt.principalName()), + ddt.renewers() + .stream() + .map(ddtr -> new KafkaPrincipal(ddtr.principalType(), ddtr.principalName())) + .collect(Collectors.toList()), ddt.issueTimestamp(), ddt.maxTimestamp(), ddt.expiryTimestamp()), + ddt.hmac())) + .collect(Collectors.toList()); } public boolean hasError() { - return this.error != Errors.NONE; + return error() != Errors.NONE; } @Override diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b71e4b0876c..4df57a95aa4 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2531,7 +2531,10 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseCallback(Errors.NONE, List()) } else { - val owners = if (describeTokenRequest.owners == null) None else Some(describeTokenRequest.owners.asScala.toList) + val owners = if (describeTokenRequest.data.owners == null) + None + else + Some(describeTokenRequest.data.owners.asScala.map(p => new KafkaPrincipal(p.principalType(), p.principalName())).toList) def authorizeToken(tokenId: String) = authorize(request.session, Describe, Resource(kafka.security.auth.DelegationToken, tokenId, LITERAL)) def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, owners, token, authorizeToken) val tokens = tokenManager.getTokens(eligible) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index d5c5d7faebb..d2ad89748f5 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -21,34 +21,13 @@ import kafka.log.LogConfig import kafka.network.RequestChannel.Session import kafka.security.auth._ import kafka.utils.TestUtils -import org.apache.kafka.common.ElectionType -import org.apache.kafka.common.Node -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} +import org.apache.kafka.common.{ElectionType, Node, TopicPartition} +import org.apache.kafka.common.acl._ import org.apache.kafka.common.config.ConfigResource -import org.apache.kafka.common.message.ControlledShutdownRequestData -import org.apache.kafka.common.message.CreateDelegationTokenRequestData -import org.apache.kafka.common.message.CreateTopicsRequestData import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} -import org.apache.kafka.common.message.DeleteGroupsRequestData -import org.apache.kafka.common.message.DeleteTopicsRequestData -import org.apache.kafka.common.message.DescribeGroupsRequestData -import org.apache.kafka.common.message.ExpireDelegationTokenRequestData -import org.apache.kafka.common.message.FindCoordinatorRequestData -import org.apache.kafka.common.message.HeartbeatRequestData -import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData -import org.apache.kafka.common.message.InitProducerIdRequestData -import org.apache.kafka.common.message.JoinGroupRequestData import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity -import org.apache.kafka.common.message.ListGroupsRequestData -import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData -import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData -import org.apache.kafka.common.message.OffsetCommitRequestData -import org.apache.kafka.common.message.RenewDelegationTokenRequestData -import org.apache.kafka.common.message.SaslAuthenticateRequestData -import org.apache.kafka.common.message.SaslHandshakeRequestData -import org.apache.kafka.common.message.SyncGroupRequestData +import org.apache.kafka.common.message._ import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.ApiKeys @@ -58,8 +37,7 @@ import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation import org.apache.kafka.common.requests._ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType} import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol} -import org.apache.kafka.common.utils.Sanitizer -import org.apache.kafka.common.utils.SecurityUtils +import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils} import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -580,7 +558,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.DESCRIBE_LOG_DIRS => new DescribeLogDirsResponse(response).throttleTimeMs case ApiKeys.CREATE_PARTITIONS => new CreatePartitionsResponse(response).throttleTimeMs case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenResponse(response, ApiKeys.CREATE_DELEGATION_TOKEN.latestVersion).throttleTimeMs - case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs + case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response, ApiKeys.DESCRIBE_DELEGATION_TOKEN.latestVersion).throttleTimeMs case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response, ApiKeys.RENEW_DELEGATION_TOKEN.latestVersion).throttleTimeMs case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response, ApiKeys.EXPIRE_DELEGATION_TOKEN.latestVersion).throttleTimeMs case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs