Browse Source

KAFKA-14544 The "is-future" should be removed from metrics tags after future log becomes current log (#12979)

Reviewers: Luke Chen <showuon@gmail.com>
pull/13047/head
Chia-Ping Tsai 2 years ago committed by GitHub
parent
commit
13d1c086f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      core/src/main/scala/kafka/log/LogManager.scala
  2. 34
      core/src/main/scala/kafka/log/UnifiedLog.scala
  3. 2
      core/src/test/scala/unit/kafka/log/LogManagerTest.scala

4
core/src/main/scala/kafka/log/LogManager.scala

@ -1081,6 +1081,9 @@ class LogManager(logDirs: Seq[File], @@ -1081,6 +1081,9 @@ class LogManager(logDirs: Seq[File],
throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
destLog.renameDir(UnifiedLog.logDirName(topicPartition), true)
// the metrics tags still contain "future", so we have to remove it.
// we will add metrics back after sourceLog remove the metrics
destLog.removeLogMetrics()
destLog.updateHighWatermark(sourceLog.highWatermark)
// Now that future replica has been successfully renamed to be the current replica
@ -1102,6 +1105,7 @@ class LogManager(logDirs: Seq[File], @@ -1102,6 +1105,7 @@ class LogManager(logDirs: Seq[File],
checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
sourceLog.removeLogMetrics()
destLog.newMetrics()
addLogToBeDeleted(sourceLog)
} catch {
case e: KafkaStorageException =>

34
core/src/main/scala/kafka/log/UnifiedLog.scala

@ -237,7 +237,7 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason { @@ -237,7 +237,7 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
*/
@threadsafe
class UnifiedLog(@volatile var logStartOffset: Long,
private val localLog: LocalLog,
private[log] val localLog: LocalLog,
brokerTopicStats: BrokerTopicStats,
val producerIdExpirationCheckIntervalMs: Int,
@volatile var leaderEpochCache: Option[LeaderEpochFileCache],
@ -552,15 +552,23 @@ class UnifiedLog(@volatile var logStartOffset: Long, @@ -552,15 +552,23 @@ class UnifiedLog(@volatile var logStartOffset: Long,
)
}
private val tags = {
val maybeFutureTag = if (isFuture) Map("is-future" -> "true") else Map.empty[String, String]
Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) ++ maybeFutureTag
}
newGauge(LogMetricNames.NumLogSegments, () => numberOfSegments, tags)
newGauge(LogMetricNames.LogStartOffset, () => logStartOffset, tags)
newGauge(LogMetricNames.LogEndOffset, () => logEndOffset, tags)
newGauge(LogMetricNames.Size, () => size, tags)
private var metricNames: Map[String, Map[String, String]] = Map.empty
newMetrics()
private[log] def newMetrics(): Unit = {
val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) ++
(if (isFuture) Map("is-future" -> "true") else Map.empty)
newGauge(LogMetricNames.NumLogSegments, () => numberOfSegments, tags)
newGauge(LogMetricNames.LogStartOffset, () => logStartOffset, tags)
newGauge(LogMetricNames.LogEndOffset, () => logEndOffset, tags)
newGauge(LogMetricNames.Size, () => size, tags)
metricNames = Map(LogMetricNames.NumLogSegments -> tags,
LogMetricNames.LogStartOffset -> tags,
LogMetricNames.LogEndOffset -> tags,
LogMetricNames.Size -> tags)
}
val producerExpireCheck = scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
lock synchronized {
@ -1795,10 +1803,10 @@ class UnifiedLog(@volatile var logStartOffset: Long, @@ -1795,10 +1803,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* remove deleted log metrics
*/
private[log] def removeLogMetrics(): Unit = {
removeMetric(LogMetricNames.NumLogSegments, tags)
removeMetric(LogMetricNames.LogStartOffset, tags)
removeMetric(LogMetricNames.LogEndOffset, tags)
removeMetric(LogMetricNames.Size, tags)
metricNames.foreach {
case (name, tags) => removeMetric(name, tags)
}
metricNames = Map.empty
}
/**

2
core/src/test/scala/unit/kafka/log/LogManagerTest.scala

@ -920,6 +920,8 @@ class LogManagerTest { @@ -920,6 +920,8 @@ class LogManagerTest {
// Replace the current log with the future one and verify that only one set of metrics are present
logManager.replaceCurrentWithFutureLog(tp)
verifyMetrics(1)
// the future log is gone, so we have to make sure the metrics gets gone also.
assertEquals(0, logMetrics.count(m => m.getMBeanName.contains("is-future")))
// Trigger the deletion of the former current directory and verify that one set of metrics is still present
time.sleep(logConfig.fileDeleteDelayMs + 1)

Loading…
Cancel
Save