From 700a931ff55293bce1825a96913f059ff1dec2be Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Fri, 10 Jan 2020 11:23:24 +0000 Subject: [PATCH] KAFKA-9188; Fix flaky test SslAdminClientIntegrationTest.testSynchronousAuthorizerAclUpdatesBlockRequestThreads (#7918) The test blocks requests threads while sending the ACL update requests and occasionally hits request timeout. Updated the test to tolerate timeouts and retry the request for that case. Added an additional check to verify that the requests threads are unblocked when the semaphore is released, ensuring that the timeout is not due to blocked threads. Reviewers: Manikumar Reddy --- .../kafka/api/SslAdminIntegrationTest.scala | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala index 1b11b91b660..e25458a13d7 100644 --- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala @@ -196,8 +196,24 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { // Release the semaphore and verify that all requests complete testSemaphore.release(aclFutures.size) + waitForNoBlockedRequestThreads() assertNotNull(describeFuture.get(10, TimeUnit.SECONDS)) - aclFutures.foreach(_.all().get()) + // If any of the requests time out since we were blocking the threads earlier, retry the request. + val numTimedOut = aclFutures.count { future => + try { + future.all().get() + false + } catch { + case e: ExecutionException => + if (e.getCause.isInstanceOf[org.apache.kafka.common.errors.TimeoutException]) + true + else + throw e.getCause + } + } + (0 until numTimedOut) + .map(_ => createAdminClient.createAcls(List(acl2).asJava)) + .foreach(_.all().get(30, TimeUnit.SECONDS)) } /**