Browse Source

KAFKA-6447: Add Delegation Token Operations to KafkaAdminClient (KIP-249) (#4427)

Reviewers: Jun Rao <junrao@gmail.com>
pull/4454/head
Manikumar Reddy O 7 years ago committed by Jun Rao
parent
commit
47918f2d79
  1. 1
      build.gradle
  2. 2
      checkstyle/suppressions.xml
  3. 154
      clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
  4. 53
      clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java
  5. 43
      clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java
  6. 48
      clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java
  7. 45
      clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java
  8. 39
      clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
  9. 42
      clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
  10. 137
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  11. 39
      clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
  12. 42
      clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
  13. 2
      clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
  14. 2
      clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
  15. 4
      clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
  16. 4
      clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
  17. 4
      clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
  18. 4
      clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
  19. 4
      clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
  20. 2
      clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java
  21. 4
      clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java
  22. 11
      clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
  23. 6
      clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
  24. 4
      clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java
  25. 2
      clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java
  26. 20
      clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
  27. 2
      clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
  28. 4
      clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  29. 2
      clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java
  30. 29
      core/src/main/scala/kafka/admin/AdminClient.scala
  31. 88
      core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
  32. 2
      core/src/main/scala/kafka/security/CredentialProvider.scala
  33. 3
      core/src/main/scala/kafka/server/DelegationTokenManager.scala
  34. 2
      core/src/main/scala/kafka/server/KafkaServer.scala
  35. 8
      core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
  36. 147
      core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
  37. 3
      core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
  38. 27
      core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
  39. 102
      core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
  40. 32
      core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
  41. 4
      core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

1
build.gradle

@ -858,6 +858,7 @@ project(':clients') { @@ -858,6 +858,7 @@ project(':clients') {
include "**/org/apache/kafka/common/config/*"
include "**/org/apache/kafka/common/security/auth/*"
include "**/org/apache/kafka/server/policy/*"
include "**/org/apache/kafka/common/security/token/delegation/*"
}
}

2
checkstyle/suppressions.xml

@ -10,7 +10,7 @@ @@ -10,7 +10,7 @@
<!-- Clients -->
<suppress checks="ClassFanOutComplexity"
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/>
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient|AdminClient).java"/>
<suppress checks="ClassFanOutComplexity"
files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
<suppress checks="ClassFanOutComplexity"

154
clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java

@ -535,4 +535,158 @@ public abstract class AdminClient implements AutoCloseable { @@ -535,4 +535,158 @@ public abstract class AdminClient implements AutoCloseable {
*/
public abstract DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete,
DeleteRecordsOptions options);
/**
* <p>Create a Delegation Token.</p>
*
* <p>This is a convenience method for {@link #createDelegationToken(CreateDelegationTokenOptions)} with default options.
* See the overload for more details.</p>
*
* @return The CreateDelegationTokenResult.
*/
public CreateDelegationTokenResult createDelegationToken() {
return createDelegationToken(new CreateDelegationTokenOptions());
}
/**
* <p>Create a Delegation Token.</p>
*
* <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
*
* <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
* {@link CreateDelegationTokenResult#delegationToken() delegationToken()} method of the returned {@code CreateDelegationTokenResult}</p>
* <ul>
* <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
* If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
* <li>{@link org.apache.kafka.common.errors.InvalidPrincipalTypeException}
* if the renewers principal type is not supported.</li>
* <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
* if the delegation token feature is disabled.</li>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* if the request was not completed in within the given {@link CreateDelegationTokenOptions#timeoutMs()}.</li>
* </ul>
*
* @param options The options to use when creating delegation token.
* @return The DeleteRecordsResult.
*/
public abstract CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);
/**
* <p>Renew a Delegation Token.</p>
*
* <p>This is a convenience method for {@link #renewDelegationToken(byte[], RenewDelegationTokenOptions)} with default options.
* See the overload for more details.</p>
*
*
* @param hmac HMAC of the Delegation token
* @return The RenewDelegationTokenResult.
*/
public RenewDelegationTokenResult renewDelegationToken(byte[] hmac) {
return renewDelegationToken(hmac, new RenewDelegationTokenOptions());
}
/**
* <p> Renew a Delegation Token.</p>
*
* <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
*
* <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
* {@link RenewDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code RenewDelegationTokenResult}</p>
* <ul>
* <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
* If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
* <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
* if the delegation token feature is disabled.</li>
* <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
* if the delegation token is not found on server.</li>
* <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
* if the authenticated user is not owner/renewer of the token.</li>
* <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
* if the delegation token is expired.</li>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* if the request was not completed in within the given {@link RenewDelegationTokenOptions#timeoutMs()}.</li>
* </ul>
*
* @param hmac HMAC of the Delegation token
* @param options The options to use when renewing delegation token.
* @return The RenewDelegationTokenResult.
*/
public abstract RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options);
/**
* <p>Expire a Delegation Token.</p>
*
* <p>This is a convenience method for {@link #expireDelegationToken(byte[], ExpireDelegationTokenOptions)} with default options.
* This will expire the token immediately. See the overload for more details.</p>
*
* @param hmac HMAC of the Delegation token
* @return The ExpireDelegationTokenResult.
*/
public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) {
return expireDelegationToken(hmac, new ExpireDelegationTokenOptions());
}
/**
* <p>Expire a Delegation Token.</p>
*
* <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
*
* <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
* {@link ExpireDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code ExpireDelegationTokenResult}</p>
* <ul>
* <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
* If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
* <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
* if the delegation token feature is disabled.</li>
* <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
* if the delegation token is not found on server.</li>
* <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
* if the authenticated user is not owner/renewer of the requested token.</li>
* <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
* if the delegation token is expired.</li>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* if the request was not completed in within the given {@link ExpireDelegationTokenOptions#timeoutMs()}.</li>
* </ul>
*
* @param hmac HMAC of the Delegation token
* @param options The options to use when expiring delegation token.
* @return The ExpireDelegationTokenResult.
*/
public abstract ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options);
/**
*<p>Describe the Delegation Tokens.</p>
*
* <p>This is a convenience method for {@link #describeDelegationToken(DescribeDelegationTokenOptions)} with default options.
* This will return all the user owned tokens and tokens where user have Describe permission. See the overload for more details.</p>
*
* @return The DescribeDelegationTokenResult.
*/
public DescribeDelegationTokenResult describeDelegationToken() {
return describeDelegationToken(new DescribeDelegationTokenOptions());
}
/**
* <p>Describe the Delegation Tokens.</p>
*
* <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
*
* <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
* {@link DescribeDelegationTokenResult#delegationTokens() delegationTokens()} method of the returned {@code DescribeDelegationTokenResult}</p>
* <ul>
* <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
* If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
* <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
* if the delegation token feature is disabled.</li>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* if the request was not completed in within the given {@link DescribeDelegationTokenOptions#timeoutMs()}.</li>
* </ul>
*
* @param options The options to use when describing delegation tokens.
* @return The DescribeDelegationTokenResult.
*/
public abstract DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options);
}

