diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index f6999d30d8d..27962f65f89 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -28,6 +28,7 @@ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.utils.{KafkaThread, Time} import scala.collection.mutable +import scala.collection.JavaConverters._ /** * A thread that answers kafka requests. @@ -146,64 +147,92 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup { case Some(topic) => Map("topic" -> topic) } + case class MeterWrapper(metricType: String, eventType: String) { + @volatile private var lazyMeter: Meter = _ + private val meterLock = new Object + + def meter(): Meter = { + var meter = lazyMeter + if (meter == null) { + meterLock synchronized { + meter = lazyMeter + if (meter == null) { + meter = newMeter(metricType, eventType, TimeUnit.SECONDS, tags) + lazyMeter = meter + } + } + } + meter + } + + def close(): Unit = meterLock synchronized { + if (lazyMeter != null) { + removeMetric(metricType, tags) + lazyMeter = null + } + } + + if (tags.isEmpty) // greedily initialize the general topic metrics + meter() + } + // an internal map for "lazy initialization" of certain metrics - private val metricTypeMap = new Pool[String, Meter] - - def messagesInRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.MessagesInPerSec, - newMeter(BrokerTopicStats.MessagesInPerSec, "messages", TimeUnit.SECONDS, tags)) - def bytesInRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesInPerSec, - newMeter(BrokerTopicStats.BytesInPerSec, "bytes", TimeUnit.SECONDS, tags)) - def bytesOutRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesOutPerSec, - newMeter(BrokerTopicStats.BytesOutPerSec, "bytes", TimeUnit.SECONDS, tags)) - def bytesRejectedRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesRejectedPerSec, - newMeter(BrokerTopicStats.BytesRejectedPerSec, "bytes", TimeUnit.SECONDS, tags)) + private val metricTypeMap = new Pool[String, MeterWrapper]() + metricTypeMap.putAll(Map( + BrokerTopicStats.MessagesInPerSec -> MeterWrapper(BrokerTopicStats.MessagesInPerSec, "messages"), + BrokerTopicStats.BytesInPerSec -> MeterWrapper(BrokerTopicStats.BytesInPerSec, "bytes"), + BrokerTopicStats.BytesOutPerSec -> MeterWrapper(BrokerTopicStats.BytesOutPerSec, "bytes"), + BrokerTopicStats.BytesRejectedPerSec -> MeterWrapper(BrokerTopicStats.BytesRejectedPerSec, "bytes"), + BrokerTopicStats.FailedProduceRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedProduceRequestsPerSec, "requests"), + BrokerTopicStats.FailedFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedFetchRequestsPerSec, "requests"), + BrokerTopicStats.TotalProduceRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalProduceRequestsPerSec, "requests"), + BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"), + BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"), + BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests") + ).asJava) + if (name.isEmpty) { + metricTypeMap.put(BrokerTopicStats.ReplicationBytesInPerSec, MeterWrapper(BrokerTopicStats.ReplicationBytesInPerSec, "bytes")) + metricTypeMap.put(BrokerTopicStats.ReplicationBytesOutPerSec, MeterWrapper(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes")) + } + + // used for testing only + def metricMap: Map[String, MeterWrapper] = metricTypeMap.toMap + + def messagesInRate = metricTypeMap.get(BrokerTopicStats.MessagesInPerSec).meter() + + def bytesInRate = metricTypeMap.get(BrokerTopicStats.BytesInPerSec).meter() + + def bytesOutRate = metricTypeMap.get(BrokerTopicStats.BytesOutPerSec).meter() + + def bytesRejectedRate = metricTypeMap.get(BrokerTopicStats.BytesRejectedPerSec).meter() + private[server] def replicationBytesInRate = - if (name.isEmpty) Some(metricTypeMap.getAndMaybePut( - BrokerTopicStats.ReplicationBytesInPerSec, - newMeter(BrokerTopicStats.ReplicationBytesInPerSec, "bytes", TimeUnit.SECONDS, tags))) + if (name.isEmpty) Some(metricTypeMap.get(BrokerTopicStats.ReplicationBytesInPerSec).meter()) else None + private[server] def replicationBytesOutRate = - if (name.isEmpty) Some(metricTypeMap.getAndMaybePut( - BrokerTopicStats.ReplicationBytesOutPerSec, - newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes", TimeUnit.SECONDS, tags))) + if (name.isEmpty) Some(metricTypeMap.get(BrokerTopicStats.ReplicationBytesOutPerSec).meter()) else None - def failedProduceRequestRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.FailedProduceRequestsPerSec, - newMeter(BrokerTopicStats.FailedProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)) - def failedFetchRequestRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.FailedFetchRequestsPerSec, - newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)) - def totalProduceRequestRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.TotalProduceRequestsPerSec, - newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)) - def totalFetchRequestRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.TotalFetchRequestsPerSec, - newMeter(BrokerTopicStats.TotalFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)) - def fetchMessageConversionsRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.FetchMessageConversionsPerSec, - newMeter(BrokerTopicStats.FetchMessageConversionsPerSec, "requests", TimeUnit.SECONDS, tags)) - def produceMessageConversionsRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.ProduceMessageConversionsPerSec, - newMeter(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests", TimeUnit.SECONDS, tags)) - - // this method helps check with metricTypeMap first before deleting a metric - def removeMetricHelper(metricType: String, tags: scala.collection.Map[String, String]): Unit = { - val metric: Meter = metricTypeMap.remove(metricType) - if (metric != null) { - removeMetric(metricType, tags) - } - } - def close(): Unit = { - removeMetricHelper(BrokerTopicStats.MessagesInPerSec, tags) - removeMetricHelper(BrokerTopicStats.BytesInPerSec, tags) - removeMetricHelper(BrokerTopicStats.BytesOutPerSec, tags) - removeMetricHelper(BrokerTopicStats.BytesRejectedPerSec, tags) - if (replicationBytesInRate.isDefined) - removeMetricHelper(BrokerTopicStats.ReplicationBytesInPerSec, tags) - if (replicationBytesOutRate.isDefined) - removeMetricHelper(BrokerTopicStats.ReplicationBytesOutPerSec, tags) - removeMetricHelper(BrokerTopicStats.FailedProduceRequestsPerSec, tags) - removeMetricHelper(BrokerTopicStats.FailedFetchRequestsPerSec, tags) - removeMetricHelper(BrokerTopicStats.TotalProduceRequestsPerSec, tags) - removeMetricHelper(BrokerTopicStats.TotalFetchRequestsPerSec, tags) - removeMetricHelper(BrokerTopicStats.FetchMessageConversionsPerSec, tags) - removeMetricHelper(BrokerTopicStats.ProduceMessageConversionsPerSec, tags) + def failedProduceRequestRate = metricTypeMap.get(BrokerTopicStats.FailedProduceRequestsPerSec).meter() + + def failedFetchRequestRate = metricTypeMap.get(BrokerTopicStats.FailedFetchRequestsPerSec).meter() + + def totalProduceRequestRate = metricTypeMap.get(BrokerTopicStats.TotalProduceRequestsPerSec).meter() + + def totalFetchRequestRate = metricTypeMap.get(BrokerTopicStats.TotalFetchRequestsPerSec).meter() + + def fetchMessageConversionsRate = metricTypeMap.get(BrokerTopicStats.FetchMessageConversionsPerSec).meter() + + def produceMessageConversionsRate = metricTypeMap.get(BrokerTopicStats.ProduceMessageConversionsPerSec).meter() + + def closeMetric(metricType: String): Unit = { + val meter = metricTypeMap.get(metricType) + if (meter != null) + meter.close() } + + def close(): Unit = metricTypeMap.values.foreach(_.close()) } object BrokerTopicStats { @@ -247,13 +276,13 @@ class BrokerTopicStats { def removeOldLeaderMetrics(topic: String): Unit = { val topicMetrics = topicStats(topic) if (topicMetrics != null) { - topicMetrics.removeMetricHelper(BrokerTopicStats.MessagesInPerSec, topicMetrics.tags) - topicMetrics.removeMetricHelper(BrokerTopicStats.BytesInPerSec, topicMetrics.tags) - topicMetrics.removeMetricHelper(BrokerTopicStats.BytesRejectedPerSec, topicMetrics.tags) - topicMetrics.removeMetricHelper(BrokerTopicStats.FailedProduceRequestsPerSec, topicMetrics.tags) - topicMetrics.removeMetricHelper(BrokerTopicStats.TotalProduceRequestsPerSec, topicMetrics.tags) - topicMetrics.removeMetricHelper(BrokerTopicStats.ProduceMessageConversionsPerSec, topicMetrics.tags) - topicMetrics.removeMetricHelper(BrokerTopicStats.ReplicationBytesOutPerSec, topicMetrics.tags) + topicMetrics.closeMetric(BrokerTopicStats.MessagesInPerSec) + topicMetrics.closeMetric(BrokerTopicStats.BytesInPerSec) + topicMetrics.closeMetric(BrokerTopicStats.BytesRejectedPerSec) + topicMetrics.closeMetric(BrokerTopicStats.FailedProduceRequestsPerSec) + topicMetrics.closeMetric(BrokerTopicStats.TotalProduceRequestsPerSec) + topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec) + topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec) } } @@ -261,7 +290,7 @@ class BrokerTopicStats { def removeOldFollowerMetrics(topic: String): Unit = { val topicMetrics = topicStats(topic) if (topicMetrics != null) - topicMetrics.removeMetricHelper(BrokerTopicStats.ReplicationBytesInPerSec, topicMetrics.tags) + topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesInPerSec) } def removeMetrics(topic: String): Unit = { diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala index c963524a9c8..9a81ce3f9c5 100644 --- a/core/src/main/scala/kafka/utils/Pool.scala +++ b/core/src/main/scala/kafka/utils/Pool.scala @@ -27,9 +27,11 @@ import collection.JavaConverters._ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] { private val pool: ConcurrentMap[K, V] = new ConcurrentHashMap[K, V] - + def put(k: K, v: V): V = pool.put(k, v) - + + def putAll(map: java.util.Map[K, V]): Unit = pool.putAll(map) + def putIfNotExists(k: K, v: V): V = pool.putIfAbsent(k, v) /** diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index 71226b4b8cb..7a3b3a9ac52 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -75,6 +75,20 @@ class MetricsTest extends KafkaServerTestHarness with Logging { assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=KafkaServer,name=ClusterId"), 1) } + @Test + def testGeneralBrokerTopicMetricsAreGreedilyRegistered(): Unit = { + val topic = "test-broker-topic-metric" + createTopic(topic, 2, 1) + + // The broker metrics for all topics should be greedily registered + assertTrue("General topic metrics don't exist", topicMetrics(None).nonEmpty) + assertEquals(servers.head.brokerTopicStats.allTopicsStats.metricMap.size, topicMetrics(None).size) + // topic metrics should be lazily registered + assertTrue("Topic metrics aren't lazily registered", topicMetricGroups(topic).isEmpty) + TestUtils.generateAndProduceMessages(servers, topic, nMessages) + assertTrue("Topic metrics aren't registered", topicMetricGroups(topic).nonEmpty) + } + @Test def testWindowsStyleTagNames(): Unit = { val path = "C:\\windows-path\\kafka-logs" @@ -170,9 +184,18 @@ class MetricsTest extends KafkaServerTestHarness with Logging { .count } + private def topicMetrics(topic: Option[String]): Set[String] = { + val metricNames = Metrics.defaultRegistry.allMetrics().keySet.asScala.map(_.getMBeanName) + filterByTopicMetricRegex(metricNames, topic) + } + private def topicMetricGroups(topic: String): Set[String] = { - val topicMetricRegex = new Regex(".*BrokerTopicMetrics.*("+topic+")$") val metricGroups = Metrics.defaultRegistry.groupedMetrics(MetricPredicate.ALL).keySet.asScala - metricGroups.filter(topicMetricRegex.pattern.matcher(_).matches) + filterByTopicMetricRegex(metricGroups, Some(topic)) + } + + private def filterByTopicMetricRegex(metrics: Set[String], topic: Option[String]): Set[String] = { + val pattern = (".*BrokerTopicMetrics.*" + topic.map(t => s"($t)$$").getOrElse("")).r.pattern + metrics.filter(pattern.matcher(_).matches()) } }