diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala index 374556bda22..78d9af7bdfc 100644 --- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala @@ -76,6 +76,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { val (tp, partitionResponse) = produceResponse.responses.asScala.head assertEquals(Errors.NONE, partitionResponse.error) + TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIP - 1), "produce request connection is not closed") conns = conns :+ connect(socketServer) // now try one more (should fail) intercept[IOException](sendProduceRequest()) @@ -100,12 +101,14 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { val (tp1, partitionResponse1) = produceResponse.responses.asScala.head assertEquals(Errors.NONE, partitionResponse1.error) + TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIPOverride - 1), "produce request connection is not closed") conns = conns :+ connect(socketServer) // now try one more (should fail) intercept[IOException](sendProduceRequest()) //close one connection conns.head.close() + TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIPOverride - 1), "connection is not closed") // send should succeed sendProduceRequest() }