53
clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java

@ -0,0 +1,53 @@ @@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import java.util.LinkedList;
import java.util.List;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
/**
* Options for {@link AdminClient#createDelegationToken(CreateDelegationTokenOptions)}.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class CreateDelegationTokenOptions extends AbstractOptions<CreateDelegationTokenOptions> {
private long maxLifeTimeMs = -1;
private List<KafkaPrincipal> renewers = new LinkedList<>();
public CreateDelegationTokenOptions renewers(List<KafkaPrincipal> renewers) {
this.renewers = renewers;
return this;
}
public List<KafkaPrincipal> renewers() {
return renewers;
}
public CreateDelegationTokenOptions maxlifeTimeMs(long maxLifeTimeMs) {
this.maxLifeTimeMs = maxLifeTimeMs;
return this;
}
public long maxlifeTimeMs() {
return maxLifeTimeMs;
}
}

43
clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java

@ -0,0 +1,43 @@ @@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
/**
* The result of the {@link KafkaAdminClient#createDelegationToken(CreateDelegationTokenOptions)} call.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class CreateDelegationTokenResult {
private final KafkaFuture<DelegationToken> delegationToken;
CreateDelegationTokenResult(KafkaFuture<DelegationToken> delegationToken) {
this.delegationToken = delegationToken;
}
/**
* Returns a future which yields a delegation token
*/
public KafkaFuture<DelegationToken> delegationToken() {
return delegationToken;
}
}

48
clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java

@ -0,0 +1,48 @@ @@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import java.util.List;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
/**
* Options for {@link AdminClient#describeDelegationToken(DescribeDelegationTokenOptions)}.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class DescribeDelegationTokenOptions extends AbstractOptions<DescribeDelegationTokenOptions> {
private List<KafkaPrincipal> owners;
/**
* if owners is null, all the user owned tokens and tokens where user have Describe permission
* will be returned.
* @param owners
* @return this instance
*/
public DescribeDelegationTokenOptions owners(List<KafkaPrincipal> owners) {
this.owners = owners;
return this;
}
public List<KafkaPrincipal> owners() {
return owners;
}
}

45
clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java

@ -0,0 +1,45 @@ @@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import java.util.List;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
/**
* The result of the {@link KafkaAdminClient#describeDelegationToken(DescribeDelegationTokenOptions)} call.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class DescribeDelegationTokenResult {
private final KafkaFuture<List<DelegationToken>> delegationTokens;
DescribeDelegationTokenResult(KafkaFuture<List<DelegationToken>> delegationTokens) {
this.delegationTokens = delegationTokens;
}
/**
* Returns a future which yields list of delegation tokens
*/
public KafkaFuture<List<DelegationToken>> delegationTokens() {
return delegationTokens;
}
}

39
clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java

@ -0,0 +1,39 @@ @@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Options for {@link AdminClient#expireDelegationToken(byte[], ExpireDelegationTokenOptions)}.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class ExpireDelegationTokenOptions extends AbstractOptions<ExpireDelegationTokenOptions> {
private long expiryTimePeriodMs = -1L;
public ExpireDelegationTokenOptions expiryTimePeriodMs(long expiryTimePeriodMs) {
this.expiryTimePeriodMs = expiryTimePeriodMs;
return this;
}
public long expiryTimePeriodMs() {
return expiryTimePeriodMs;
}
}

42
clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java

@ -0,0 +1,42 @@ @@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* The result of the {@link KafkaAdminClient#expireDelegationToken(byte[], ExpireDelegationTokenOptions)} call.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class ExpireDelegationTokenResult {
private final KafkaFuture<Long> expiryTimestamp;
ExpireDelegationTokenResult(KafkaFuture<Long> expiryTimestamp) {
this.expiryTimestamp = expiryTimestamp;
}
/**
* Returns a future which yields expiry timestamp
*/
public KafkaFuture<Long> expiryTimestamp() {
return expiryTimestamp;
}
}

