diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala index 918bb55dd4c..32f19e268f4 100644 --- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -14,19 +14,17 @@ package kafka.api -import java.util.{Collections, Properties} +import java.util.{Collections, HashMap, Properties} -import kafka.server.{DynamicConfig, KafkaConfig, KafkaServer, QuotaId} +import kafka.server.{ClientQuotaManagerConfig, DynamicConfig, KafkaConfig, KafkaServer, QuotaId, QuotaType} import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.clients.producer._ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.common.{MetricName, TopicPartition} -import org.apache.kafka.common.metrics.Quota +import org.apache.kafka.common.metrics.{KafkaMetric, Quota} import org.junit.Assert._ import org.junit.{Before, Test} -import kafka.server.QuotaType -import org.apache.kafka.common.metrics.KafkaMetric abstract class BaseQuotaTest extends IntegrationTestHarness { @@ -83,12 +81,16 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { def testThrottledProducerConsumer() { val numRecords = 1000 - val produced = produceUntilThrottled(producers.head, numRecords) + val producer = producers.head + val produced = produceUntilThrottled(producer, numRecords) assertTrue("Should have been throttled", producerThrottleMetric.value > 0) + verifyProducerThrottleTimeMetric(producer) // Consumer should read in a bursty manner and get throttled immediately - consumeUntilThrottled(consumers.head, produced) + val consumer = consumers.head + consumeUntilThrottled(consumer, produced) assertTrue("Should have been throttled", consumerThrottleMetric.value > 0) + verifyConsumerThrottleTimeMetric(consumer) } @Test @@ -152,6 +154,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { } assertTrue("Should have been throttled", throttled) + verifyConsumerThrottleTimeMetric(consumer, Some(ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds * 1000.0)) assertNotNull("Exempt requests not recorded", exemptRequestMetric) assertTrue("Exempt requests not recorded", exemptRequestMetric.value > 0) @@ -205,6 +208,27 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { } } + private def verifyProducerThrottleTimeMetric(producer: KafkaProducer[_, _]) { + val tags = new HashMap[String, String] + tags.put("client-id", producerClientId) + val avgMetric = producer.metrics.get(new MetricName("produce-throttle-time-avg", "producer-metrics", "", tags)) + val maxMetric = producer.metrics.get(new MetricName("produce-throttle-time-max", "producer-metrics", "", tags)) + + TestUtils.waitUntilTrue(() => avgMetric.value > 0.0 && maxMetric.value > 0.0, + s"Producer throttle metric not updated: avg=${avgMetric.value} max=${maxMetric.value}") + } + + private def verifyConsumerThrottleTimeMetric(consumer: KafkaConsumer[_, _], maxThrottleTime: Option[Double] = None) { + val tags = new HashMap[String, String] + tags.put("client-id", consumerClientId) + val avgMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-avg", "consumer-fetch-manager-metrics", "", tags)) + val maxMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-max", "consumer-fetch-manager-metrics", "", tags)) + + TestUtils.waitUntilTrue(() => avgMetric.value > 0.0 && maxMetric.value > 0.0, + s"Consumer throttle metric not updated: avg=${avgMetric.value} max=${maxMetric.value}") + maxThrottleTime.foreach(max => assertTrue(s"Maximum consumer throttle too high: ${maxMetric.value}", maxMetric.value <= max)) + } + private def throttleMetricName(quotaType: QuotaType, quotaId: QuotaId): MetricName = { leaderNode.metrics.metricName("throttle-time", quotaType.toString,