From 122941793ee932afccb1bd73d0c341e5946f1f5e Mon Sep 17 00:00:00 2001 From: Dhruvil Shah Date: Tue, 3 Dec 2019 18:04:53 -0800 Subject: [PATCH] MINOR: Add test to ensure log metrics are removed after deletion (#7750) Reviewers: Jason Gustafson --- core/src/main/scala/kafka/log/Log.scala | 27 +++++++++++++------ .../test/scala/unit/kafka/log/LogTest.scala | 21 +++++++++++++++ 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 6514aa2038c..06f3cb9b3fe 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -465,25 +465,25 @@ class Log(@volatile var dir: File, Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) ++ maybeFutureTag } - newGauge("NumLogSegments", + newGauge(LogMetricNames.NumLogSegments, new Gauge[Int] { def value = numberOfSegments }, tags) - newGauge("LogStartOffset", + newGauge(LogMetricNames.LogStartOffset, new Gauge[Long] { def value = logStartOffset }, tags) - newGauge("LogEndOffset", + newGauge(LogMetricNames.LogEndOffset, new Gauge[Long] { def value = logEndOffset }, tags) - newGauge("Size", + newGauge(LogMetricNames.Size, new Gauge[Long] { def value = size }, @@ -2297,10 +2297,10 @@ class Log(@volatile var dir: File, * remove deleted log metrics */ private[log] def removeLogMetrics(): Unit = { - removeMetric("NumLogSegments", tags) - removeMetric("LogStartOffset", tags) - removeMetric("LogEndOffset", tags) - removeMetric("Size", tags) + removeMetric(LogMetricNames.NumLogSegments, tags) + removeMetric(LogMetricNames.LogStartOffset, tags) + removeMetric(LogMetricNames.LogEndOffset, tags) + removeMetric(LogMetricNames.Size, tags) } /** @@ -2624,3 +2624,14 @@ object Log { file.getPath.endsWith(LogFileSuffix) } + +object LogMetricNames { + val NumLogSegments: String = "NumLogSegments" + val LogStartOffset: String = "LogStartOffset" + val LogEndOffset: String = "LogEndOffset" + val Size: String = "Size" + + def allMetricNames: List[String] = { + List(NumLogSegments, LogStartOffset, LogEndOffset, Size) + } +} diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 25b213b3ab5..99e76dca29f 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -4145,6 +4145,27 @@ class LogTest { assertEquals(1, log.numberOfSegments) } + @Test + def testMetricsRemovedOnLogDeletion(): Unit = { + TestUtils.clearYammerMetrics() + + val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024) + val log = createLog(logDir, logConfig) + val topicPartition = Log.parseTopicPartitionName(logDir) + val metricTag = s"topic=${topicPartition.topic},partition=${topicPartition.partition}" + + val logMetrics = metricsKeySet.filter(_.getType == "Log") + assertEquals(LogMetricNames.allMetricNames.size, logMetrics.size) + logMetrics.foreach { metric => + assertTrue(metric.getMBeanName.contains(metricTag)) + } + + // Delete the log and validate that corresponding metrics were removed. + log.delete() + val logMetricsAfterDeletion = metricsKeySet.filter(_.getType == "Log") + assertTrue(logMetricsAfterDeletion.isEmpty) + } + private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns) private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Int => Unit = {