|
|
|
@ -81,6 +81,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
@@ -81,6 +81,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
|
|
|
|
|
@After |
|
|
|
|
override def tearDown(): Unit = { |
|
|
|
|
adminClients.foreach(_.close()) |
|
|
|
|
GroupedUserQuotaCallback.tearDown() |
|
|
|
|
super.tearDown() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -102,7 +103,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
@@ -102,7 +103,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
|
|
|
|
|
// ClientQuotaCallback#quotaLimit is invoked by each quota manager once for each new client |
|
|
|
|
assertEquals(1, quotaLimitCalls(ClientQuotaType.PRODUCE).get) |
|
|
|
|
assertEquals(1, quotaLimitCalls(ClientQuotaType.FETCH).get) |
|
|
|
|
assertTrue(s"Too many quotaLimit calls $quotaLimitCalls", quotaLimitCalls(ClientQuotaType.REQUEST).get <= serverCount) |
|
|
|
|
assertTrue(s"Too many quotaLimit calls $quotaLimitCalls", quotaLimitCalls(ClientQuotaType.REQUEST).get <= 10) // sanity check |
|
|
|
|
// Large quota updated to small quota, should throttle |
|
|
|
|
user.configureAndWaitForQuota(9000, 3000) |
|
|
|
|
user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true) |
|
|
|
@ -321,6 +322,12 @@ object GroupedUserQuotaCallback {
@@ -321,6 +322,12 @@ object GroupedUserQuotaCallback {
|
|
|
|
|
ClientQuotaType.REQUEST -> new AtomicInteger |
|
|
|
|
) |
|
|
|
|
val callbackInstances = new AtomicInteger |
|
|
|
|
|
|
|
|
|
def tearDown(): Unit = { |
|
|
|
|
callbackInstances.set(0) |
|
|
|
|
quotaLimitCalls.values.foreach(_.set(0)) |
|
|
|
|
UnlimitedQuotaMetricTags.clear() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|