Browse Source

KAFKA-5402; Avoid creating quota related metrics if quotas not enabled

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3303 from rajinisivaram/KAFKA-5402
pull/3446/merge
Rajini Sivaram 7 years ago
parent
commit
dc95456f1d
  1. 26
      core/src/main/scala/kafka/server/ClientQuotaManager.scala
  2. 26
      core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
  3. 27
      core/src/main/scala/kafka/server/KafkaApis.scala
  4. 4
      core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
  5. 2
      core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
  6. 36
      core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
  7. 40
      core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala

26
core/src/main/scala/kafka/server/ClientQuotaManager.scala

@ -138,7 +138,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, @@ -138,7 +138,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val time: Time) extends Logging {
private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]()
private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
private var quotaTypesEnabled = if (config.quotaBytesPerSecondDefault == Long.MaxValue) QuotaTypes.NoQuotas else QuotaTypes.ClientIdQuotaEnabled
@volatile private var quotaTypesEnabled = if (config.quotaBytesPerSecondDefault == Long.MaxValue) QuotaTypes.NoQuotas else QuotaTypes.ClientIdQuotaEnabled
private val lock = new ReentrantReadWriteLock()
private val delayQueue = new DelayQueue[ThrottledResponse]()
private val sensorAccessor = new SensorAccess
@ -171,6 +171,15 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, @@ -171,6 +171,15 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
}
/**
* Returns true if any quotas are enabled for this quota manager. This is used
* to determine if quota related metrics should be created.
* Note: If any quotas (static defaults, dynamic defaults or quota overrides) have
* been configured for this broker at any time for this quota type, quotasEnabled will
* return true until the next broker restart, even if all quotas are subsequently deleted.
*/
def quotasEnabled: Boolean = quotaTypesEnabled != QuotaTypes.NoQuotas
/**
* Records that a user/clientId changed some metric being throttled (produced/consumed bytes, request processing time etc.)
* If quota has been violated, callback is invoked after a delay, otherwise the callback is invoked immediately.
@ -183,9 +192,16 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, @@ -183,9 +192,16 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* @return Number of milliseconds to delay the response in case of Quota violation.
* Zero otherwise
*/
def recordAndMaybeThrottle(sanitizedUser: String, clientId: String, value: Double, callback: Int => Unit): Int = {
val clientSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
recordAndThrottleOnQuotaViolation(clientSensors, value, callback)
def maybeRecordAndThrottle(sanitizedUser: String, clientId: String, value: Double, callback: Int => Unit): Int = {
if (quotasEnabled) {
val clientSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
recordAndThrottleOnQuotaViolation(clientSensors, value, callback)
} else {
// Don't record any metrics if quotas are not enabled at any level
val throttleTimeMs = 0
callback(throttleTimeMs)
throttleTimeMs
}
}
def recordAndThrottleOnQuotaViolation(clientSensors: ClientSensors, value: Double, callback: Int => Unit): Int = {
@ -405,7 +421,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, @@ -405,7 +421,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
.quota(quota)
}
protected def createSensor(sensorName: String, metricName: MetricName): Sensor = {
protected def getOrCreateSensor(sensorName: String, metricName: MetricName): Sensor = {
sensorAccessor.getOrCreate(
sensorName,
ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,

26
core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala

@ -27,12 +27,34 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig, @@ -27,12 +27,34 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
private val metrics: Metrics,
private val time: Time) extends ClientQuotaManager(config, metrics, QuotaType.Request, time) {
val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
val exemptSensor = createSensor(exemptSensorName, exemptMetricName)
def exemptSensor = getOrCreateSensor(exemptSensorName, exemptMetricName)
def recordExempt(value: Double) {
exemptSensor.record(value)
}
def maybeRecordAndThrottle(sanitizedUser: String, clientId: String, requestThreadTimeNanos: Long,
sendResponseCallback: Int => Unit, recordNetworkThreadTimeCallback: (Long => Unit) => Unit): Unit = {
if (quotasEnabled) {
val quotaSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
recordNetworkThreadTimeCallback(timeNanos => recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos)))
recordAndThrottleOnQuotaViolation(
quotaSensors,
nanosToPercentage(requestThreadTimeNanos),
sendResponseCallback)
} else {
sendResponseCallback(0)
}
}
def maybeRecordExempt(requestThreadTimeNanos: Long, recordNetworkThreadTimeCallback: (Long => Unit) => Unit): Unit = {
if (quotasEnabled) {
recordNetworkThreadTimeCallback(timeNanos => recordExempt(nanosToPercentage(timeNanos)))
recordExempt(nanosToPercentage(requestThreadTimeNanos))
}
}
override protected def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Long = {
math.min(super.throttleTime(clientMetric, config), maxThrottleTimeMs)
}
@ -51,4 +73,6 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig, @@ -51,4 +73,6 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
private def exemptSensorName: String = "exempt-" + QuotaType.Request
private def nanosToPercentage(nanos: Long): Double = nanos * ClientQuotaManagerConfig.NanosToPercentagePerSecond
}

27
core/src/main/scala/kafka/server/KafkaApis.scala

@ -446,7 +446,7 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -446,7 +446,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// When this callback is triggered, the remote API call has completed
request.apiRemoteCompleteTimeNanos = time.nanoseconds
quotas.produce.recordAndMaybeThrottle(
quotas.produce.maybeRecordAndThrottle(
request.session.sanitizedUser,
request.header.clientId,
numBytesAppended,
@ -594,7 +594,7 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -594,7 +594,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// result in data being loaded into memory, it is better to do this after throttling to avoid OOM.
val response = new FetchResponse(fetchedPartitionData, 0)
val responseStruct = response.toStruct(versionId)
quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, responseStruct.sizeOf,
quotas.fetch.maybeRecordAndThrottle(request.session.sanitizedUser, clientId, responseStruct.sizeOf,
fetchResponseCallback)
}
}
@ -2003,16 +2003,9 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -2003,16 +2003,9 @@ class KafkaApis(val requestChannel: RequestChannel,
// When this callback is triggered, the remote API call has completed
request.apiRemoteCompleteTimeNanos = time.nanoseconds
}
val quotaSensors = quotas.request.getOrCreateQuotaSensors(request.session.sanitizedUser, clientId)
def recordNetworkThreadTimeNanos(timeNanos: Long) {
quotas.request.recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos))
}
request.recordNetworkThreadTimeCallback = Some(recordNetworkThreadTimeNanos)
quotas.request.recordAndThrottleOnQuotaViolation(
quotaSensors,
nanosToPercentage(request.requestThreadTimeNanos),
sendResponseCallback)
quotas.request.maybeRecordAndThrottle(request.session.sanitizedUser, clientId,
request.requestThreadTimeNanos, sendResponseCallback,
callback => request.recordNetworkThreadTimeCallback = Some(callback))
}
private def sendResponseExemptThrottle(response: RequestChannel.Response) {
@ -2020,18 +2013,12 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -2020,18 +2013,12 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def sendResponseExemptThrottle(request: RequestChannel.Request, sendResponseCallback: () => Unit) {
def recordNetworkThreadTimeNanos(timeNanos: Long) {
quotas.request.recordExempt(nanosToPercentage(timeNanos))
}
request.recordNetworkThreadTimeCallback = Some(recordNetworkThreadTimeNanos)
quotas.request.recordExempt(nanosToPercentage(request.requestThreadTimeNanos))
quotas.request.maybeRecordExempt(request.requestThreadTimeNanos,
callback => request.recordNetworkThreadTimeCallback = Some(callback))
sendResponseCallback()
}
private def sendResponse(request: RequestChannel.Request, response: AbstractResponse) {
requestChannel.sendResponse(RequestChannel.Response(request, response))
}
private def nanosToPercentage(nanos: Long): Double = nanos * ClientQuotaManagerConfig.NanosToPercentagePerSecond
}

