Browse Source

KAFKA-1327; Log cleaner metrics follow-up patch to reset dirtiest log

cleanable ratio; reviewed by Jun Rao
pull/25/head
Joel Koshy 11 years ago
parent
commit
874620d965
  1. 10
      core/src/main/scala/kafka/log/LogCleaner.scala
  2. 20
      core/src/main/scala/kafka/log/LogCleanerManager.scala

10
core/src/main/scala/kafka/log/LogCleaner.scala

@ -19,7 +19,6 @@ package kafka.log @@ -19,7 +19,6 @@ package kafka.log
import scala.collection._
import scala.math
import java.util.concurrent.TimeUnit
import java.nio._
import java.util.Date
import java.io.File
@ -215,6 +214,7 @@ class LogCleaner(val config: CleanerConfig, @@ -215,6 +214,7 @@ class LogCleaner(val config: CleanerConfig,
*/
def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
this.lastStats = stats
cleaner.statsUnderlying.swap
def mb(bytes: Double) = bytes / (1024*1024)
val message =
"%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
@ -260,9 +260,10 @@ private[log] class Cleaner(val id: Int, @@ -260,9 +260,10 @@ private[log] class Cleaner(val id: Int,
this.logIdent = "Cleaner " + id + ": "
/* stats on this cleaning */
val stats = new CleanerStats(time)
/* cleaning stats - one instance for the current (or next) cleaning cycle and one for the last completed cycle */
val statsUnderlying = (new CleanerStats(time), new CleanerStats(time))
def stats = statsUnderlying._1
/* buffer used for read i/o */
private var readBuffer = ByteBuffer.allocate(ioBufferSize)
@ -304,6 +305,7 @@ private[log] class Cleaner(val id: Int, @@ -304,6 +305,7 @@ private[log] class Cleaner(val id: Int,
stats.bufferUtilization = offsetMap.utilization
stats.allDone()
endOffset
}

20
core/src/main/scala/kafka/log/LogCleanerManager.scala

@ -58,7 +58,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To @@ -58,7 +58,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
private val pausedCleaningCond = lock.newCondition()
/* a gauge for tracking the cleanable ratio of the dirtiest log */
private var dirtiestLogCleanableRatio = 0.0
@volatile private var dirtiestLogCleanableRatio = 0.0
newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
/**
@ -79,9 +79,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To @@ -79,9 +79,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
.filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
.map(l => LogToClean(l._1, l._2, // create a LogToClean instance for each
lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
.filter(l => l.totalBytes > 0) // skip any empty logs
if(!dirtyLogs.isEmpty)
this.dirtiestLogCleanableRatio = dirtyLogs.max.cleanableRatio
.filter(l => l.totalBytes > 0) // skip any empty logs
this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
if(cleanableLogs.isEmpty) {
None
@ -126,7 +125,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To @@ -126,7 +125,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
case LogCleaningInProgress =>
inProgress.put(topicAndPartition, LogCleaningAborted)
case s =>
throw new IllegalStateException(("Partiiton %s can't be aborted and pasued since it's in %s state").format(topicAndPartition, s))
throw new IllegalStateException("Compaction for partition %s cannot be aborted and paused since it is in %s state."
.format(topicAndPartition, s))
}
}
while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
@ -142,17 +142,19 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To @@ -142,17 +142,19 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
inLock(lock) {
inProgress.get(topicAndPartition) match {
case None =>
throw new IllegalStateException(("Partiiton %s can't be resumed since it's never paused").format(topicAndPartition))
throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is not paused."
.format(topicAndPartition))
case Some(state) =>
state match {
case LogCleaningPaused =>
inProgress.remove(topicAndPartition)
case s =>
throw new IllegalStateException(("Partiiton %s can't be resumed since it's in %s state").format(topicAndPartition, s))
throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is in %s state."
.format(topicAndPartition, s))
}
}
}
info("The cleaning for partition %s is resumed".format(topicAndPartition))
info("Compaction for partition %s is resumed".format(topicAndPartition))
}
/**
@ -194,7 +196,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To @@ -194,7 +196,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
inProgress.put(topicAndPartition, LogCleaningPaused)
pausedCleaningCond.signalAll()
case s =>
throw new IllegalStateException(("In-progress partiiton %s can't be in %s state").format(topicAndPartition, s))
throw new IllegalStateException("In-progress partition %s cannot be in %s state.".format(topicAndPartition, s))
}
}
}

Loading…
Cancel
Save