diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala index 1b11b91b660..e25458a13d7 100644 --- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala @@ -196,8 +196,24 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { // Release the semaphore and verify that all requests complete testSemaphore.release(aclFutures.size) + waitForNoBlockedRequestThreads() assertNotNull(describeFuture.get(10, TimeUnit.SECONDS)) - aclFutures.foreach(_.all().get()) + // If any of the requests time out since we were blocking the threads earlier, retry the request. + val numTimedOut = aclFutures.count { future => + try { + future.all().get() + false + } catch { + case e: ExecutionException => + if (e.getCause.isInstanceOf[org.apache.kafka.common.errors.TimeoutException]) + true + else + throw e.getCause + } + } + (0 until numTimedOut) + .map(_ => createAdminClient.createAcls(List(acl2).asJava)) + .foreach(_.all().get(30, TimeUnit.SECONDS)) } /**