|
|
|
@ -14,19 +14,17 @@
@@ -14,19 +14,17 @@
|
|
|
|
|
|
|
|
|
|
package kafka.api |
|
|
|
|
|
|
|
|
|
import java.util.{Collections, Properties} |
|
|
|
|
import java.util.{Collections, HashMap, Properties} |
|
|
|
|
|
|
|
|
|
import kafka.server.{DynamicConfig, KafkaConfig, KafkaServer, QuotaId} |
|
|
|
|
import kafka.server.{ClientQuotaManagerConfig, DynamicConfig, KafkaConfig, KafkaServer, QuotaId, QuotaType} |
|
|
|
|
import kafka.utils.TestUtils |
|
|
|
|
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} |
|
|
|
|
import org.apache.kafka.clients.producer._ |
|
|
|
|
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback |
|
|
|
|
import org.apache.kafka.common.{MetricName, TopicPartition} |
|
|
|
|
import org.apache.kafka.common.metrics.Quota |
|
|
|
|
import org.apache.kafka.common.metrics.{KafkaMetric, Quota} |
|
|
|
|
import org.junit.Assert._ |
|
|
|
|
import org.junit.{Before, Test} |
|
|
|
|
import kafka.server.QuotaType |
|
|
|
|
import org.apache.kafka.common.metrics.KafkaMetric |
|
|
|
|
|
|
|
|
|
abstract class BaseQuotaTest extends IntegrationTestHarness { |
|
|
|
|
|
|
|
|
@ -83,12 +81,16 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
@@ -83,12 +81,16 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
|
|
|
|
|
def testThrottledProducerConsumer() { |
|
|
|
|
|
|
|
|
|
val numRecords = 1000 |
|
|
|
|
val produced = produceUntilThrottled(producers.head, numRecords) |
|
|
|
|
val producer = producers.head |
|
|
|
|
val produced = produceUntilThrottled(producer, numRecords) |
|
|
|
|
assertTrue("Should have been throttled", producerThrottleMetric.value > 0) |
|
|
|
|
verifyProducerThrottleTimeMetric(producer) |
|
|
|
|
|
|
|
|
|
// Consumer should read in a bursty manner and get throttled immediately |
|
|
|
|
consumeUntilThrottled(consumers.head, produced) |
|
|
|
|
val consumer = consumers.head |
|
|
|
|
consumeUntilThrottled(consumer, produced) |
|
|
|
|
assertTrue("Should have been throttled", consumerThrottleMetric.value > 0) |
|
|
|
|
verifyConsumerThrottleTimeMetric(consumer) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@ -152,6 +154,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
@@ -152,6 +154,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
assertTrue("Should have been throttled", throttled) |
|
|
|
|
verifyConsumerThrottleTimeMetric(consumer, Some(ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds * 1000.0)) |
|
|
|
|
|
|
|
|
|
assertNotNull("Exempt requests not recorded", exemptRequestMetric) |
|
|
|
|
assertTrue("Exempt requests not recorded", exemptRequestMetric.value > 0) |
|
|
|
@ -205,6 +208,27 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
@@ -205,6 +208,27 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private def verifyProducerThrottleTimeMetric(producer: KafkaProducer[_, _]) { |
|
|
|
|
val tags = new HashMap[String, String] |
|
|
|
|
tags.put("client-id", producerClientId) |
|
|
|
|
val avgMetric = producer.metrics.get(new MetricName("produce-throttle-time-avg", "producer-metrics", "", tags)) |
|
|
|
|
val maxMetric = producer.metrics.get(new MetricName("produce-throttle-time-max", "producer-metrics", "", tags)) |
|
|
|
|
|
|
|
|
|
TestUtils.waitUntilTrue(() => avgMetric.value > 0.0 && maxMetric.value > 0.0, |
|
|
|
|
s"Producer throttle metric not updated: avg=${avgMetric.value} max=${maxMetric.value}") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private def verifyConsumerThrottleTimeMetric(consumer: KafkaConsumer[_, _], maxThrottleTime: Option[Double] = None) { |
|
|
|
|
val tags = new HashMap[String, String] |
|
|
|
|
tags.put("client-id", consumerClientId) |
|
|
|
|
val avgMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-avg", "consumer-fetch-manager-metrics", "", tags)) |
|
|
|
|
val maxMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-max", "consumer-fetch-manager-metrics", "", tags)) |
|
|
|
|
|
|
|
|
|
TestUtils.waitUntilTrue(() => avgMetric.value > 0.0 && maxMetric.value > 0.0, |
|
|
|
|
s"Consumer throttle metric not updated: avg=${avgMetric.value} max=${maxMetric.value}") |
|
|
|
|
maxThrottleTime.foreach(max => assertTrue(s"Maximum consumer throttle too high: ${maxMetric.value}", maxMetric.value <= max)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private def throttleMetricName(quotaType: QuotaType, quotaId: QuotaId): MetricName = { |
|
|
|
|
leaderNode.metrics.metricName("throttle-time", |
|
|
|
|
quotaType.toString, |
|
|
|
|