From 14d854936e1d2fed2e69a7c6367becf360f88833 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 25 Jun 2019 15:01:11 +0200 Subject: [PATCH] KAFKA-8390: Use automatic RPC generation in CreateDelegationToken (#6828) Reviewers: Manikumar Reddy --- .../kafka/clients/admin/KafkaAdminClient.java | 24 ++- .../apache/kafka/common/protocol/ApiKeys.java | 6 +- .../common/requests/AbstractResponse.java | 2 +- .../CreateDelegationTokenRequest.java | 94 ++-------- .../CreateDelegationTokenResponse.java | 164 +++++------------- .../common/requests/RequestResponseTest.java | 30 +++- .../main/scala/kafka/server/KafkaApis.scala | 15 +- .../unit/kafka/server/RequestQuotaTest.scala | 11 +- 8 files changed, 122 insertions(+), 224 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e01ecf3c22f..3fde51dea1d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -61,6 +61,9 @@ import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; @@ -138,6 +141,7 @@ import org.apache.kafka.common.requests.OffsetFetchRequest; import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.requests.RenewDelegationTokenRequest; import org.apache.kafka.common.requests.RenewDelegationTokenResponse; +import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.token.delegation.DelegationToken; import org.apache.kafka.common.security.token.delegation.TokenInformation; import org.apache.kafka.common.utils.AppInfoParser; @@ -2389,12 +2393,21 @@ public class KafkaAdminClient extends AdminClient { public CreateDelegationTokenResult createDelegationToken(final CreateDelegationTokenOptions options) { final KafkaFutureImpl delegationTokenFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); + List renewers = new ArrayList<>(); + for (KafkaPrincipal principal : options.renewers()) { + renewers.add(new CreatableRenewers() + .setPrincipalName(principal.getName()) + .setPrincipalType(principal.getPrincipalType())); + } runnable.call(new Call("createDelegationToken", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { - return new CreateDelegationTokenRequest.Builder(options.renewers(), options.maxlifeTimeMs()); + AbstractRequest.Builder createRequest(int timeoutMs) { + return new CreateDelegationTokenRequest.Builder( + new CreateDelegationTokenRequestData() + .setRenewers(renewers) + .setMaxLifetimeMs(options.maxlifeTimeMs())); } @Override @@ -2403,9 +2416,10 @@ public class KafkaAdminClient extends AdminClient { if (response.hasError()) { delegationTokenFuture.completeExceptionally(response.error().exception()); } else { - TokenInformation tokenInfo = new TokenInformation(response.tokenId(), response.owner(), - options.renewers(), response.issueTimestamp(), response.maxTimestamp(), response.expiryTimestamp()); - DelegationToken token = new DelegationToken(tokenInfo, response.hmacBytes()); + CreateDelegationTokenResponseData data = response.data(); + TokenInformation tokenInfo = new TokenInformation(data.tokenId(), new KafkaPrincipal(data.principalType(), data.principalName()), + options.renewers(), data.issueTimestampMs(), data.maxTimestampMs(), data.expiryTimestampMs()); + DelegationToken token = new DelegationToken(tokenInfo, data.hmac()); delegationTokenFuture.complete(token); } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 849f268465b..257393b08b2 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -18,6 +18,8 @@ package org.apache.kafka.common.protocol; import org.apache.kafka.common.message.ControlledShutdownRequestData; import org.apache.kafka.common.message.ControlledShutdownResponseData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.message.DeleteTopicsRequestData; @@ -65,8 +67,6 @@ import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.CreateAclsRequest; import org.apache.kafka.common.requests.CreateAclsResponse; -import org.apache.kafka.common.requests.CreateDelegationTokenRequest; -import org.apache.kafka.common.requests.CreateDelegationTokenResponse; import org.apache.kafka.common.requests.CreatePartitionsRequest; import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.DeleteAclsRequest; @@ -185,7 +185,7 @@ public enum ApiKeys { SaslAuthenticateResponseData.SCHEMAS), CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequest.schemaVersions(), CreatePartitionsResponse.schemaVersions()), - CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequest.schemaVersions(), CreateDelegationTokenResponse.schemaVersions()), + CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequestData.SCHEMAS, CreateDelegationTokenResponseData.SCHEMAS), RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()), EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()), DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 6d07431057e..8a7be33fbe1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -147,7 +147,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { case CREATE_PARTITIONS: return new CreatePartitionsResponse(struct); case CREATE_DELEGATION_TOKEN: - return new CreateDelegationTokenResponse(struct); + return new CreateDelegationTokenResponse(struct, version); case RENEW_DELEGATION_TOKEN: return new RenewDelegationTokenResponse(struct); case EXPIRE_DELEGATION_TOKEN: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java index 3277f100382..61b404a7e06 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java @@ -16,126 +16,62 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.security.auth.KafkaPrincipal; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.kafka.common.protocol.types.Type.INT64; -import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE; -import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME; public class CreateDelegationTokenRequest extends AbstractRequest { - private static final String RENEWERS_KEY_NAME = "renewers"; - private static final String MAX_LIFE_TIME_KEY_NAME = "max_life_time"; - - private static final Schema TOKEN_CREATE_REQUEST_V0 = new Schema( - new Field(RENEWERS_KEY_NAME, new ArrayOf(new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME)), - "An array of token renewers. Renewer is an Kafka PrincipalType and name string," + - " who is allowed to renew this token before the max lifetime expires."), - new Field(MAX_LIFE_TIME_KEY_NAME, INT64, - "Max lifetime period for token in milli seconds. if value is -1, then max lifetime" + - " will default to a server side config value.")); - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema TOKEN_CREATE_REQUEST_V1 = TOKEN_CREATE_REQUEST_V0; + private final CreateDelegationTokenRequestData data; - private final List renewers; - private final long maxLifeTime; - - private CreateDelegationTokenRequest(short version, List renewers, long maxLifeTime) { + private CreateDelegationTokenRequest(CreateDelegationTokenRequestData data, short version) { super(ApiKeys.CREATE_DELEGATION_TOKEN, version); - this.maxLifeTime = maxLifeTime; - this.renewers = renewers; + this.data = data; } public CreateDelegationTokenRequest(Struct struct, short version) { super(ApiKeys.CREATE_DELEGATION_TOKEN, version); - maxLifeTime = struct.getLong(MAX_LIFE_TIME_KEY_NAME); - Object[] renewerArray = struct.getArray(RENEWERS_KEY_NAME); - renewers = new ArrayList<>(); - if (renewerArray != null) { - for (Object renewerObj : renewerArray) { - Struct renewerObjStruct = (Struct) renewerObj; - String principalType = renewerObjStruct.get(PRINCIPAL_TYPE); - String principalName = renewerObjStruct.get(PRINCIPAL_NAME); - renewers.add(new KafkaPrincipal(principalType, principalName)); - } - } + this.data = new CreateDelegationTokenRequestData(struct, version); } public static CreateDelegationTokenRequest parse(ByteBuffer buffer, short version) { return new CreateDelegationTokenRequest(ApiKeys.CREATE_DELEGATION_TOKEN.parseRequest(version, buffer), version); } - public static Schema[] schemaVersions() { - return new Schema[]{TOKEN_CREATE_REQUEST_V0, TOKEN_CREATE_REQUEST_V1}; - } - @Override protected Struct toStruct() { - short version = version(); - Struct struct = new Struct(ApiKeys.CREATE_DELEGATION_TOKEN.requestSchema(version)); - Object[] renewersArray = new Object[renewers.size()]; - - int i = 0; - for (KafkaPrincipal principal: renewers) { - Struct renewerStruct = struct.instance(RENEWERS_KEY_NAME); - renewerStruct.set(PRINCIPAL_TYPE, principal.getPrincipalType()); - renewerStruct.set(PRINCIPAL_NAME, principal.getName()); - renewersArray[i++] = renewerStruct; - } + return data.toStruct(version()); + } - struct.set(RENEWERS_KEY_NAME, renewersArray); - struct.set(MAX_LIFE_TIME_KEY_NAME, maxLifeTime); - return struct; + public CreateDelegationTokenRequestData data() { + return data; } @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - return new CreateDelegationTokenResponse(throttleTimeMs, Errors.forException(e), KafkaPrincipal.ANONYMOUS); - } - - public List renewers() { - return renewers; - } - - public long maxLifeTime() { - return maxLifeTime; + return CreateDelegationTokenResponse.prepareResponse(throttleTimeMs, Errors.forException(e), KafkaPrincipal.ANONYMOUS); } public static class Builder extends AbstractRequest.Builder { - private final List renewers; - private final long maxLifeTime; + private final CreateDelegationTokenRequestData data; - public Builder(List renewers, long maxLifeTime) { + public Builder(CreateDelegationTokenRequestData data) { super(ApiKeys.CREATE_DELEGATION_TOKEN); - this.renewers = renewers; - this.maxLifeTime = maxLifeTime; + this.data = data; } @Override public CreateDelegationTokenRequest build(short version) { - return new CreateDelegationTokenRequest(version, renewers, maxLifeTime); + return new CreateDelegationTokenRequest(data, version); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type: CreateDelegationTokenRequest"). - append(", renewers=").append(renewers). - append(", maxLifeTime=").append(maxLifeTime). - append(")"); - return bld.toString(); + return data.toString(); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java index 8bb6631140b..04db17ac7a2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java @@ -16,160 +16,82 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.security.auth.KafkaPrincipal; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME; -import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.types.Type.BYTES; -import static org.apache.kafka.common.protocol.types.Type.INT64; -import static org.apache.kafka.common.protocol.types.Type.STRING; - public class CreateDelegationTokenResponse extends AbstractResponse { - private static final String OWNER_KEY_NAME = "owner"; - private static final String ISSUE_TIMESTAMP_KEY_NAME = "issue_timestamp"; - private static final String EXPIRY_TIMESTAMP_NAME = "expiry_timestamp"; - private static final String MAX_TIMESTAMP_NAME = "max_timestamp"; - private static final String TOKEN_ID_KEY_NAME = "token_id"; - private static final String HMAC_KEY_NAME = "hmac"; - - private final Errors error; - private final long issueTimestamp; - private final long expiryTimestamp; - private final long maxTimestamp; - private final String tokenId; - private final ByteBuffer hmac; - private final int throttleTimeMs; - private KafkaPrincipal owner; - - private static final Schema TOKEN_CREATE_RESPONSE_V0 = new Schema( - ERROR_CODE, - new Field(OWNER_KEY_NAME, new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME), "token owner."), - new Field(ISSUE_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) when this token was generated."), - new Field(EXPIRY_TIMESTAMP_NAME, INT64, "timestamp (in msec) at which this token expires."), - new Field(MAX_TIMESTAMP_NAME, INT64, "max life time of this token."), - new Field(TOKEN_ID_KEY_NAME, STRING, "UUID to ensure uniqueness."), - new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token."), - THROTTLE_TIME_MS); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema TOKEN_CREATE_RESPONSE_V1 = TOKEN_CREATE_RESPONSE_V0; - - public CreateDelegationTokenResponse(int throttleTimeMs, - Errors error, - KafkaPrincipal owner, - long issueTimestamp, - long expiryTimestamp, - long maxTimestamp, - String tokenId, - ByteBuffer hmac) { - this.throttleTimeMs = throttleTimeMs; - this.error = error; - this.owner = owner; - this.issueTimestamp = issueTimestamp; - this.expiryTimestamp = expiryTimestamp; - this.maxTimestamp = maxTimestamp; - this.tokenId = tokenId; - this.hmac = hmac; - } + private final CreateDelegationTokenResponseData data; - public CreateDelegationTokenResponse(int throttleTimeMs, Errors error, KafkaPrincipal owner) { - this(throttleTimeMs, error, owner, -1, -1, -1, "", ByteBuffer.wrap(new byte[] {})); + public CreateDelegationTokenResponse(CreateDelegationTokenResponseData data) { + this.data = data; } - public CreateDelegationTokenResponse(Struct struct) { - error = Errors.forCode(struct.get(ERROR_CODE)); - Struct ownerStruct = (Struct) struct.get(OWNER_KEY_NAME); - String principalType = ownerStruct.get(PRINCIPAL_TYPE); - String principalName = ownerStruct.get(PRINCIPAL_NAME); - owner = new KafkaPrincipal(principalType, principalName); - issueTimestamp = struct.getLong(ISSUE_TIMESTAMP_KEY_NAME); - expiryTimestamp = struct.getLong(EXPIRY_TIMESTAMP_NAME); - maxTimestamp = struct.getLong(MAX_TIMESTAMP_NAME); - tokenId = struct.getString(TOKEN_ID_KEY_NAME); - hmac = struct.getBytes(HMAC_KEY_NAME); - this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); + public CreateDelegationTokenResponse(Struct struct, short version) { + this.data = new CreateDelegationTokenResponseData(struct, version); } public static CreateDelegationTokenResponse parse(ByteBuffer buffer, short version) { - return new CreateDelegationTokenResponse(ApiKeys.CREATE_DELEGATION_TOKEN.responseSchema(version).read(buffer)); - } - - public static Schema[] schemaVersions() { - return new Schema[] {TOKEN_CREATE_RESPONSE_V0, TOKEN_CREATE_RESPONSE_V1}; - } - - @Override - public Map errorCounts() { - return errorCounts(error); - } - - @Override - protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.CREATE_DELEGATION_TOKEN.responseSchema(version)); - struct.set(ERROR_CODE, error.code()); - Struct ownerStruct = struct.instance(OWNER_KEY_NAME); - ownerStruct.set(PRINCIPAL_TYPE, owner.getPrincipalType()); - ownerStruct.set(PRINCIPAL_NAME, owner.getName()); - struct.set(OWNER_KEY_NAME, ownerStruct); - struct.set(ISSUE_TIMESTAMP_KEY_NAME, issueTimestamp); - struct.set(EXPIRY_TIMESTAMP_NAME, expiryTimestamp); - struct.set(MAX_TIMESTAMP_NAME, maxTimestamp); - struct.set(TOKEN_ID_KEY_NAME, tokenId); - struct.set(HMAC_KEY_NAME, hmac); - struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); - return struct; + return new CreateDelegationTokenResponse(ApiKeys.CREATE_DELEGATION_TOKEN.responseSchema(version).read(buffer), version); } - public Errors error() { - return error; - } - - public KafkaPrincipal owner() { - return owner; + public static CreateDelegationTokenResponse prepareResponse(int throttleTimeMs, + Errors error, + KafkaPrincipal owner, + long issueTimestamp, + long expiryTimestamp, + long maxTimestamp, + String tokenId, + ByteBuffer hmac) { + CreateDelegationTokenResponseData data = new CreateDelegationTokenResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(error.code()) + .setPrincipalType(owner.getPrincipalType()) + .setPrincipalName(owner.getName()) + .setIssueTimestampMs(issueTimestamp) + .setExpiryTimestampMs(expiryTimestamp) + .setMaxTimestampMs(maxTimestamp) + .setTokenId(tokenId) + .setHmac(hmac.array()); + return new CreateDelegationTokenResponse(data); } - public long issueTimestamp() { - return issueTimestamp; + public static CreateDelegationTokenResponse prepareResponse(int throttleTimeMs, Errors error, KafkaPrincipal owner) { + return prepareResponse(throttleTimeMs, error, owner, -1, -1, -1, "", ByteBuffer.wrap(new byte[] {})); } - public long expiryTimestamp() { - return expiryTimestamp; + public CreateDelegationTokenResponseData data() { + return data; } - public long maxTimestamp() { - return maxTimestamp; - } - - public String tokenId() { - return tokenId; + @Override + public Map errorCounts() { + return Collections.singletonMap(error(), 1); } - public byte[] hmacBytes() { - byte[] byteArray = new byte[hmac.remaining()]; - hmac.get(byteArray); - return byteArray; + @Override + protected Struct toStruct(short version) { + return data.toStruct(version); } @Override public int throttleTimeMs() { - return throttleTimeMs; + return data.throttleTimeMs(); + } + + public Errors error() { + return Errors.forCode(data.errorCode()); } public boolean hasError() { - return this.error != Errors.NONE; + return error() != Errors.NONE; } @Override diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 11d85d6d7cf..1fb088056ef 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -37,6 +37,9 @@ import org.apache.kafka.common.message.ControlledShutdownRequestData; import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition; import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionCollection; import org.apache.kafka.common.message.ControlledShutdownResponseData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig; @@ -1484,15 +1487,30 @@ public class RequestResponseTest { } private CreateDelegationTokenRequest createCreateTokenRequest() { - List renewers = new ArrayList<>(); - renewers.add(SecurityUtils.parseKafkaPrincipal("User:user1")); - renewers.add(SecurityUtils.parseKafkaPrincipal("User:user2")); - return new CreateDelegationTokenRequest.Builder(renewers, System.currentTimeMillis()).build(); + List renewers = new ArrayList<>(); + renewers.add(new CreatableRenewers() + .setPrincipalType("User") + .setPrincipalName("user1")); + renewers.add(new CreatableRenewers() + .setPrincipalType("User") + .setPrincipalName("user2")); + return new CreateDelegationTokenRequest.Builder(new CreateDelegationTokenRequestData() + .setRenewers(renewers) + .setMaxLifetimeMs(System.currentTimeMillis())).build(); } private CreateDelegationTokenResponse createCreateTokenResponse() { - return new CreateDelegationTokenResponse(20, Errors.NONE, SecurityUtils.parseKafkaPrincipal("User:user1"), System.currentTimeMillis(), - System.currentTimeMillis(), System.currentTimeMillis(), "token1", ByteBuffer.wrap("test".getBytes())); + CreateDelegationTokenResponseData data = new CreateDelegationTokenResponseData() + .setThrottleTimeMs(20) + .setErrorCode(Errors.NONE.code()) + .setPrincipalType("User") + .setPrincipalName("user1") + .setIssueTimestampMs(System.currentTimeMillis()) + .setExpiryTimestampMs(System.currentTimeMillis()) + .setMaxTimestampMs(System.currentTimeMillis()) + .setTokenId("token1") + .setHmac("test".getBytes()); + return new CreateDelegationTokenResponse(data); } private RenewDelegationTokenRequest createRenewTokenRequest() { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index fa45cf5308a..e1000d019ff 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2337,25 +2337,26 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Sending create token response for correlation id %d to client %s." .format(request.header.correlationId, request.header.clientId)) sendResponseMaybeThrottle(request, requestThrottleMs => - new CreateDelegationTokenResponse(requestThrottleMs, createResult.error, request.session.principal, createResult.issueTimestamp, + CreateDelegationTokenResponse.prepareResponse(requestThrottleMs, createResult.error, request.session.principal, createResult.issueTimestamp, createResult.expiryTimestamp, createResult.maxTimestamp, createResult.tokenId, ByteBuffer.wrap(createResult.hmac))) } if (!allowTokenRequests(request)) sendResponseMaybeThrottle(request, requestThrottleMs => - new CreateDelegationTokenResponse(requestThrottleMs, Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, request.session.principal)) + CreateDelegationTokenResponse.prepareResponse(requestThrottleMs, Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, request.session.principal)) else { - val renewerList = createTokenRequest.renewers().asScala.toList + val renewerList = createTokenRequest.data.renewers.asScala.toList.map(entry => + new KafkaPrincipal(entry.principalType, entry.principalName)) - if (renewerList.exists(principal => principal.getPrincipalType != KafkaPrincipal.USER_TYPE)) { + if (renewerList.exists(principal => principal.getPrincipalType != KafkaPrincipal.USER_TYPE)) { sendResponseMaybeThrottle(request, requestThrottleMs => - new CreateDelegationTokenResponse(requestThrottleMs, Errors.INVALID_PRINCIPAL_TYPE, request.session.principal)) + CreateDelegationTokenResponse.prepareResponse(requestThrottleMs, Errors.INVALID_PRINCIPAL_TYPE, request.session.principal)) } else { tokenManager.createToken( request.session.principal, - createTokenRequest.renewers().asScala.toList, - createTokenRequest.maxLifeTime(), + renewerList, + createTokenRequest.data.maxLifetimeMs, sendResponseCallback ) } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index c0d9b440e23..493b3e36b8e 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -29,6 +29,7 @@ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.message.ControlledShutdownRequestData import org.apache.kafka.common.message.CreateTopicsRequestData import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} +import org.apache.kafka.common.message.CreateDelegationTokenRequestData import org.apache.kafka.common.message.DeleteTopicsRequestData import org.apache.kafka.common.message.DescribeGroupsRequestData import org.apache.kafka.common.message.FindCoordinatorRequestData @@ -429,7 +430,13 @@ class RequestQuotaTest extends BaseRequestTest { ) case ApiKeys.CREATE_DELEGATION_TOKEN => - new CreateDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")), 1000) + new CreateDelegationTokenRequest.Builder( + new CreateDelegationTokenRequestData() + .setRenewers(Collections.singletonList(new CreateDelegationTokenRequestData.CreatableRenewers() + .setPrincipalType("User") + .setPrincipalName("test"))) + .setMaxLifetimeMs(1000) + ) case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenRequest.Builder("".getBytes, 1000) @@ -548,7 +555,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.ALTER_REPLICA_LOG_DIRS => new AlterReplicaLogDirsResponse(response).throttleTimeMs case ApiKeys.DESCRIBE_LOG_DIRS => new DescribeLogDirsResponse(response).throttleTimeMs case ApiKeys.CREATE_PARTITIONS => new CreatePartitionsResponse(response).throttleTimeMs - case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenResponse(response).throttleTimeMs + case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenResponse(response, ApiKeys.CREATE_DELEGATION_TOKEN.latestVersion).throttleTimeMs case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response).throttleTimeMs case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs