Browse Source

MINOR: Initialize BrokerTopicMetrics with no topic tag greedily (#7198)

This patch fixes the quota system test whose JMX tool relies on the existence
of these metrics.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Nikhil Bhatia <nikhil@confluent.io>, Tu V. Tran <tuvtran97@gmail.com>, Ismael Juma <ismael@juma.me.uk>
pull/7266/head
Stanislav Kozlovski 5 years ago committed by Ismael Juma
parent
commit
826408e122
  1. 147
      core/src/main/scala/kafka/server/KafkaRequestHandler.scala
  2. 2
      core/src/main/scala/kafka/utils/Pool.scala
  3. 27
      core/src/test/scala/unit/kafka/metrics/MetricsTest.scala

147
core/src/main/scala/kafka/server/KafkaRequestHandler.scala

@ -28,6 +28,7 @@ import org.apache.kafka.common.internals.FatalExitError @@ -28,6 +28,7 @@ import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.{KafkaThread, Time}
import scala.collection.mutable
import scala.collection.JavaConverters._
/**
* A thread that answers kafka requests.
@ -146,64 +147,92 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup { @@ -146,64 +147,92 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
case Some(topic) => Map("topic" -> topic)
}
case class MeterWrapper(metricType: String, eventType: String) {
@volatile private var lazyMeter: Meter = _
private val meterLock = new Object
def meter(): Meter = {
var meter = lazyMeter
if (meter == null) {
meterLock synchronized {
meter = lazyMeter
if (meter == null) {
meter = newMeter(metricType, eventType, TimeUnit.SECONDS, tags)
lazyMeter = meter
}
}
}
meter
}
def close(): Unit = meterLock synchronized {
if (lazyMeter != null) {
removeMetric(metricType, tags)
lazyMeter = null
}
}
if (tags.isEmpty) // greedily initialize the general topic metrics
meter()
}
// an internal map for "lazy initialization" of certain metrics
private val metricTypeMap = new Pool[String, Meter]
def messagesInRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.MessagesInPerSec,
newMeter(BrokerTopicStats.MessagesInPerSec, "messages", TimeUnit.SECONDS, tags))
def bytesInRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesInPerSec,
newMeter(BrokerTopicStats.BytesInPerSec, "bytes", TimeUnit.SECONDS, tags))
def bytesOutRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesOutPerSec,
newMeter(BrokerTopicStats.BytesOutPerSec, "bytes", TimeUnit.SECONDS, tags))
def bytesRejectedRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesRejectedPerSec,
newMeter(BrokerTopicStats.BytesRejectedPerSec, "bytes", TimeUnit.SECONDS, tags))
private val metricTypeMap = new Pool[String, MeterWrapper]()
metricTypeMap.putAll(Map(
BrokerTopicStats.MessagesInPerSec -> MeterWrapper(BrokerTopicStats.MessagesInPerSec, "messages"),
BrokerTopicStats.BytesInPerSec -> MeterWrapper(BrokerTopicStats.BytesInPerSec, "bytes"),
BrokerTopicStats.BytesOutPerSec -> MeterWrapper(BrokerTopicStats.BytesOutPerSec, "bytes"),
BrokerTopicStats.BytesRejectedPerSec -> MeterWrapper(BrokerTopicStats.BytesRejectedPerSec, "bytes"),
BrokerTopicStats.FailedProduceRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedProduceRequestsPerSec, "requests"),
BrokerTopicStats.FailedFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedFetchRequestsPerSec, "requests"),
BrokerTopicStats.TotalProduceRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalProduceRequestsPerSec, "requests"),
BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests")
).asJava)
if (name.isEmpty) {
metricTypeMap.put(BrokerTopicStats.ReplicationBytesInPerSec, MeterWrapper(BrokerTopicStats.ReplicationBytesInPerSec, "bytes"))
metricTypeMap.put(BrokerTopicStats.ReplicationBytesOutPerSec, MeterWrapper(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes"))
}
// used for testing only
def metricMap: Map[String, MeterWrapper] = metricTypeMap.toMap
def messagesInRate = metricTypeMap.get(BrokerTopicStats.MessagesInPerSec).meter()
def bytesInRate = metricTypeMap.get(BrokerTopicStats.BytesInPerSec).meter()
def bytesOutRate = metricTypeMap.get(BrokerTopicStats.BytesOutPerSec).meter()
def bytesRejectedRate = metricTypeMap.get(BrokerTopicStats.BytesRejectedPerSec).meter()
private[server] def replicationBytesInRate =
if (name.isEmpty) Some(metricTypeMap.getAndMaybePut(
BrokerTopicStats.ReplicationBytesInPerSec,
newMeter(BrokerTopicStats.ReplicationBytesInPerSec, "bytes", TimeUnit.SECONDS, tags)))
if (name.isEmpty) Some(metricTypeMap.get(BrokerTopicStats.ReplicationBytesInPerSec).meter())
else None
private[server] def replicationBytesOutRate =
if (name.isEmpty) Some(metricTypeMap.getAndMaybePut(
BrokerTopicStats.ReplicationBytesOutPerSec,
newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes", TimeUnit.SECONDS, tags)))
if (name.isEmpty) Some(metricTypeMap.get(BrokerTopicStats.ReplicationBytesOutPerSec).meter())
else None
def failedProduceRequestRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.FailedProduceRequestsPerSec,
newMeter(BrokerTopicStats.FailedProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags))
def failedFetchRequestRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.FailedFetchRequestsPerSec,
newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags))
def totalProduceRequestRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.TotalProduceRequestsPerSec,
newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags))
def totalFetchRequestRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.TotalFetchRequestsPerSec,
newMeter(BrokerTopicStats.TotalFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags))
def fetchMessageConversionsRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.FetchMessageConversionsPerSec,
newMeter(BrokerTopicStats.FetchMessageConversionsPerSec, "requests", TimeUnit.SECONDS, tags))
def produceMessageConversionsRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.ProduceMessageConversionsPerSec,
newMeter(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests", TimeUnit.SECONDS, tags))
// this method helps check with metricTypeMap first before deleting a metric
def removeMetricHelper(metricType: String, tags: scala.collection.Map[String, String]): Unit = {
val metric: Meter = metricTypeMap.remove(metricType)
if (metric != null) {
removeMetric(metricType, tags)
}
}
def close(): Unit = {
removeMetricHelper(BrokerTopicStats.MessagesInPerSec, tags)
removeMetricHelper(BrokerTopicStats.BytesInPerSec, tags)
removeMetricHelper(BrokerTopicStats.BytesOutPerSec, tags)
removeMetricHelper(BrokerTopicStats.BytesRejectedPerSec, tags)
if (replicationBytesInRate.isDefined)
removeMetricHelper(BrokerTopicStats.ReplicationBytesInPerSec, tags)
if (replicationBytesOutRate.isDefined)
removeMetricHelper(BrokerTopicStats.ReplicationBytesOutPerSec, tags)
removeMetricHelper(BrokerTopicStats.FailedProduceRequestsPerSec, tags)
removeMetricHelper(BrokerTopicStats.FailedFetchRequestsPerSec, tags)
removeMetricHelper(BrokerTopicStats.TotalProduceRequestsPerSec, tags)
removeMetricHelper(BrokerTopicStats.TotalFetchRequestsPerSec, tags)
removeMetricHelper(BrokerTopicStats.FetchMessageConversionsPerSec, tags)
removeMetricHelper(BrokerTopicStats.ProduceMessageConversionsPerSec, tags)
def failedProduceRequestRate = metricTypeMap.get(BrokerTopicStats.FailedProduceRequestsPerSec).meter()
def failedFetchRequestRate = metricTypeMap.get(BrokerTopicStats.FailedFetchRequestsPerSec).meter()
def totalProduceRequestRate = metricTypeMap.get(BrokerTopicStats.TotalProduceRequestsPerSec).meter()
def totalFetchRequestRate = metricTypeMap.get(BrokerTopicStats.TotalFetchRequestsPerSec).meter()
def fetchMessageConversionsRate = metricTypeMap.get(BrokerTopicStats.FetchMessageConversionsPerSec).meter()
def produceMessageConversionsRate = metricTypeMap.get(BrokerTopicStats.ProduceMessageConversionsPerSec).meter()
def closeMetric(metricType: String): Unit = {
val meter = metricTypeMap.get(metricType)
if (meter != null)
meter.close()
}
def close(): Unit = metricTypeMap.values.foreach(_.close())
}
object BrokerTopicStats {
@ -247,13 +276,13 @@ class BrokerTopicStats { @@ -247,13 +276,13 @@ class BrokerTopicStats {
def removeOldLeaderMetrics(topic: String): Unit = {
val topicMetrics = topicStats(topic)
if (topicMetrics != null) {
topicMetrics.removeMetricHelper(BrokerTopicStats.MessagesInPerSec, topicMetrics.tags)
topicMetrics.removeMetricHelper(BrokerTopicStats.BytesInPerSec, topicMetrics.tags)
topicMetrics.removeMetricHelper(BrokerTopicStats.BytesRejectedPerSec, topicMetrics.tags)
topicMetrics.removeMetricHelper(BrokerTopicStats.FailedProduceRequestsPerSec, topicMetrics.tags)
topicMetrics.removeMetricHelper(BrokerTopicStats.TotalProduceRequestsPerSec, topicMetrics.tags)
topicMetrics.removeMetricHelper(BrokerTopicStats.ProduceMessageConversionsPerSec, topicMetrics.tags)
topicMetrics.removeMetricHelper(BrokerTopicStats.ReplicationBytesOutPerSec, topicMetrics.tags)
topicMetrics.closeMetric(BrokerTopicStats.MessagesInPerSec)
topicMetrics.closeMetric(BrokerTopicStats.BytesInPerSec)
topicMetrics.closeMetric(BrokerTopicStats.BytesRejectedPerSec)
topicMetrics.closeMetric(BrokerTopicStats.FailedProduceRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.TotalProduceRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec)
}
}
@ -261,7 +290,7 @@ class BrokerTopicStats { @@ -261,7 +290,7 @@ class BrokerTopicStats {
def removeOldFollowerMetrics(topic: String): Unit = {
val topicMetrics = topicStats(topic)
if (topicMetrics != null)
topicMetrics.removeMetricHelper(BrokerTopicStats.ReplicationBytesInPerSec, topicMetrics.tags)
topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesInPerSec)
}
def removeMetrics(topic: String): Unit = {

2
core/src/main/scala/kafka/utils/Pool.scala

@ -30,6 +30,8 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] { @@ -30,6 +30,8 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
def put(k: K, v: V): V = pool.put(k, v)
def putAll(map: java.util.Map[K, V]): Unit = pool.putAll(map)
def putIfNotExists(k: K, v: V): V = pool.putIfAbsent(k, v)
/**

27
core/src/test/scala/unit/kafka/metrics/MetricsTest.scala

@ -75,6 +75,20 @@ class MetricsTest extends KafkaServerTestHarness with Logging { @@ -75,6 +75,20 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=KafkaServer,name=ClusterId"), 1)
}
@Test
def testGeneralBrokerTopicMetricsAreGreedilyRegistered(): Unit = {
val topic = "test-broker-topic-metric"
createTopic(topic, 2, 1)
// The broker metrics for all topics should be greedily registered
assertTrue("General topic metrics don't exist", topicMetrics(None).nonEmpty)
assertEquals(servers.head.brokerTopicStats.allTopicsStats.metricMap.size, topicMetrics(None).size)
// topic metrics should be lazily registered
assertTrue("Topic metrics aren't lazily registered", topicMetricGroups(topic).isEmpty)
TestUtils.generateAndProduceMessages(servers, topic, nMessages)
assertTrue("Topic metrics aren't registered", topicMetricGroups(topic).nonEmpty)
}
@Test
def testWindowsStyleTagNames(): Unit = {
val path = "C:\\windows-path\\kafka-logs"
@ -170,9 +184,18 @@ class MetricsTest extends KafkaServerTestHarness with Logging { @@ -170,9 +184,18 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
.count
}
private def topicMetrics(topic: Option[String]): Set[String] = {
val metricNames = Metrics.defaultRegistry.allMetrics().keySet.asScala.map(_.getMBeanName)
filterByTopicMetricRegex(metricNames, topic)
}
private def topicMetricGroups(topic: String): Set[String] = {
val topicMetricRegex = new Regex(".*BrokerTopicMetrics.*("+topic+")$")
val metricGroups = Metrics.defaultRegistry.groupedMetrics(MetricPredicate.ALL).keySet.asScala
metricGroups.filter(topicMetricRegex.pattern.matcher(_).matches)
filterByTopicMetricRegex(metricGroups, Some(topic))
}
private def filterByTopicMetricRegex(metrics: Set[String], topic: Option[String]): Set[String] = {
val pattern = (".*BrokerTopicMetrics.*" + topic.map(t => s"($t)$$").getOrElse("")).r.pattern
metrics.filter(pattern.matcher(_).matches())
}
}

Loading…
Cancel
Save