|
|
|
@ -24,6 +24,7 @@ import kafka.zk.ConfigEntityChangeNotificationZNode
@@ -24,6 +24,7 @@ import kafka.zk.ConfigEntityChangeNotificationZNode
|
|
|
|
|
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} |
|
|
|
|
import org.apache.kafka.common.config.SaslConfigs |
|
|
|
|
import org.apache.kafka.common.security.auth.SecurityProtocol |
|
|
|
|
import org.apache.kafka.common.security.scram.ScramCredential |
|
|
|
|
import org.apache.kafka.common.security.scram.internals.ScramMechanism |
|
|
|
|
import org.apache.kafka.common.security.token.delegation.DelegationToken |
|
|
|
|
import org.junit.Before |
|
|
|
@ -58,6 +59,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
@@ -58,6 +59,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
|
|
|
|
|
|
|
|
|
|
// create scram credential for user "scram-user" |
|
|
|
|
createScramCredentials(zkConnect, clientPrincipal, clientPassword) |
|
|
|
|
waitForScramCredentials(clientPrincipal) |
|
|
|
|
|
|
|
|
|
//create a token with "scram-user" credentials |
|
|
|
|
val token = createDelegationToken() |
|
|
|
@ -68,6 +70,13 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
@@ -68,6 +70,13 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
|
|
|
|
|
consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private def waitForScramCredentials(clientPrincipal: String): Unit = { |
|
|
|
|
servers.foreach { server => |
|
|
|
|
val cache = server.credentialProvider.credentialCache.cache(kafkaClientSaslMechanism, classOf[ScramCredential]) |
|
|
|
|
TestUtils.waitUntilTrue(() => cache.get(clientPrincipal) != null, s"SCRAM credentials not created for $clientPrincipal") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Before |
|
|
|
|
override def setUp() { |
|
|
|
|
startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both)) |
|
|
|
|