Browse Source

KAFKA-14590: Move DelegationTokenCommand to tools (#13172)

KAFKA-14590: Move DelegationTokenCommand to tools

Reviewers: Luke Chen <showuon@gmail.com>, Christo Lolov <christo_lolov@yahoo.com>, Federico Valeri <fvaleri@redhat.com>
pull/13324/head
Gantigmaa Selenge 2 years ago committed by GitHub
parent
commit
ea30ec4b56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 91
      clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
  2. 224
      core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
  3. 146
      core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
  4. 308
      tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java
  5. 110
      tools/src/test/java/org/apache/kafka/tools/DelegationTokenCommandTest.java

91
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java

@ -50,8 +50,11 @@ import org.apache.kafka.common.internals.KafkaFutureImpl; @@ -50,8 +50,11 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
import java.time.Duration;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -61,6 +64,9 @@ import java.util.Map; @@ -61,6 +64,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
import org.apache.kafka.common.security.token.delegation.TokenInformation;
public class MockAdminClient extends AdminClient {
public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
@ -94,6 +100,8 @@ public class MockAdminClient extends AdminClient { @@ -94,6 +100,8 @@ public class MockAdminClient extends AdminClient {
private Map<MetricName, Metric> mockMetrics = new HashMap<>();
private final List<DelegationToken> allTokens = new ArrayList<>();
public static Builder create() {
return new Builder();
}
@ -182,7 +190,7 @@ public class MockAdminClient extends AdminClient { @@ -182,7 +190,7 @@ public class MockAdminClient extends AdminClient {
return new MockAdminClient(brokers,
controller == null ? brokers.get(0) : controller,
clusterId,
defaultPartitions != null ? defaultPartitions.shortValue() : 1,
defaultPartitions != null ? defaultPartitions : 1,
defaultReplicationFactor != null ? defaultReplicationFactor.shortValue() : Math.min(brokers.size(), 3),
brokerLogDirs,
usingRaftController,
@ -596,22 +604,89 @@ public class MockAdminClient extends AdminClient { @@ -596,22 +604,89 @@ public class MockAdminClient extends AdminClient {
@Override
synchronized public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
KafkaFutureImpl<DelegationToken> future = new KafkaFutureImpl<>();
for (KafkaPrincipal renewer : options.renewers()) {
if (!renewer.getPrincipalType().equals(KafkaPrincipal.USER_TYPE)) {
future.completeExceptionally(new InvalidPrincipalTypeException(""));
return new CreateDelegationTokenResult(future);
}
}
String tokenId = Uuid.randomUuid().toString();
TokenInformation tokenInfo = new TokenInformation(tokenId, options.renewers().get(0), options.renewers(), System.currentTimeMillis(), options.maxlifeTimeMs(), -1);
DelegationToken token = new DelegationToken(tokenInfo, tokenId.getBytes());
allTokens.add(token);
future.complete(token);
return new CreateDelegationTokenResult(future);
}
@Override
synchronized public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
KafkaFutureImpl<Long> future = new KafkaFutureImpl<>();
Boolean tokenFound = false;
Long expiryTimestamp = options.renewTimePeriodMs();
for (DelegationToken token : allTokens) {
if (Arrays.equals(token.hmac(), hmac)) {
token.tokenInfo().setExpiryTimestamp(expiryTimestamp);
tokenFound = true;
}
}
if (tokenFound) {
future.complete(expiryTimestamp);
} else {
future.completeExceptionally(new DelegationTokenNotFoundException(""));
}
return new RenewDelegationTokenResult(future);
}
@Override
synchronized public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
KafkaFutureImpl<Long> future = new KafkaFutureImpl<>();
Long expiryTimestamp = options.expiryTimePeriodMs();
List<DelegationToken> tokensToRemove = new ArrayList<>();
Boolean tokenFound = false;
for (DelegationToken token : allTokens) {
if (Arrays.equals(token.hmac(), hmac)) {
if (expiryTimestamp == -1 || expiryTimestamp < System.currentTimeMillis()) {
tokensToRemove.add(token);
}
tokenFound = true;
}
}
if (tokenFound) {
allTokens.removeAll(tokensToRemove);
future.complete(expiryTimestamp);
} else {
future.completeExceptionally(new DelegationTokenNotFoundException(""));
}
return new ExpireDelegationTokenResult(future);
}
@Override
synchronized public DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
KafkaFutureImpl<List<DelegationToken>> future = new KafkaFutureImpl<>();
if (options.owners().isEmpty()) {
future.complete(allTokens);
} else {
List<DelegationToken> tokensResult = new ArrayList<>();
for (DelegationToken token : allTokens) {
if (options.owners().contains(token.tokenInfo().owner())) {
tokensResult.add(token);
}
}
future.complete(tokensResult);
}
return new DescribeDelegationTokenResult(future);
}
@Override
@ -782,7 +857,7 @@ public class MockAdminClient extends AdminClient { @@ -782,7 +857,7 @@ public class MockAdminClient extends AdminClient {
case BROKER: {
int brokerId;
try {
brokerId = Integer.valueOf(resource.name());
brokerId = Integer.parseInt(resource.name());
} catch (NumberFormatException e) {
return e;
}
@ -921,7 +996,7 @@ public class MockAdminClient extends AdminClient { @@ -921,7 +996,7 @@ public class MockAdminClient extends AdminClient {
newReassignments.entrySet()) {
TopicPartition partition = entry.getKey();
Optional<NewPartitionReassignment> newReassignment = entry.getValue();
KafkaFutureImpl<Void> future = new KafkaFutureImpl<Void>();
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
futures.put(partition, future);
TopicMetadata topicMetadata = allTopics.get(partition.topic());
if (partition.partition() < 0 ||
@ -1058,7 +1133,7 @@ public class MockAdminClient extends AdminClient { @@ -1058,7 +1133,7 @@ public class MockAdminClient extends AdminClient {
) {
Map<String, KafkaFuture<Void>> results = new HashMap<>();
for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<Void>();
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
String feature = entry.getKey();
try {
short cur = featureLevels.getOrDefault(feature, (short) 0);

224
core/src/main/scala/kafka/admin/DelegationTokenCommand.scala

@ -1,224 +0,0 @@ @@ -1,224 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.text.SimpleDateFormat
import java.util
import java.util.Base64
import joptsimple.ArgumentAcceptingOptionSpec
import kafka.utils.{Exit, Logging}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{Admin, CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
/**
* A command to manage delegation token.
*/
object DelegationTokenCommand extends Logging {
def main(args: Array[String]): Unit = {
val opts = new DelegationTokenCommandOptions(args)
CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.")
// should have exactly one action
val actions = Seq(opts.createOpt, opts.renewOpt, opts.expiryOpt, opts.describeOpt).count(opts.options.has _)
if(actions != 1)
CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe")
opts.checkArgs()
val adminClient = createAdminClient(opts)
var exitCode = 0
try {
if(opts.options.has(opts.createOpt))
createToken(adminClient, opts)
else if(opts.options.has(opts.renewOpt))
renewToken(adminClient, opts)
else if(opts.options.has(opts.expiryOpt))
expireToken(adminClient, opts)
else if(opts.options.has(opts.describeOpt))
describeToken(adminClient, opts)
} catch {
case e: Throwable =>
println("Error while executing delegation token command : " + e.getMessage)
error(Utils.stackTrace(e))
exitCode = 1
} finally {
adminClient.close()
Exit.exit(exitCode)
}
}
def createToken(adminClient: Admin, opts: DelegationTokenCommandOptions): DelegationToken = {
val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt).getOrElse(new util.LinkedList[KafkaPrincipal]())
val maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt).longValue
println("Calling create token operation with renewers :" + renewerPrincipals +" , max-life-time-period :"+ maxLifeTimeMs)
val createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals)
val ownerPrincipal = getPrincipals(opts, opts.ownerPrincipalsOpt)
if (ownerPrincipal.isDefined)
createDelegationTokenOptions.owner(ownerPrincipal.get.asScala.head)
val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
val token = createResult.delegationToken().get()
println("Created delegation token with tokenId : %s".format(token.tokenInfo.tokenId)); printToken(List(token))
token
}
def printToken(tokens: List[DelegationToken]): Unit = {
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
print("\n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s".format("TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
for (token <- tokens) {
val tokenInfo = token.tokenInfo
print("\n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s".format(
tokenInfo.tokenId,
token.hmacAsBase64String,
tokenInfo.owner,
tokenInfo.tokenRequester(),
tokenInfo.renewersAsString,
dateFormat.format(tokenInfo.issueTimestamp),
dateFormat.format(tokenInfo.expiryTimestamp),
dateFormat.format(tokenInfo.maxTimestamp)))
println()
}
}
private def getPrincipals(opts: DelegationTokenCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Option[util.List[KafkaPrincipal]] = {
if (opts.options.has(principalOptionSpec))
Some(opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toList.asJava)
else
None
}
def renewToken(adminClient: Admin, opts: DelegationTokenCommandOptions): Long = {
val hmac = opts.options.valueOf(opts.hmacOpt)
val renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt).longValue()
println("Calling renew token operation with hmac :" + hmac +" , renew-time-period :"+ renewTimePeriodMs)
val renewResult = adminClient.renewDelegationToken(Base64.getDecoder.decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs))
val expiryTimeStamp = renewResult.expiryTimestamp().get()
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
println("Completed renew operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp)))
expiryTimeStamp
}
def expireToken(adminClient: Admin, opts: DelegationTokenCommandOptions): Long = {
val hmac = opts.options.valueOf(opts.hmacOpt)
val expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt).longValue()
println("Calling expire token operation with hmac :" + hmac +" , expire-time-period : "+ expiryTimePeriodMs)
val expireResult = adminClient.expireDelegationToken(Base64.getDecoder.decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs))
val expiryTimeStamp = expireResult.expiryTimestamp().get()
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
println("Completed expire operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp)))
expiryTimeStamp
}
def describeToken(adminClient: Admin, opts: DelegationTokenCommandOptions): List[DelegationToken] = {
val ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt)
if (ownerPrincipals.isEmpty)
println("Calling describe token operation for current user.")
else
println("Calling describe token operation for owners :" + ownerPrincipals.get)
val describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals.orNull))
val tokens = describeResult.delegationTokens().get().asScala.toList
println("Total number of tokens : %s".format(tokens.size)); printToken(tokens)
tokens
}
private def createAdminClient(opts: DelegationTokenCommandOptions): Admin = {
val props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
Admin.create(props)
}
class DelegationTokenCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
val BootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping."
val CommandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" +
" operations are allowed in secure mode only. This config file is used to pass security related configs."
val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
.withRequiredArg
.ofType(classOf[String])
val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
.withRequiredArg
.ofType(classOf[String])
val createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.")
val renewOpt = parser.accepts("renew", "Renew delegation token. Use --renew-time-period option to set renew time period.")
val expiryOpt = parser.accepts("expire", "Expire delegation token. Use --expiry-time-period option to expire the token.")
val describeOpt = parser.accepts("describe", "Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals." +
" If --owner-principal option is not supplied, all the user owned tokens and tokens where user have Describe permission will be returned.")
val ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a kafka principal. It is should be in principalType:name format.")
.withOptionalArg()
.ofType(classOf[String])
val renewPrincipalsOpt = parser.accepts("renewer-principal", "renewer is a kafka principal. It is should be in principalType:name format.")
.withOptionalArg()
.ofType(classOf[String])
val maxLifeTimeOpt = parser.accepts("max-life-time-period", "Max life period for the token in milliseconds. If the value is -1," +
" then token max life time will default to a server side config value (delegation.token.max.lifetime.ms).")
.withOptionalArg()
.ofType(classOf[Long])
val renewTimePeriodOpt = parser.accepts("renew-time-period", "Renew time period in milliseconds. If the value is -1, then the" +
" renew time period will default to a server side config value (delegation.token.expiry.time.ms).")
.withOptionalArg()
.ofType(classOf[Long])
val expiryTimePeriodOpt = parser.accepts("expiry-time-period", "Expiry time period in milliseconds. If the value is -1, then the" +
" token will get invalidated immediately." )
.withOptionalArg()
.ofType(classOf[Long])
val hmacOpt = parser.accepts("hmac", "HMAC of the delegation token")
.withOptionalArg
.ofType(classOf[String])
options = parser.parse(args : _*)
def checkArgs(): Unit = {
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, commandConfigOpt)
if (options.has(createOpt))
CommandLineUtils.checkRequiredArgs(parser, options, maxLifeTimeOpt)
if (options.has(renewOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, renewTimePeriodOpt)
}
if (options.has(expiryOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, expiryTimePeriodOpt)
}
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, createOpt, hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt)
CommandLineUtils.checkInvalidArgs(parser, options, renewOpt, renewPrincipalsOpt, maxLifeTimeOpt, expiryTimePeriodOpt, ownerPrincipalsOpt)
CommandLineUtils.checkInvalidArgs(parser, options, expiryOpt, renewOpt, maxLifeTimeOpt, renewTimePeriodOpt, ownerPrincipalsOpt)
CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, renewTimePeriodOpt, maxLifeTimeOpt, hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt)
}
}
}

146
core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala

@ -1,146 +0,0 @@ @@ -1,146 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.util
import kafka.admin.DelegationTokenCommand.DelegationTokenCommandOptions
import kafka.api.{KafkaSasl, SaslSetup}
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.{JaasTestUtils, TestUtils}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionException
class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
private val kafkaClientSaslMechanism = "PLAIN"
private val kafkaServerSaslMechanisms = List("PLAIN")
protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
var adminClient: Admin = _
override def brokerCount = 1
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
super.setUp(testInfo)
}
override def generateConfigs = {
val props = TestUtils.createBrokerConfigs(brokerCount, zkConnect,
enableControlledShutdown = false,
interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true)
props.foreach(brokerPropertyOverrides)
props.map(KafkaConfig.fromProps)
}
private def createAdminConfig: util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val securityProps: util.Map[Object, Object] =
TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
securityProps.forEach { (key, value) => config.put(key.asInstanceOf[String], value) }
config
}
@Test
def testDelegationTokenRequests(): Unit = {
adminClient = Admin.create(createAdminConfig)
val renewer1 = "User:renewer1"
val renewer2 = "User:renewer2"
// create token1 with renewer1
val tokenCreated = DelegationTokenCommand.createToken(adminClient, getCreateOpts(List(renewer1)))
var tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
assertTrue(tokens.size == 1)
val token1 = tokens.head
assertEquals(token1, tokenCreated)
// create token2 with renewer2
val token2 = DelegationTokenCommand.createToken(adminClient, getCreateOpts(List(renewer2)))
tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
assertTrue(tokens.size == 2)
assertEquals(Set(token1, token2), tokens.toSet)
//get tokens for renewer2
tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List(renewer2)))
assertTrue(tokens.size == 1)
assertEquals(Set(token2), tokens.toSet)
//test renewing tokens
val expiryTimestamp = DelegationTokenCommand.renewToken(adminClient, getRenewOpts(token1.hmacAsBase64String()))
val renewedToken = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List(renewer1))).head
assertEquals(expiryTimestamp, renewedToken.tokenInfo().expiryTimestamp())
//test expire tokens
DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token1.hmacAsBase64String()))
DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token2.hmacAsBase64String()))
tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
assertTrue(tokens.size == 0)
//create token with invalid renewer principal type
assertThrows(classOf[ExecutionException], () => DelegationTokenCommand.createToken(adminClient, getCreateOpts(List("Group:Renewer3"))))
// try describing tokens for unknown owner
assertTrue(DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List("User:Unknown"))).isEmpty)
}
private def getCreateOpts(renewers: List[String]): DelegationTokenCommandOptions = {
val opts = ListBuffer("--bootstrap-server", bootstrapServers(), "--max-life-time-period", "-1",
"--command-config", "testfile", "--create")
renewers.foreach(renewer => opts ++= ListBuffer("--renewer-principal", renewer))
new DelegationTokenCommandOptions(opts.toArray)
}
private def getDescribeOpts(owners: List[String]): DelegationTokenCommandOptions = {
val opts = ListBuffer("--bootstrap-server", bootstrapServers(), "--command-config", "testfile", "--describe")
owners.foreach(owner => opts ++= ListBuffer("--owner-principal", owner))
new DelegationTokenCommandOptions(opts.toArray)
}
private def getRenewOpts(hmac: String): DelegationTokenCommandOptions = {
val opts = Array("--bootstrap-server", bootstrapServers(), "--command-config", "testfile", "--renew",
"--renew-time-period", "-1",
"--hmac", hmac)
new DelegationTokenCommandOptions(opts)
}
private def getExpireOpts(hmac: String): DelegationTokenCommandOptions = {
val opts = Array("--bootstrap-server", bootstrapServers(), "--command-config", "testfile", "--expire",
"--expiry-time-period", "-1",
"--hmac", hmac)
new DelegationTokenCommandOptions(opts)
}
@AfterEach
override def tearDown(): Unit = {
if (adminClient != null)
adminClient.close()
super.tearDown()
closeSasl()
}
}

