|
|
|
@ -76,6 +76,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
@@ -76,6 +76,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
|
|
|
|
|
val (tp, partitionResponse) = produceResponse.responses.asScala.head |
|
|
|
|
assertEquals(Errors.NONE, partitionResponse.error) |
|
|
|
|
|
|
|
|
|
TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIP - 1), "produce request connection is not closed") |
|
|
|
|
conns = conns :+ connect(socketServer) |
|
|
|
|
// now try one more (should fail) |
|
|
|
|
intercept[IOException](sendProduceRequest()) |
|
|
|
@ -100,12 +101,14 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
@@ -100,12 +101,14 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
|
|
|
|
|
val (tp1, partitionResponse1) = produceResponse.responses.asScala.head |
|
|
|
|
assertEquals(Errors.NONE, partitionResponse1.error) |
|
|
|
|
|
|
|
|
|
TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIPOverride - 1), "produce request connection is not closed") |
|
|
|
|
conns = conns :+ connect(socketServer) |
|
|
|
|
// now try one more (should fail) |
|
|
|
|
intercept[IOException](sendProduceRequest()) |
|
|
|
|
|
|
|
|
|
//close one connection |
|
|
|
|
conns.head.close() |
|
|
|
|
TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIPOverride - 1), "connection is not closed") |
|
|
|
|
// send should succeed |
|
|
|
|
sendProduceRequest() |
|
|
|
|
} |
|
|
|
|