4
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala

@ -45,6 +45,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness { @@ -45,6 +45,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
val tp = new TopicPartition(topic, part)
val part2 = 1
val tp2 = new TopicPartition(topic, part2)
val producerClientId = "ConsumerTestProducer"
val consumerClientId = "ConsumerTestConsumer"
// configure the servers and clients
this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
@ -54,6 +56,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness { @@ -54,6 +56,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000")
this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

2
core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala

@ -147,7 +147,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { @@ -147,7 +147,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
consumer.subscribe(Collections.singleton(topic1))
val endTimeMs = System.currentTimeMillis + 10000
var throttled = false
while (!throttled && System.currentTimeMillis < endTimeMs) {
while ((!throttled || exemptRequestMetric == null) && System.currentTimeMillis < endTimeMs) {
consumer.poll(100)
val throttleMetric = consumerRequestThrottleMetric
throttled = throttleMetric != null && throttleMetric.value > 0

36
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala

@ -33,6 +33,8 @@ import org.junit.Test @@ -33,6 +33,8 @@ import org.junit.Test
import scala.collection.JavaConverters._
import scala.collection.mutable.Buffer
import kafka.server.QuotaType
import kafka.server.KafkaServer
/* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
class PlaintextConsumerTest extends BaseConsumerTest {
@ -1472,6 +1474,40 @@ class PlaintextConsumerTest extends BaseConsumerTest { @@ -1472,6 +1474,40 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
}
@Test
def testQuotaMetricsNotCreatedIfNoQuotasConfigured() {
val numRecords = 1000
sendRecords(numRecords)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.seek(tp, 0)
consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, startingOffset = 0)
def assertNoMetric(broker: KafkaServer, name: String, quotaType: QuotaType, clientId: String) {
val metricName = broker.metrics.metricName("throttle-time",
quotaType.toString,
"",
"user", "",
"client-id", clientId)
assertNull("Metric should not hanve been created " + metricName, broker.metrics.metric(metricName))
}
servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId))
servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId))
servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, consumerClientId))
servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId))
servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId))
servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId))
servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId))
servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId))
def assertNoExemptRequestMetric(broker: KafkaServer) {
val metricName = broker.metrics.metricName("exempt-request-time", QuotaType.Request.toString, "")
assertNull("Metric should not hanve been created " + metricName, broker.metrics.metric(metricName))
}
servers.foreach(assertNoExemptRequestMetric(_))
}
def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {
// use consumers defined in this class plus one additional consumer
// Use topic defined in this class + one additional topic

40
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala

@ -52,7 +52,7 @@ class ClientQuotaManagerTest { @@ -52,7 +52,7 @@ class ClientQuotaManagerTest {
assertEquals("Should return the overridden value (4000)", new Quota(4000, true), clientMetrics.quota(client2.user, client2.clientId))
// p1 should be throttled using the overridden quota
var throttleTimeMs = clientMetrics.recordAndMaybeThrottle(client1.user, client1.clientId, 2500 * config.numQuotaSamples, this.callback)
var throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 2500 * config.numQuotaSamples, this.callback)
assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
// Case 2: Change quota again. The quota should be updated within KafkaMetrics as well since the sensor was created.
@ -60,14 +60,14 @@ class ClientQuotaManagerTest { @@ -60,14 +60,14 @@ class ClientQuotaManagerTest {
clientMetrics.updateQuota(client1.configUser, client1.configClientId, Some(new Quota(3000, true)))
assertEquals("Should return the newly overridden value (3000)", new Quota(3000, true), clientMetrics.quota(client1.user, client1.clientId))
throttleTimeMs = clientMetrics.recordAndMaybeThrottle(client1.user, client1.clientId, 0, this.callback)
throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 0, this.callback)
assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
// Case 3: Change quota back to default. Should be throttled again
clientMetrics.updateQuota(client1.configUser, client1.configClientId, Some(new Quota(500, true)))
assertEquals("Should return the default value (500)", new Quota(500, true), clientMetrics.quota(client1.user, client1.clientId))
throttleTimeMs = clientMetrics.recordAndMaybeThrottle(client1.user, client1.clientId, 0, this.callback)
throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 0, this.callback)
assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
// Case 4: Set high default quota, remove p1 quota. p1 should no longer be throttled
@ -75,7 +75,7 @@ class ClientQuotaManagerTest { @@ -75,7 +75,7 @@ class ClientQuotaManagerTest {
clientMetrics.updateQuota(defaultConfigClient.configUser, defaultConfigClient.configClientId, Some(new Quota(4000, true)))
assertEquals("Should return the newly overridden value (4000)", new Quota(4000, true), clientMetrics.quota(client1.user, client1.clientId))
throttleTimeMs = clientMetrics.recordAndMaybeThrottle(client1.user, client1.clientId, 1000 * config.numQuotaSamples, this.callback)
throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 1000 * config.numQuotaSamples, this.callback)
assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
} finally {
@ -154,7 +154,7 @@ class ClientQuotaManagerTest { @@ -154,7 +154,7 @@ class ClientQuotaManagerTest {
def checkQuota(user: String, clientId: String, expectedBound: Int, value: Int, expectThrottle: Boolean) {
assertEquals(new Quota(expectedBound, true), quotaManager.quota(user, clientId))
val throttleTimeMs = quotaManager.recordAndMaybeThrottle(user, clientId, value * config.numQuotaSamples, this.callback)
val throttleTimeMs = quotaManager.maybeRecordAndThrottle(user, clientId, value * config.numQuotaSamples, this.callback)
if (expectThrottle)
assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
else
@ -230,7 +230,7 @@ class ClientQuotaManagerTest { @@ -230,7 +230,7 @@ class ClientQuotaManagerTest {
* if we produce under the quota
*/
for (_ <- 0 until 10) {
clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 400, callback)
clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 400, callback)
time.sleep(1000)
}
assertEquals(10, numCallbacks)
@ -241,7 +241,7 @@ class ClientQuotaManagerTest { @@ -241,7 +241,7 @@ class ClientQuotaManagerTest {
// (600 - quota)/quota*window-size = (600-500)/500*10.5 seconds = 2100
// 10.5 seconds because the last window is half complete
time.sleep(500)
val sleepTime = clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 2300, callback)
val sleepTime = clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 2300, callback)
assertEquals("Should be throttled", 2100, sleepTime)
assertEquals(1, queueSizeMetric.value().toInt)
@ -257,12 +257,12 @@ class ClientQuotaManagerTest { @@ -257,12 +257,12 @@ class ClientQuotaManagerTest {
// Could continue to see delays until the bursty sample disappears
for (_ <- 0 until 10) {
clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 400, callback)
clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 400, callback)
time.sleep(1000)
}
assertEquals("Should be unthrottled since bursty sample has rolled over",
0, clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 0, callback))
0, clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 0, callback))
} finally {
clientMetrics.shutdown()
}
@ -280,7 +280,7 @@ class ClientQuotaManagerTest { @@ -280,7 +280,7 @@ class ClientQuotaManagerTest {
* if we are under the quota
*/
for (_ <- 0 until 10) {
quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", millisToPercent(4), callback)
quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", millisToPercent(4), callback)
time.sleep(1000)
}
assertEquals(10, numCallbacks)
@ -292,7 +292,7 @@ class ClientQuotaManagerTest { @@ -292,7 +292,7 @@ class ClientQuotaManagerTest {
// (10.2 - quota)/quota*window-size = (10.2-10)/10*10.5 seconds = 210ms
// 10.5 seconds interval because the last window is half complete
time.sleep(500)
val throttleTime = quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", millisToPercent(67.1), callback)
val throttleTime = quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", millisToPercent(67.1), callback)
assertEquals("Should be throttled", 210, throttleTime)
assertEquals(1, queueSizeMetric.value().toInt)
@ -308,22 +308,22 @@ class ClientQuotaManagerTest { @@ -308,22 +308,22 @@ class ClientQuotaManagerTest {
// Could continue to see delays until the bursty sample disappears
for (_ <- 0 until 11) {
quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", millisToPercent(4), callback)
quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", millisToPercent(4), callback)
time.sleep(1000)
}
assertEquals("Should be unthrottled since bursty sample has rolled over",
0, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", 0, callback))
0, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", 0, callback))
// Create a very large spike which requires > one quota window to bring within quota
assertEquals(1000, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", millisToPercent(500), callback))
assertEquals(1000, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", millisToPercent(500), callback))
for (_ <- 0 until 10) {
time.sleep(1000)
assertEquals(1000, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", 0, callback))
assertEquals(1000, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", 0, callback))
}
time.sleep(1000)
assertEquals("Should be unthrottled since bursty sample has rolled over",
0, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", 0, callback))
0, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", 0, callback))
} finally {
quotaManager.shutdown()
@ -335,11 +335,11 @@ class ClientQuotaManagerTest { @@ -335,11 +335,11 @@ class ClientQuotaManagerTest {
val metrics = newMetrics
val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
try {
clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "client1", 100, callback)
clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 100, callback)
// remove the throttle time sensor
metrics.removeSensor("ProduceThrottleTime-:client1")
// should not throw an exception even if the throttle time sensor does not exist.
val throttleTime = clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "client1", 10000, callback)
val throttleTime = clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 10000, callback)
assertTrue("Should be throttled", throttleTime > 0)
// the sensor should get recreated
val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:client1")
@ -354,12 +354,12 @@ class ClientQuotaManagerTest { @@ -354,12 +354,12 @@ class ClientQuotaManagerTest {
val metrics = newMetrics
val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
try {
clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "client1", 100, callback)
clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 100, callback)
// remove all the sensors
metrics.removeSensor("ProduceThrottleTime-:client1")
metrics.removeSensor("Produce-ANONYMOUS:client1")
// should not throw an exception
val throttleTime = clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "client1", 10000, callback)
val throttleTime = clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 10000, callback)
assertTrue("Should be throttled", throttleTime > 0)
// all the sensors should get recreated

Loading…
Cancel
Save