137
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -69,6 +69,8 @@ import org.apache.kafka.common.requests.CreateAclsRequest; @@ -69,6 +69,8 @@ import org.apache.kafka.common.requests.CreateAclsRequest;
import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
import org.apache.kafka.common.requests.CreateAclsResponse;
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
@ -85,12 +87,20 @@ import org.apache.kafka.common.requests.DescribeAclsRequest; @@ -85,12 +87,20 @@ import org.apache.kafka.common.requests.DescribeAclsRequest;
import org.apache.kafka.common.requests.DescribeAclsResponse;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
import org.apache.kafka.common.requests.DescribeLogDirsRequest;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
import org.apache.kafka.common.requests.Resource;
import org.apache.kafka.common.requests.ResourceType;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
import org.apache.kafka.common.security.token.delegation.TokenInformation;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
@ -2072,4 +2082,131 @@ public class KafkaAdminClient extends AdminClient { @@ -2072,4 +2082,131 @@ public class KafkaAdminClient extends AdminClient {
return new DeleteRecordsResult(new HashMap<TopicPartition, KafkaFuture<DeletedRecords>>(futures));
}
@Override
public CreateDelegationTokenResult createDelegationToken(final CreateDelegationTokenOptions options) {
final KafkaFutureImpl<DelegationToken> delegationTokenFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("createDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new CreateDelegationTokenRequest.Builder(options.renewers(), options.maxlifeTimeMs());
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
CreateDelegationTokenResponse response = (CreateDelegationTokenResponse) abstractResponse;
if (response.hasError()) {
delegationTokenFuture.completeExceptionally(response.error().exception());
} else {
TokenInformation tokenInfo = new TokenInformation(response.tokenId(), response.owner(),
options.renewers(), response.issueTimestamp(), response.maxTimestamp(), response.expiryTimestamp());
DelegationToken token = new DelegationToken(tokenInfo, response.hmacBytes());
delegationTokenFuture.complete(token);
}
}
@Override
void handleFailure(Throwable throwable) {
delegationTokenFuture.completeExceptionally(throwable);
}
}, now);
return new CreateDelegationTokenResult(delegationTokenFuture);
}
@Override
public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac, final RenewDelegationTokenOptions options) {
final KafkaFutureImpl<Long> expiryTimeFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("renewDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new RenewDelegationTokenRequest.Builder(hmac, options.renewTimePeriodMs());
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
RenewDelegationTokenResponse response = (RenewDelegationTokenResponse) abstractResponse;
if (response.hasError()) {
expiryTimeFuture.completeExceptionally(response.error().exception());
} else {
expiryTimeFuture.complete(response.expiryTimestamp());
}
}
@Override
void handleFailure(Throwable throwable) {
expiryTimeFuture.completeExceptionally(throwable);
}
}, now);
return new RenewDelegationTokenResult(expiryTimeFuture);
}
@Override
public ExpireDelegationTokenResult expireDelegationToken(final byte[] hmac, final ExpireDelegationTokenOptions options) {
final KafkaFutureImpl<Long> expiryTimeFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("expireDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new ExpireDelegationTokenRequest.Builder(hmac, options.expiryTimePeriodMs());
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
ExpireDelegationTokenResponse response = (ExpireDelegationTokenResponse) abstractResponse;
if (response.hasError()) {
expiryTimeFuture.completeExceptionally(response.error().exception());
} else {
expiryTimeFuture.complete(response.expiryTimestamp());
}
}
@Override
void handleFailure(Throwable throwable) {
expiryTimeFuture.completeExceptionally(throwable);
}
}, now);
return new ExpireDelegationTokenResult(expiryTimeFuture);
}
@Override
public DescribeDelegationTokenResult describeDelegationToken(final DescribeDelegationTokenOptions options) {
final KafkaFutureImpl<List<DelegationToken>> tokensFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new DescribeDelegationTokenRequest.Builder(options.owners());
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
DescribeDelegationTokenResponse response = (DescribeDelegationTokenResponse) abstractResponse;
if (response.hasError()) {
tokensFuture.completeExceptionally(response.error().exception());
} else {
tokensFuture.complete(response.tokens());
}
}
@Override
void handleFailure(Throwable throwable) {
tokensFuture.completeExceptionally(throwable);
}
}, now);
return new DescribeDelegationTokenResult(tokensFuture);
}
}

39
clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java

@ -0,0 +1,39 @@ @@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Options for {@link AdminClient#renewDelegationToken(byte[], RenewDelegationTokenOptions)}.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class RenewDelegationTokenOptions extends AbstractOptions<RenewDelegationTokenOptions> {
private long renewTimePeriodMs = -1;
public RenewDelegationTokenOptions renewTimePeriodMs(long renewTimePeriodMs) {
this.renewTimePeriodMs = renewTimePeriodMs;
return this;
}
public long renewTimePeriodMs() {
return renewTimePeriodMs;
}
}

42
clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java

@ -0,0 +1,42 @@ @@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* The result of the {@link KafkaAdminClient#expireDelegationToken(byte[], ExpireDelegationTokenOptions)} call.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class RenewDelegationTokenResult {
private final KafkaFuture<Long> expiryTimestamp;
RenewDelegationTokenResult(KafkaFuture<Long> expiryTimestamp) {
this.expiryTimestamp = expiryTimestamp;
}
/**
* Returns a future which yields expiry timestamp
*/
public KafkaFuture<Long> expiryTimestamp() {
return expiryTimestamp;
}
}

2
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java

@ -26,7 +26,7 @@ import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuild @@ -26,7 +26,7 @@ import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuild
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
import org.apache.kafka.common.utils.Utils;
import java.util.Collections;

2
clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java

@ -41,7 +41,7 @@ import org.apache.kafka.common.security.scram.ScramCredential; @@ -41,7 +41,7 @@ import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internal.ScramMechanism;
import org.apache.kafka.common.security.scram.internal.ScramServerCallbackHandler;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
import org.apache.kafka.common.utils.Java;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

4
clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java

