Browse Source

MINOR: Add test to ensure log metrics are removed after deletion (#7750)

Reviewers: Jason Gustafson <jason@confluent.io>
pull/7776/head
Dhruvil Shah 5 years ago committed by Jason Gustafson
parent
commit
122941793e
  1. 27
      core/src/main/scala/kafka/log/Log.scala
  2. 21
      core/src/test/scala/unit/kafka/log/LogTest.scala

27
core/src/main/scala/kafka/log/Log.scala

@ -465,25 +465,25 @@ class Log(@volatile var dir: File, @@ -465,25 +465,25 @@ class Log(@volatile var dir: File,
Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) ++ maybeFutureTag
}
newGauge("NumLogSegments",
newGauge(LogMetricNames.NumLogSegments,
new Gauge[Int] {
def value = numberOfSegments
},
tags)
newGauge("LogStartOffset",
newGauge(LogMetricNames.LogStartOffset,
new Gauge[Long] {
def value = logStartOffset
},
tags)
newGauge("LogEndOffset",
newGauge(LogMetricNames.LogEndOffset,
new Gauge[Long] {
def value = logEndOffset
},
tags)
newGauge("Size",
newGauge(LogMetricNames.Size,
new Gauge[Long] {
def value = size
},
@ -2297,10 +2297,10 @@ class Log(@volatile var dir: File, @@ -2297,10 +2297,10 @@ class Log(@volatile var dir: File,
* remove deleted log metrics
*/
private[log] def removeLogMetrics(): Unit = {
removeMetric("NumLogSegments", tags)
removeMetric("LogStartOffset", tags)
removeMetric("LogEndOffset", tags)
removeMetric("Size", tags)
removeMetric(LogMetricNames.NumLogSegments, tags)
removeMetric(LogMetricNames.LogStartOffset, tags)
removeMetric(LogMetricNames.LogEndOffset, tags)
removeMetric(LogMetricNames.Size, tags)
}
/**
@ -2624,3 +2624,14 @@ object Log { @@ -2624,3 +2624,14 @@ object Log {
file.getPath.endsWith(LogFileSuffix)
}
object LogMetricNames {
val NumLogSegments: String = "NumLogSegments"
val LogStartOffset: String = "LogStartOffset"
val LogEndOffset: String = "LogEndOffset"
val Size: String = "Size"
def allMetricNames: List[String] = {
List(NumLogSegments, LogStartOffset, LogEndOffset, Size)
}
}

21
core/src/test/scala/unit/kafka/log/LogTest.scala

@ -4145,6 +4145,27 @@ class LogTest { @@ -4145,6 +4145,27 @@ class LogTest {
assertEquals(1, log.numberOfSegments)
}
@Test
def testMetricsRemovedOnLogDeletion(): Unit = {
TestUtils.clearYammerMetrics()
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
val log = createLog(logDir, logConfig)
val topicPartition = Log.parseTopicPartitionName(logDir)
val metricTag = s"topic=${topicPartition.topic},partition=${topicPartition.partition}"
val logMetrics = metricsKeySet.filter(_.getType == "Log")
assertEquals(LogMetricNames.allMetricNames.size, logMetrics.size)
logMetrics.foreach { metric =>
assertTrue(metric.getMBeanName.contains(metricTag))
}
// Delete the log and validate that corresponding metrics were removed.
log.delete()
val logMetricsAfterDeletion = metricsKeySet.filter(_.getType == "Log")
assertTrue(logMetricsAfterDeletion.isEmpty)
}
private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Int => Unit = {

Loading…
Cancel
Save