diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala index 29c7507449b..b72cf3c08ab 100644 --- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala @@ -24,6 +24,7 @@ import kafka.zk.ConfigEntityChangeNotificationZNode import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.scram.ScramCredential import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.DelegationToken import org.junit.Before @@ -58,6 +59,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest // create scram credential for user "scram-user" createScramCredentials(zkConnect, clientPrincipal, clientPassword) + waitForScramCredentials(clientPrincipal) //create a token with "scram-user" credentials val token = createDelegationToken() @@ -68,6 +70,13 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext) } + private def waitForScramCredentials(clientPrincipal: String): Unit = { + servers.foreach { server => + val cache = server.credentialProvider.credentialCache.cache(kafkaClientSaslMechanism, classOf[ScramCredential]) + TestUtils.waitUntilTrue(() => cache.get(clientPrincipal) != null, s"SCRAM credentials not created for $clientPrincipal") + } + } + @Before override def setUp() { startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))