@ -184,4 +184,8 @@ public class DescribeDelegationTokenResponse extends AbstractResponse { @@ -184,4 +184,8 @@ public class DescribeDelegationTokenResponse extends AbstractResponse {
public List<DelegationToken> tokens() {
return tokens;
}
public boolean hasError() {
return this.error != Errors.NONE;
}
}

4
clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java

@ -88,9 +88,9 @@ public class ExpireDelegationTokenRequest extends AbstractRequest { @@ -88,9 +88,9 @@ public class ExpireDelegationTokenRequest extends AbstractRequest {
private final ByteBuffer hmac;
private final long expiryTimePeriod;
public Builder(ByteBuffer hmac, long expiryTimePeriod) {
public Builder(byte[] hmac, long expiryTimePeriod) {
super(ApiKeys.EXPIRE_DELEGATION_TOKEN);
this.hmac = hmac;
this.hmac = ByteBuffer.wrap(hmac);
this.expiryTimePeriod = expiryTimePeriod;
}

4
clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java

@ -93,4 +93,8 @@ public class ExpireDelegationTokenResponse extends AbstractResponse { @@ -93,4 +93,8 @@ public class ExpireDelegationTokenResponse extends AbstractResponse {
public int throttleTimeMs() {
return throttleTimeMs;
}
public boolean hasError() {
return this.error != Errors.NONE;
}
}

4
clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java

@ -88,9 +88,9 @@ public class RenewDelegationTokenRequest extends AbstractRequest { @@ -88,9 +88,9 @@ public class RenewDelegationTokenRequest extends AbstractRequest {
private final ByteBuffer hmac;
private final long renewTimePeriod;
public Builder(ByteBuffer hmac, long renewTimePeriod) {
public Builder(byte[] hmac, long renewTimePeriod) {
super(ApiKeys.RENEW_DELEGATION_TOKEN);
this.hmac = hmac;
this.hmac = ByteBuffer.wrap(hmac);
this.renewTimePeriod = renewTimePeriod;
}

4
clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java

@ -93,4 +93,8 @@ public class RenewDelegationTokenResponse extends AbstractResponse { @@ -93,4 +93,8 @@ public class RenewDelegationTokenResponse extends AbstractResponse {
public long expiryTimestamp() {
return expiryTimestamp;
}
public boolean hasError() {
return this.error != Errors.NONE;
}
}

2
clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java

@ -40,7 +40,7 @@ import org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFinal @@ -40,7 +40,7 @@ import org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFinal
import org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFirstMessage;
import org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFinalMessage;
import org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFirstMessage;
import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCredentialCallback;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

4
clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java

@ -29,8 +29,8 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; @@ -29,8 +29,8 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.ScramCredentialCallback;
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCredentialCallback;
public class ScramServerCallbackHandler implements AuthenticateCallbackHandler {

11
clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java

@ -16,11 +16,16 @@ @@ -16,11 +16,16 @@
*/
package org.apache.kafka.common.security.token.delegation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.utils.Base64;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* A class representing a delegation token.
*
*/
@InterfaceStability.Evolving
public class DelegationToken {
private TokenInformation tokenInformation;
private byte[] hmac;
@ -42,10 +47,6 @@ public class DelegationToken { @@ -42,10 +47,6 @@ public class DelegationToken {
return Base64.encoder().encodeToString(hmac);
}
public ByteBuffer hmacBuffer() {
return ByteBuffer.wrap(hmac);
}
@Override
public boolean equals(Object o) {
if (this == o) {

6
clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java

@ -16,11 +16,17 @@ @@ -16,11 +16,17 @@
*/
package org.apache.kafka.common.security.token.delegation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import java.util.ArrayList;
import java.util.Collection;
/**
* A class representing a delegation token details.
*
*/
@InterfaceStability.Evolving
public class TokenInformation {
private KafkaPrincipal owner;

4
clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java → clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java

@ -15,12 +15,14 @@ @@ -15,12 +15,14 @@
* limitations under the License.
*/
package org.apache.kafka.common.security.token.delegation;
package org.apache.kafka.common.security.token.delegation.internal;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internal.ScramCredentialUtils;
import org.apache.kafka.common.security.scram.internal.ScramMechanism;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
import org.apache.kafka.common.security.token.delegation.TokenInformation;
import java.util.Collection;
import java.util.HashMap;

2
clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java → clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java

@ -14,7 +14,7 @@ @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.token.delegation;
package org.apache.kafka.common.security.token.delegation.internal;
import org.apache.kafka.common.security.scram.ScramCredentialCallback;

20
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java

@ -276,6 +276,26 @@ public class MockAdminClient extends AdminClient { @@ -276,6 +276,26 @@ public class MockAdminClient extends AdminClient {
}
}
@Override
public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");

2
clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java

@ -44,7 +44,7 @@ import java.util.Iterator; @@ -44,7 +44,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
/**
* Non-blocking EchoServer implementation that uses ChannelBuilder to create channels

4
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

@ -1223,7 +1223,7 @@ public class RequestResponseTest { @@ -1223,7 +1223,7 @@ public class RequestResponseTest {
}
private RenewDelegationTokenRequest createRenewTokenRequest() {
return new RenewDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), System.currentTimeMillis()).build();
return new RenewDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build();
}
private RenewDelegationTokenResponse createRenewTokenResponse() {
@ -1231,7 +1231,7 @@ public class RequestResponseTest { @@ -1231,7 +1231,7 @@ public class RequestResponseTest {
}
private ExpireDelegationTokenRequest createExpireTokenRequest() {
return new ExpireDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), System.currentTimeMillis()).build();
return new ExpireDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build();
}
private ExpireDelegationTokenResponse createExpireTokenResponse() {

2
clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java

@ -23,7 +23,7 @@ import java.util.HashMap; @@ -23,7 +23,7 @@ import java.util.HashMap;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
import org.junit.Before;
import org.junit.Test;

29
core/src/main/scala/kafka/admin/AdminClient.scala

@ -35,8 +35,6 @@ import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion @@ -35,8 +35,6 @@ import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
import org.apache.kafka.common.{Cluster, Node, TopicPartition}
@ -342,33 +340,6 @@ class AdminClient(val time: Time, @@ -342,33 +340,6 @@ class AdminClient(val time: Time,
ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator)
}
def createToken(renewers: List[KafkaPrincipal], maxTimePeriodMs: Long = -1): (Errors, DelegationToken) = {
val responseBody = sendAnyNode(ApiKeys.CREATE_DELEGATION_TOKEN, new CreateDelegationTokenRequest.Builder(renewers.asJava, maxTimePeriodMs))
val response = responseBody.asInstanceOf[CreateDelegationTokenResponse]
val tokenInfo = new TokenInformation(response.tokenId, response.owner, renewers.asJava,
response.issueTimestamp, response.maxTimestamp, response.expiryTimestamp)
(response.error, new DelegationToken(tokenInfo, response.hmacBytes))
}
def renewToken(hmac: ByteBuffer, renewTimePeriod: Long = -1): (Errors, Long) = {
val responseBody = sendAnyNode(ApiKeys.RENEW_DELEGATION_TOKEN, new RenewDelegationTokenRequest.Builder(hmac, renewTimePeriod))
val response = responseBody.asInstanceOf[RenewDelegationTokenResponse]
(response.error, response.expiryTimestamp)
}
def expireToken(hmac: ByteBuffer, expiryTimeStamp: Long = -1): (Errors, Long) = {
val responseBody = sendAnyNode(ApiKeys.EXPIRE_DELEGATION_TOKEN, new ExpireDelegationTokenRequest.Builder(hmac, expiryTimeStamp))
val response = responseBody.asInstanceOf[ExpireDelegationTokenResponse]
(response.error, response.expiryTimestamp)
}
def describeToken(owners: List[KafkaPrincipal]): (Errors, List[DelegationToken]) = {
val ownersList = if (owners == null) null else owners.asJava
val responseBody = sendAnyNode(ApiKeys.RENEW_DELEGATION_TOKEN, new DescribeDelegationTokenRequest.Builder(ownersList))
val response = responseBody.asInstanceOf[DescribeDelegationTokenResponse]
(response.error, response.tokens().asScala.toList)
}
def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = {
def coordinatorLookup(group: String): Either[Node, Errors] = {

88
core/src/main/scala/kafka/admin/DelegationTokenCommand.scala

@ -17,12 +17,13 @@ @@ -17,12 +17,13 @@
package kafka.admin
import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util
import joptsimple._
import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser}
import kafka.utils.{CommandLineUtils, Exit, Logging}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.clients.admin.{CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions, AdminClient => JAdminClient}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.kafka.common.utils.{Base64, SecurityUtils, Utils}
@ -71,19 +72,20 @@ object DelegationTokenCommand extends Logging { @@ -71,19 +72,20 @@ object DelegationTokenCommand extends Logging {
}
}
def createToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt)
def createToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): DelegationToken = {
val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt).getOrElse(new util.LinkedList[KafkaPrincipal]())
val maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt).longValue
println("Calling create token operation with renewers :" + renewerPrincipals +" , max-life-time-period :"+ maxLifeTimeMs)
val response = adminClient.createToken(renewerPrincipals, maxLifeTimeMs)
response match {
case (Errors.NONE, token) => println("Created delegation token with tokenId : %s".format(token.tokenInfo.tokenId)); printToken(List(token))
case (e, _) => throw new AdminOperationException(e.message)
}
val createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals)
val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
val token = createResult.delegationToken().get()
println("Created delegation token with tokenId : %s".format(token.tokenInfo.tokenId)); printToken(List(token))
token
}
def printToken(tokens: List[DelegationToken]): Unit = {
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
print("\n%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format("TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
for (token <- tokens) {
val tokenInfo = token.tokenInfo
@ -92,56 +94,59 @@ object DelegationTokenCommand extends Logging { @@ -92,56 +94,59 @@ object DelegationTokenCommand extends Logging {
token.hmacAsBase64String,
tokenInfo.owner,
tokenInfo.renewersAsString,
tokenInfo.issueTimestamp,
tokenInfo.expiryTimestamp,
tokenInfo.maxTimestamp))
dateFormat.format(tokenInfo.issueTimestamp),
dateFormat.format(tokenInfo.expiryTimestamp),
dateFormat.format(tokenInfo.maxTimestamp)))
println()
}
}
private def getPrincipals(opts: DelegationTokenCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): List[KafkaPrincipal] = {
private def getPrincipals(opts: DelegationTokenCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Option[util.List[KafkaPrincipal]] = {
if (opts.options.has(principalOptionSpec))
opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toList
Some(opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toList.asJava)
else
List.empty[KafkaPrincipal]
None
}
def renewToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
def renewToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): Long = {
val hmac = opts.options.valueOf(opts.hmacOpt)
val renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt).longValue()
println("Calling renew token operation with hmac :" + hmac +" , renew-time-period :"+ renewTimePeriodMs)
val response = adminClient.renewToken(ByteBuffer.wrap(Base64.decoder.decode(hmac)), renewTimePeriodMs)
response match {
case (Errors.NONE, expiryTimeStamp) => println("Completed renew operation. New expiry timestamp : %s".format(expiryTimeStamp))
case (e, expiryTimeStamp) => throw new AdminOperationException(e.message)
}
val renewResult = adminClient.renewDelegationToken(Base64.decoder.decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs))
val expiryTimeStamp = renewResult.expiryTimestamp().get()
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
println("Completed renew operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp)))
expiryTimeStamp
}
def expireToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
def expireToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): Long = {
val hmac = opts.options.valueOf(opts.hmacOpt)
val expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt).longValue()
println("Calling expire token operation with hmac :" + hmac +" , expire-time-period : "+ expiryTimePeriodMs)
val response = adminClient.expireToken(ByteBuffer.wrap(Base64.decoder.decode(hmac)), expiryTimePeriodMs)
response match {
case (Errors.NONE, expiryTimeStamp) => println("Completed expire operation. New expiry timestamp : %s".format(expiryTimeStamp))
case (e, expiryTimeStamp) => throw new AdminOperationException(e.message)
}
val expireResult = adminClient.expireDelegationToken(Base64.decoder.decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs))
val expiryTimeStamp = expireResult.expiryTimestamp().get()
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
println("Completed expire operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp)))
expiryTimeStamp
}
def describeToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
def describeToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): List[DelegationToken] = {
val ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt)
println("Calling describe token operation for owners :" + ownerPrincipals)
val response = adminClient.describeToken(ownerPrincipals)
response match {
case (Errors.NONE, tokens) => println("Total Number of tokens : %s".format(tokens.size)); printToken(tokens)
case (e, tokens) => throw new AdminOperationException(e.message)
}
if (ownerPrincipals.isEmpty)
println("Calling describe token operation for current user.")
else
println("Calling describe token operation for owners :" + ownerPrincipals.get)
val describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals.orNull))
val tokens = describeResult.delegationTokens().get().asScala.toList
println("Total number of tokens : %s".format(tokens.size)); printToken(tokens)
tokens
}
private def createAdminClient(opts: DelegationTokenCommandOptions): AdminClient = {
private def createAdminClient(opts: DelegationTokenCommandOptions): JAdminClient = {
val props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
AdminClient.create(props)
JAdminClient.create(props)
}
class DelegationTokenCommandOptions(args: Array[String]) {
@ -157,10 +162,11 @@ object DelegationTokenCommand extends Logging { @@ -157,10 +162,11 @@ object DelegationTokenCommand extends Logging {
.withRequiredArg
.ofType(classOf[String])
val createOpt = parser.accepts("create", "Create a new delegation token.")
val renewOpt = parser.accepts("renew", "Renew delegation token.")
val expiryOpt = parser.accepts("expire", "Expire delegation token.")
val describeOpt = parser.accepts("describe", "describe delegation tokens.")
val createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.")
val renewOpt = parser.accepts("renew", "Renew delegation token. Use --renew-time-period option to set renew time period.")
val expiryOpt = parser.accepts("expire", "Expire delegation token. Use --expiry-time-period option to expire the token.")
val describeOpt = parser.accepts("describe", "Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals." +
" If --owner-principal option is not supplied, all the user owned tokens and tokens where user have Describe permission will be returned.")
val ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a kafka principal. It is should be in principalType:name format.")
.withOptionalArg()

2
core/src/main/scala/kafka/security/CredentialProvider.scala

@ -24,7 +24,7 @@ import org.apache.kafka.common.security.scram.ScramCredential @@ -24,7 +24,7 @@ import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef._
import org.apache.kafka.common.security.scram.internal.{ScramCredentialUtils, ScramMechanism}
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
class CredentialProvider(scramMechanisms: Collection[String], val tokenCache: DelegationTokenCache) {

3
core/src/main/scala/kafka/server/DelegationTokenManager.scala

@ -31,7 +31,8 @@ import org.apache.kafka.common.protocol.Errors @@ -31,7 +31,8 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internal.{ScramFormatter, ScramMechanism}
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.security.token.delegation.{DelegationToken, DelegationTokenCache, TokenInformation}
import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{Base64, Sanitizer, SecurityUtils, Time}
import scala.collection.JavaConverters._

2
core/src/main/scala/kafka/server/KafkaServer.scala

@ -45,7 +45,7 @@ import org.apache.kafka.common.protocol.Errors @@ -45,7 +45,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internal.ScramMechanism
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Node}

8
core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala

@ -18,10 +18,9 @@ package kafka.api @@ -18,10 +18,9 @@ package kafka.api
import java.util
import kafka.admin.AdminClient
import kafka.server.KafkaConfig
import kafka.utils.{JaasTestUtils, TestUtils, ZkUtils}
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internal.ScramMechanism
@ -83,9 +82,8 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest @@ -83,9 +82,8 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism)
config.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
val adminClient = AdminClient.create(config.asScala.toMap)
val (error, token) = adminClient.createToken(List())
val adminClient = AdminClient.create(config)
val token = adminClient.createDelegationToken().delegationToken().get()
//wait for token to reach all the brokers
TestUtils.waitUntilTrue(() => servers.forall(server => !server.tokenCache.tokens().isEmpty),
"Timed out waiting for token to propagate to all servers")

147
core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala

@ -0,0 +1,147 @@ @@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.util
import kafka.admin.DelegationTokenCommand.DelegationTokenCommandOptions
import kafka.api.{KafkaSasl, SaslSetup}
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.{JaasTestUtils, TestUtils}
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.Assert._
import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionException
class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
private val kafkaClientSaslMechanism = "PLAIN"
private val kafkaServerSaslMechanisms = List("PLAIN")
protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
var adminClient: org.apache.kafka.clients.admin.AdminClient = null
override def numBrokers = 1
@Before
override def setUp(): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
super.setUp()
}
override def generateConfigs = {
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
enableControlledShutdown = false, enableDeleteTopic = true,
interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true)
props.foreach(propertyOverrides)
props.map(KafkaConfig.fromProps)
}
private def createAdminConfig():util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val securityProps: util.Map[Object, Object] =
TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
config
}
@Test
def testDelegationTokenRequests(): Unit = {
adminClient = org.apache.kafka.clients.admin.AdminClient.create(createAdminConfig)
val renewer1 = "User:renewer1"
val renewer2 = "User:renewer2"
// create token1 with renewer1
val tokenCreated = DelegationTokenCommand.createToken(adminClient, getCreateOpts(List(renewer1)))
var tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
assertTrue(tokens.size == 1)
val token1 = tokens.head
assertEquals(token1, tokenCreated)
// create token2 with renewer2
val token2 = DelegationTokenCommand.createToken(adminClient, getCreateOpts(List(renewer2)))
tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
assertTrue(tokens.size == 2)
assertEquals(Set(token1, token2), tokens.toSet)
//get tokens for renewer2
tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List(renewer2)))
assertTrue(tokens.size == 1)
assertEquals(Set(token2), tokens.toSet)
//test renewing tokens
val expiryTimestamp = DelegationTokenCommand.renewToken(adminClient, getRenewOpts(token1.hmacAsBase64String()))
val renewedToken = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List(renewer1))).head
assertEquals(expiryTimestamp, renewedToken.tokenInfo().expiryTimestamp())
//test expire tokens
DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token1.hmacAsBase64String()))
DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token2.hmacAsBase64String()))
tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
assertTrue(tokens.size == 0)
//create token with invalid renewer principal type
intercept[ExecutionException](DelegationTokenCommand.createToken(adminClient, getCreateOpts(List("Group:Renewer3"))))
// try describing tokens for unknown owner
assertTrue(DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List("User:Unknown"))).isEmpty)
}
private def getCreateOpts(renewers: List[String]): DelegationTokenCommandOptions = {
val opts = ListBuffer("--bootstrap-server", brokerList, "--max-life-time-period", "-1",
"--command-config", "testfile", "--create")
renewers.foreach(renewer => opts ++= ListBuffer("--renewer-principal", renewer))
new DelegationTokenCommandOptions(opts.toArray)
}
private def getDescribeOpts(owners: List[String]): DelegationTokenCommandOptions = {
val opts = ListBuffer("--bootstrap-server", brokerList, "--command-config", "testfile", "--describe")
owners.foreach(owner => opts ++= ListBuffer("--owner-principal", owner))
new DelegationTokenCommandOptions(opts.toArray)
}
private def getRenewOpts(hmac: String): DelegationTokenCommandOptions = {
val opts = Array("--bootstrap-server", brokerList, "--command-config", "testfile", "--renew",
"--renew-time-period", "-1",
"--hmac", hmac)
new DelegationTokenCommandOptions(opts)
}
private def getExpireOpts(hmac: String): DelegationTokenCommandOptions = {
val opts = Array("--bootstrap-server", brokerList, "--command-config", "testfile", "--expire",
"--expiry-time-period", "-1",
"--hmac", hmac)
new DelegationTokenCommandOptions(opts)
}
@After
override def tearDown(): Unit = {
if (adminClient != null)
adminClient.close()
super.tearDown()
closeSasl()
}
}

3
core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala

@ -30,7 +30,8 @@ import kafka.zk.ZooKeeperTestHarness @@ -30,7 +30,8 @@ import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internal.ScramMechanism
import org.apache.kafka.common.security.token.delegation.{DelegationToken, DelegationTokenCache, TokenInformation}
import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{MockTime, SecurityUtils}
import org.junit.Assert._
import org.junit.{After, Before, Test}

27
core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala

@ -19,14 +19,13 @@ package kafka.server @@ -19,14 +19,13 @@ package kafka.server
import java.nio.ByteBuffer
import java.util
import kafka.admin.AdminClient
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.common.protocol.Errors
import org.junit.Assert._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.common.errors.UnsupportedByAuthenticationException
import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionException
class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
var adminClient: AdminClient = null
@ -49,21 +48,19 @@ class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest { @@ -49,21 +48,19 @@ class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
@Test
def testDelegationTokenRequests(): Unit = {
adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
adminClient = AdminClient.create(createAdminConfig)
val createResponse = adminClient.createToken(List())
assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, createResponse._1)
val createResult = adminClient.createDelegationToken()
intercept[ExecutionException](createResult.delegationToken().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException]
val describeResponse = adminClient.describeToken(List())
assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, describeResponse._1)
val describeResult = adminClient.describeDelegationToken()
intercept[ExecutionException](describeResult.delegationTokens().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException]
//test renewing tokens
val renewResponse = adminClient.renewToken(ByteBuffer.wrap("".getBytes()))
assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, renewResponse._1)
val renewResult = adminClient.renewDelegationToken("".getBytes())
intercept[ExecutionException](renewResult.expiryTimestamp().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException]
//test expire tokens tokens
val expireResponse = adminClient.expireToken(ByteBuffer.wrap("".getBytes()))
assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, expireResponse._1)
val expireResult = adminClient.expireDelegationToken("".getBytes())
intercept[ExecutionException](expireResult.expiryTimestamp().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException]
}

102
core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala

@ -18,17 +18,17 @@ package kafka.server @@ -18,17 +18,17 @@ package kafka.server
import java.util
import kafka.admin.AdminClient
import kafka.api.{KafkaSasl, SaslSetup}
import kafka.utils.{JaasTestUtils, TestUtils}
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, CreateDelegationTokenOptions, DescribeDelegationTokenOptions}
import org.apache.kafka.common.errors.InvalidPrincipalTypeException
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.SecurityUtils
import org.junit.Assert._
import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionException
class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@ -46,15 +46,6 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup { @@ -46,15 +46,6 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
super.setUp()
}
def createAdminConfig():util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val securityProps: util.Map[Object, Object] =
TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
config
}
override def generateConfigs = {
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
enableControlledShutdown = false, enableDeleteTopic = true,
@ -64,46 +55,73 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup { @@ -64,46 +55,73 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
props.map(KafkaConfig.fromProps)
}
private def createAdminConfig():util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val securityProps: util.Map[Object, Object] =
TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
config
}
@Test
def testDelegationTokenRequests(): Unit = {
adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
// test creating token
val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + JaasTestUtils.KafkaPlainUser))
val tokenResult1 = adminClient.createToken(renewer1)
assertEquals(Errors.NONE, tokenResult1._1)
var token1 = adminClient.describeToken(null)._2.head
assertEquals(token1, tokenResult1._2)
adminClient = AdminClient.create(createAdminConfig)
// create token1 with renewer1
val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:renewer1")).asJava
val createResult1 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer1))
val tokenCreated = createResult1.delegationToken().get()
//test describe token
var tokens = adminClient.describeDelegationToken().delegationTokens().get()
assertTrue(tokens.size() == 1)
var token1 = tokens.get(0)
assertEquals(token1, tokenCreated)
// create token2 with renewer2
val renewer2 = List(SecurityUtils.parseKafkaPrincipal("User:renewer2")).asJava
val createResult2 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer2))
val token2 = createResult2.delegationToken().get()
//get all tokens
tokens = adminClient.describeDelegationToken().delegationTokens().get()
assertTrue(tokens.size() == 2)
assertEquals(Set(token1, token2), tokens.asScala.toSet)
//get tokens for renewer2
tokens = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(renewer2)).delegationTokens().get()
assertTrue(tokens.size() == 1)
assertEquals(Set(token2), tokens.asScala.toSet)
//test renewing tokens
val renewResponse = adminClient.renewToken(token1.hmacBuffer())
assertEquals(Errors.NONE, renewResponse._1)
token1 = adminClient.describeToken(null)._2.head
assertEquals(renewResponse._2, token1.tokenInfo().expiryTimestamp())
val renewResult = adminClient.renewDelegationToken(token1.hmac())
var expiryTimestamp = renewResult.expiryTimestamp().get()
//test describe tokens
val renewer2 = List(SecurityUtils.parseKafkaPrincipal("User:Renewer1"))
val tokenResult2 = adminClient.createToken(renewer2)
assertEquals(Errors.NONE, tokenResult2._1)
val token2 = tokenResult2._2
val describeResult = adminClient.describeDelegationToken()
val tokenId = token1.tokenInfo().tokenId()
token1 = describeResult.delegationTokens().get().asScala.filter(dt => dt.tokenInfo().tokenId() == tokenId).head
assertEquals(expiryTimestamp, token1.tokenInfo().expiryTimestamp())
assertTrue(adminClient.describeToken(null)._2.size == 2)
//test expire tokens
val expireResult1 = adminClient.expireDelegationToken(token1.hmac())
expiryTimestamp = expireResult1.expiryTimestamp().get()
//test expire tokens tokens
val expireResponse1 = adminClient.expireToken(token1.hmacBuffer())
assertEquals(Errors.NONE, expireResponse1._1)
val expireResult2 = adminClient.expireDelegationToken(token2.hmac())
expiryTimestamp = expireResult2.expiryTimestamp().get()
val expireResponse2 = adminClient.expireToken(token2.hmacBuffer())
assertEquals(Errors.NONE, expireResponse2._1)
assertTrue(adminClient.describeToken(null)._2.size == 0)
tokens = adminClient.describeDelegationToken().delegationTokens().get()
assertTrue(tokens.size == 0)
//create token with invalid principal type
val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer1"))
val tokenResult3 = adminClient.createToken(renewer3)
assertEquals(Errors.INVALID_PRINCIPAL_TYPE, tokenResult3._1)
val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer3")).asJava
val createResult3 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer3))
intercept[ExecutionException](createResult3.delegationToken().get()).getCause.isInstanceOf[InvalidPrincipalTypeException]
// try describing tokens for unknown owner
val unknownOwner = List(SecurityUtils.parseKafkaPrincipal("User:Unknown")).asJava
tokens = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(unknownOwner)).delegationTokens().get()
assertTrue(tokens.isEmpty)
}
@After

32
core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala

@ -19,17 +19,15 @@ package kafka.server @@ -19,17 +19,15 @@ package kafka.server
import java.nio.ByteBuffer
import java.util
import kafka.admin.AdminClient
import kafka.api.{KafkaSasl, SaslSetup}
import kafka.utils.{JaasTestUtils, TestUtils}
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.utils.SecurityUtils
import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.common.errors.DelegationTokenDisabledException
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionException
class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@ -58,23 +56,19 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest @@ -58,23 +56,19 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
@Test
def testDelegationTokenRequests(): Unit = {
adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + JaasTestUtils.KafkaPlainUser))
val createResponse = adminClient.createToken(renewer1)
assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, createResponse._1)
adminClient = AdminClient.create(createAdminConfig)
val describeResponse = adminClient.describeToken(List())
assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, describeResponse._1)
val createResult = adminClient.createDelegationToken()
intercept[ExecutionException](createResult.delegationToken().get()).getCause.isInstanceOf[DelegationTokenDisabledException]
//test renewing tokens
val renewResponse = adminClient.renewToken(ByteBuffer.wrap("".getBytes()))
assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, renewResponse._1)
val describeResult = adminClient.describeDelegationToken()
intercept[ExecutionException](describeResult.delegationTokens().get()).getCause.isInstanceOf[DelegationTokenDisabledException]
//test expire tokens tokens
val expireResponse = adminClient.expireToken(ByteBuffer.wrap("".getBytes()))
assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, expireResponse._1)
val renewResult = adminClient.renewDelegationToken("".getBytes())
intercept[ExecutionException](renewResult.expiryTimestamp().get()).getCause.isInstanceOf[DelegationTokenDisabledException]
val expireResult = adminClient.expireDelegationToken("".getBytes())
intercept[ExecutionException](expireResult.expiryTimestamp().get()).getCause.isInstanceOf[DelegationTokenDisabledException]
}
@After

4
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

@ -315,13 +315,13 @@ class RequestQuotaTest extends BaseRequestTest { @@ -315,13 +315,13 @@ class RequestQuotaTest extends BaseRequestTest {
new CreateDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")), 1000)
case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
new ExpireDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
new ExpireDelegationTokenRequest.Builder("".getBytes, 1000)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN =>
new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
case ApiKeys.RENEW_DELEGATION_TOKEN =>
new RenewDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
new RenewDelegationTokenRequest.Builder("".getBytes, 1000)
case ApiKeys.DELETE_GROUPS =>
new DeleteGroupsRequest.Builder(Collections.singleton("test-group"))

Loading…
Cancel
Save