308
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java

@ -0,0 +1,308 @@ @@ -0,0 +1,308 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import joptsimple.OptionSpec;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
import org.apache.kafka.common.security.token.delegation.TokenInformation;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
public class DelegationTokenCommand {
public static void main(String... args) {
Exit.exit(mainNoExit(args));
}
static int mainNoExit(String... args) {
try {
execute(args);
return 0;
} catch (TerseException e) {
System.err.println(e.getMessage());
return 1;
} catch (Throwable e) {
System.err.println(e.getMessage());
System.err.println(Utils.stackTrace(e));
return 1;
}
}
static void execute(String... args) throws Exception {
DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
// should have exactly one action
long numberOfActions = Stream.of(opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()).filter(b -> b).count();
if (numberOfActions != 1) {
CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
}
opts.checkArgs();
try (Admin adminClient = createAdminClient(opts)) {
if (opts.hasCreateOpt()) {
createToken(adminClient, opts);
} else if (opts.hasRenewOpt()) {
renewToken(adminClient, opts);
} else if (opts.hasExpireOpt()) {
expireToken(adminClient, opts);
} else if (opts.hasDescribeOpt()) {
describeToken(adminClient, opts);
}
}
}
public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
Long maxLifeTimeMs = opts.maxLifeTime();
System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs);
CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals);
List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
if (!ownerPrincipals.isEmpty()) {
createDelegationTokenOptions.owner(ownerPrincipals.get(0));
}
CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
DelegationToken token = createResult.delegationToken().get();
System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
printToken(Collections.singletonList(token));
return token;
}
private static void printToken(List<DelegationToken> tokens) {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", "TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE");
for (DelegationToken token : tokens) {
TokenInformation tokenInfo = token.tokenInfo();
System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n",
tokenInfo.tokenId(),
token.hmacAsBase64String(),
tokenInfo.owner(),
tokenInfo.tokenRequester(),
tokenInfo.renewersAsString(),
dateFormat.format(tokenInfo.issueTimestamp()),
dateFormat.format(tokenInfo.expiryTimestamp()),
dateFormat.format(tokenInfo.maxTimestamp()));
}
}
private static List<KafkaPrincipal> getPrincipals(DelegationTokenCommandOptions opts, OptionSpec<String> principalOptionSpec) {
List<KafkaPrincipal> principals = new ArrayList<>();
if (opts.options.has(principalOptionSpec)) {
for (String e : opts.options.valuesOf(principalOptionSpec))
principals.add(SecurityUtils.parseKafkaPrincipal(e.trim()));
}
return principals;
}
public static Long renewToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
String hmac = opts.hmac();
Long renewTimePeriodMs = opts.renewTimePeriod();
System.out.println("Calling renew token operation with hmac :" + hmac + " , renew-time-period :" + renewTimePeriodMs);
RenewDelegationTokenResult renewResult = adminClient.renewDelegationToken(Base64.getDecoder().decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs));
Long expiryTimeStamp = renewResult.expiryTimestamp().get();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
System.out.printf("Completed renew operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
return expiryTimeStamp;
}
public static void expireToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
String hmac = opts.hmac();
Long expiryTimePeriodMs = opts.expiryTimePeriod();
System.out.println("Calling expire token operation with hmac :" + hmac + " , expire-time-period :" + expiryTimePeriodMs);
ExpireDelegationTokenResult renewResult = adminClient.expireDelegationToken(Base64.getDecoder().decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs));
Long expiryTimeStamp = renewResult.expiryTimestamp().get();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
System.out.printf("Completed expire operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
}
public static List<DelegationToken> describeToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
if (ownerPrincipals.isEmpty()) {
System.out.println("Calling describe token operation for current user.");
} else {
System.out.printf("Calling describe token operation for owners: %s%n", ownerPrincipals);
}
DescribeDelegationTokenResult describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals));
List<DelegationToken> tokens = describeResult.delegationTokens().get();
System.out.printf("Total number of tokens : %d", tokens.size());
printToken(tokens);
return tokens;
}
private static Admin createAdminClient(DelegationTokenCommandOptions opts) throws IOException {
Properties props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
props.put("bootstrap.servers", opts.options.valueOf(opts.bootstrapServerOpt));
return Admin.create(props);
}
static class DelegationTokenCommandOptions extends CommandDefaultOptions {
public final OptionSpec<String> bootstrapServerOpt;
public final OptionSpec<String> commandConfigOpt;
public final OptionSpec<Void> createOpt;
public final OptionSpec<Void> renewOpt;
public final OptionSpec<Void> expiryOpt;
public final OptionSpec<Void> describeOpt;
public final OptionSpec<String> ownerPrincipalsOpt;
public final OptionSpec<String> renewPrincipalsOpt;
public final OptionSpec<Long> maxLifeTimeOpt;
public final OptionSpec<Long> renewTimePeriodOpt;
public final OptionSpec<Long> expiryTimePeriodOpt;
public final OptionSpec<String> hmacOpt;
public DelegationTokenCommandOptions(String[] args) {
super(args);
String bootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping.";
String commandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" +
" operations are allowed in secure mode only. This config file is used to pass security related configs.";
this.bootstrapServerOpt = parser.accepts("bootstrap-server", bootstrapServerDoc)
.withRequiredArg()
.ofType(String.class);
this.commandConfigOpt = parser.accepts("command-config", commandConfigDoc)
.withRequiredArg()
.ofType(String.class);
this.createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewer principals.");
this.renewOpt = parser.accepts("renew", "Renew delegation token. Use --renew-time-period option to set renew time period.");
this.expiryOpt = parser.accepts("expire", "Expire delegation token. Use --expiry-time-period option to expire the token.");
this.describeOpt = parser.accepts("describe", "Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals." +
" If --owner-principal option is not supplied, all the user-owned tokens and tokens where the user has Describe permissions will be returned.");
this.ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a Kafka principal. They should be in principalType:name format.")
.withOptionalArg()
.ofType(String.class);
this.renewPrincipalsOpt = parser.accepts("renewer-principal", "renewer is a Kafka principal. They should be in principalType:name format.")
.withOptionalArg()
.ofType(String.class);
this.maxLifeTimeOpt = parser.accepts("max-life-time-period", "Max life period for the token in milliseconds. If the value is -1," +
" then token max life time will default to the server side config value of (delegation.token.max.lifetime.ms).")
.withOptionalArg()
.ofType(Long.class);
this.renewTimePeriodOpt = parser.accepts("renew-time-period", "Renew time period in milliseconds. If the value is -1, then the" +
" renew time period will default to the server side config value of (delegation.token.expiry.time.ms).")
.withOptionalArg()
.ofType(Long.class);
this.expiryTimePeriodOpt = parser.accepts("expiry-time-period", "Expiry time period in milliseconds. If the value is -1, then the" +
" token will get invalidated immediately.")
.withOptionalArg()
.ofType(Long.class);
this.hmacOpt = parser.accepts("hmac", "HMAC of the delegation token")
.withOptionalArg()
.ofType(String.class);
options = parser.parse(args);
}
public boolean hasCreateOpt() {
return options.has(createOpt);
}
public boolean hasRenewOpt() {
return options.has(renewOpt);
}
public boolean hasExpireOpt() {
return options.has(expiryOpt);
}
public boolean hasDescribeOpt() {
return options.has(describeOpt);
}
public long maxLifeTime() {
return options.valueOf(maxLifeTimeOpt);
}
public long renewTimePeriod() {
return options.valueOf(renewTimePeriodOpt);
}
public long expiryTimePeriod() {
return options.valueOf(expiryTimePeriodOpt);
}
public String hmac() {
return options.valueOf(hmacOpt);
}
public void checkArgs() {
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, commandConfigOpt);
if (options.has(createOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, maxLifeTimeOpt);
}
if (options.has(renewOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, renewTimePeriodOpt);
}
if (options.has(expiryOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, expiryTimePeriodOpt);
}
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, createOpt, new HashSet<>(Arrays.asList(hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, renewOpt, new HashSet<>(Arrays.asList(renewPrincipalsOpt, maxLifeTimeOpt, expiryTimePeriodOpt, ownerPrincipalsOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, expiryOpt, new HashSet<>(Arrays.asList(renewOpt, maxLifeTimeOpt, renewTimePeriodOpt, ownerPrincipalsOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, new HashSet<>(Arrays.asList(renewTimePeriodOpt, maxLifeTimeOpt, hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt)));
}
}
}

110
tools/src/test/java/org/apache/kafka/tools/DelegationTokenCommandTest.java

@ -0,0 +1,110 @@ @@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class DelegationTokenCommandTest {
@Test
public void testDelegationTokenRequests() throws ExecutionException, InterruptedException {
Admin adminClient = new MockAdminClient.Builder().build();
String renewer1 = "User:renewer1";
String renewer2 = "User:renewer2";
// create token1 with renewer1
DelegationToken tokenCreated = DelegationTokenCommand.createToken(adminClient, getCreateOpts(renewer1));
List<DelegationToken> tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(""));
assertEquals(1, tokens.size());
DelegationToken token1 = tokens.get(0);
assertEquals(token1, tokenCreated);
// create token2 with renewer2
DelegationToken token2 = DelegationTokenCommand.createToken(adminClient, getCreateOpts(renewer2));
tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(""));
assertEquals(2, tokens.size());
assertEquals(Arrays.asList(token1, token2), tokens);
//get tokens for renewer2
tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(renewer2));
assertEquals(1, tokens.size());
assertEquals(Collections.singletonList(token2), tokens);
//test renewing tokens
Long expiryTimestamp = DelegationTokenCommand.renewToken(adminClient, getRenewOpts(token1.hmacAsBase64String()));
DelegationToken renewedToken = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(renewer1)).get(0);
assertEquals(expiryTimestamp, renewedToken.tokenInfo().expiryTimestamp());
//test expire tokens
DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token1.hmacAsBase64String()));
DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token2.hmacAsBase64String()));
tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(""));
assertEquals(0, tokens.size());
//create token with invalid renewer principal type
assertThrows(ExecutionException.class, () -> DelegationTokenCommand.createToken(adminClient, getCreateOpts("Group:Renewer3")));
// try describing tokens for unknown owner
assertTrue(DelegationTokenCommand.describeToken(adminClient, getDescribeOpts("User:Unknown")).isEmpty());
}
private DelegationTokenCommand.DelegationTokenCommandOptions getCreateOpts(String renewer) {
String[] args = {"--bootstrap-server", "localhost:9092", "--max-life-time-period", "-1", "--command-config", "testfile", "--create", "--renewer-principal", renewer};
return new DelegationTokenCommand.DelegationTokenCommandOptions(args);
}
private DelegationTokenCommand.DelegationTokenCommandOptions getDescribeOpts(String owner) {
List<String> args = new ArrayList<>();
args.add("--bootstrap-server");
args.add("localhost:9092");
args.add("--command-config");
args.add("testfile");
args.add("--describe");
if (!owner.equals("")) {
args.add("--owner-principal");
args.add(owner);
}
return new DelegationTokenCommand.DelegationTokenCommandOptions(args.toArray(new String[0]));
}
private DelegationTokenCommand.DelegationTokenCommandOptions getRenewOpts(String hmac) {
String[] args = {"--bootstrap-server", "localhost:9092", "--command-config", "testfile", "--renew", "--renew-time-period", "604800000", "--hmac", hmac};
return new DelegationTokenCommand.DelegationTokenCommandOptions(args);
}
private DelegationTokenCommand.DelegationTokenCommandOptions getExpireOpts(String hmac) {
String[] args = {"--bootstrap-server", "localhost:9092", "--command-config", "testfile", "--expire", "--expiry-time-period", "-1", "--hmac", hmac};
return new DelegationTokenCommand.DelegationTokenCommandOptions(args);
}
}
Loading…
Cancel
Save