diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index c663ddc167b..2f6bf52f67e 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1081,6 +1081,9 @@ class LogManager(logDirs: Seq[File],
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
       destLog.renameDir(UnifiedLog.logDirName(topicPartition), true)
+      // The metrics tags still contain "is-future", so we have to remove them.
+      // We will add the metrics back after sourceLog removes its metrics.
+      destLog.removeLogMetrics()
       destLog.updateHighWatermark(sourceLog.highWatermark)
 
       // Now that future replica has been successfully renamed to be the current replica
@@ -1102,6 +1105,7 @@ class LogManager(logDirs: Seq[File],
         checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
         checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
         sourceLog.removeLogMetrics()
+        destLog.newMetrics()
         addLogToBeDeleted(sourceLog)
       } catch {
         case e: KafkaStorageException =>
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 718e779c970..c876840182b 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -237,7 +237,7 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
  */
 @threadsafe
 class UnifiedLog(@volatile var logStartOffset: Long,
-                 private val localLog: LocalLog,
+                 private[log] val localLog: LocalLog,
                  brokerTopicStats: BrokerTopicStats,
                  val producerIdExpirationCheckIntervalMs: Int,
                  @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
@@ -552,15 +552,23 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     )
   }
 
-  private val tags = {
-    val maybeFutureTag = if (isFuture) Map("is-future" -> "true") else Map.empty[String, String]
-    Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) ++ maybeFutureTag
-  }
-
-  newGauge(LogMetricNames.NumLogSegments, () => numberOfSegments, tags)
-  newGauge(LogMetricNames.LogStartOffset, () => logStartOffset, tags)
-  newGauge(LogMetricNames.LogEndOffset, () => logEndOffset, tags)
-  newGauge(LogMetricNames.Size, () => size, tags)
+  private var metricNames: Map[String, Map[String, String]] = Map.empty
+
+  newMetrics()
+
+  private[log] def newMetrics(): Unit = {
+    val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) ++
+      (if (isFuture) Map("is-future" -> "true") else Map.empty)
+    newGauge(LogMetricNames.NumLogSegments, () => numberOfSegments, tags)
+    newGauge(LogMetricNames.LogStartOffset, () => logStartOffset, tags)
+    newGauge(LogMetricNames.LogEndOffset, () => logEndOffset, tags)
+    newGauge(LogMetricNames.Size, () => size, tags)
+    metricNames = Map(LogMetricNames.NumLogSegments -> tags,
+      LogMetricNames.LogStartOffset -> tags,
+      LogMetricNames.LogEndOffset -> tags,
+      LogMetricNames.Size -> tags)
+  }
 
   val producerExpireCheck = scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
     lock synchronized {
@@ -1795,10 +1803,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    * remove deleted log metrics
    */
   private[log] def removeLogMetrics(): Unit = {
-    removeMetric(LogMetricNames.NumLogSegments, tags)
-    removeMetric(LogMetricNames.LogStartOffset, tags)
-    removeMetric(LogMetricNames.LogEndOffset, tags)
-    removeMetric(LogMetricNames.Size, tags)
+    metricNames.foreach {
+      case (name, tags) => removeMetric(name, tags)
+    }
+    metricNames = Map.empty
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 6e1cdec4110..3c17878ae79 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -920,6 +920,8 @@ class LogManagerTest {
     // Replace the current log with the future one and verify that only one set of metrics are present
     logManager.replaceCurrentWithFutureLog(tp)
     verifyMetrics(1)
+    // The future log is gone, so make sure its "is-future" metrics are gone as well.
+    assertEquals(0, logMetrics.count(m => m.getMBeanName.contains("is-future")))
 
     // Trigger the deletion of the former current directory and verify that one set of metrics is still present
     time.sleep(logConfig.fileDeleteDelayMs + 1)
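
Why the ordering matters: gauges can only be deregistered under the exact tags they were registered with. Before this patch, a promoted future log kept its gauges registered under `is-future=true` tags (recomputing `tags` after `renameDir` would no longer match them), so the stale gauges leaked. The sketch below is a minimal, self-contained illustration of the bookkeeping pattern the patch adopts; it is not Kafka's `KafkaMetricsGroup` — `MetricSwapSketch`, `FakeLog`, and the tag-to-scope encoding are hypothetical stand-ins, and only the Yammer `MetricsRegistry` calls are real API.

```scala
import com.yammer.metrics.core.{Gauge, MetricName, MetricsRegistry}
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for a UnifiedLog; only the metric bookkeeping is modeled.
object MetricSwapSketch {
  private val registry = new MetricsRegistry()

  // Hypothetical helper: encode tags into the metric scope, so "is-future"
  // shows up in the metric name, mirroring what the test assertion looks for.
  private def metricName(name: String, tags: Map[String, String]): MetricName = {
    val scope = tags.toSeq.sorted.map { case (k, v) => s"$k.$v" }.mkString(".")
    new MetricName("kafka.log", "Log", name, scope)
  }

  final class FakeLog(topic: String, partition: Int, var isFuture: Boolean) {
    // Remember the tags each gauge was registered with, so removal after a
    // promotion uses the old (future) tags rather than freshly recomputed ones.
    private var registered: Map[String, Map[String, String]] = Map.empty

    def newMetrics(): Unit = {
      val tags = Map("topic" -> topic, "partition" -> partition.toString) ++
        (if (isFuture) Map("is-future" -> "true") else Map.empty)
      registry.newGauge(metricName("Size", tags),
        new Gauge[Long] { override def value(): Long = 0L })
      registered = Map("Size" -> tags)
    }

    def removeLogMetrics(): Unit = {
      registered.foreach { case (name, tags) => registry.removeMetric(metricName(name, tags)) }
      registered = Map.empty
    }
  }

  def main(args: Array[String]): Unit = {
    val log = new FakeLog("t", 0, isFuture = true)
    log.newMetrics()
    // Promotion, in the order the patch uses: drop the future-tagged gauges
    // first, flip the flag (renameDir in the real code), then re-register
    // under the current-replica tags.
    log.removeLogMetrics()
    log.isFuture = false
    log.newMetrics()
    val stale = registry.allMetrics.asScala.keys
      .count(n => n.getScope != null && n.getScope.contains("is-future"))
    println(s"gauges still tagged is-future: $stale") // expected: 0
  }
}
```

Storing the registration-time tags (the patch's `metricNames` map, `registered` in the sketch) is what keeps `removeLogMetrics` correct even after `isFuture` flips, which is exactly the window between `renameDir` and `newMetrics()` in `replaceCurrentWithFutureLog`.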