@ -19,12 +19,15 @@ package kafka.log
@@ -19,12 +19,15 @@ package kafka.log
import scala.collection._
import scala.math
import java.util.concurrent.TimeUnit
import java.nio._
import java.util.Date
import java.io.File
import kafka.common._
import kafka.message._
import kafka.utils._
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import java.lang.IllegalStateException
/* *
@ -63,7 +66,8 @@ import java.lang.IllegalStateException
@@ -63,7 +66,8 @@ import java.lang.IllegalStateException
class LogCleaner ( val config : CleanerConfig ,
val logDirs : Array [ File ] ,
val logs : Pool [ TopicAndPartition , Log ] ,
time : Time = SystemTime ) extends Logging {
time : Time = SystemTime ) extends Logging with KafkaMetricsGroup {
/* for managing the state of partitions being cleaned. */
private val cleanerManager = new LogCleanerManager ( logDirs , logs ) ;
@ -71,11 +75,33 @@ class LogCleaner(val config: CleanerConfig,
@@ -71,11 +75,33 @@ class LogCleaner(val config: CleanerConfig,
/* I/O throttler for the cleaner; its target rate is taken from config.maxIoBytesPerSecond */
private val throttler = new Throttler ( desiredRatePerSec = config . maxIoBytesPerSecond ,
checkIntervalMs = 300 , // fixed interval; milliseconds per the parameter name
throttleDown = true ,
"cleaner-io" , // positional argument; presumably the throttler's metric name - confirm against Throttler
"bytes" , // positional argument; presumably the units label for that metric - confirm against Throttler
time = time ) // uses the same Time instance that was passed to LogCleaner
/* the cleaner threads: config.numThreads of them, each constructed with its index */
private val cleaners =
  for (threadIndex <- 0 until config.numThreads) yield new CleanerThread(threadIndex)
/* a metric to track the maximum utilization of any thread's buffer in the last cleaning.
 * Reports 0 when there are no cleaner threads (config.numThreads == 0), because calling
 * max on an empty collection would throw UnsupportedOperationException on every gauge read. */
newGauge("max-buffer-utilization-percent",
         new Gauge[Int] {
           def value: Int = {
             // utilization of each thread's last run, scaled from a fraction to a percentage
             val utilizationPercents = cleaners.map(_.lastStats).map(100 * _.bufferUtilization)
             if (utilizationPercents.isEmpty) 0 else utilizationPercents.max.toInt
           }
         })
/* a metric reporting, as a percentage, bytes written relative to bytes read
 * summed over the last cleaning of every thread */
newGauge("cleaner-recopy-percent",
         new Gauge[Int] {
           def value: Int = {
             val lastRuns = cleaners.map(_.lastStats)
             val totalBytesWritten = lastRuns.map(_.bytesWritten).sum
             // floor the denominator at 1 so a run that read nothing does not divide by zero
             val totalBytesRead = math.max(lastRuns.map(_.bytesRead).sum, 1)
             val recopyRatio = totalBytesWritten.toDouble / totalBytesRead
             (100 * recopyRatio).toInt
           }
         })
/* a metric to track the maximum cleaning time for the last cleaning from each thread.
 * Reports 0 when there are no cleaner threads (config.numThreads == 0), because calling
 * max on an empty collection would throw UnsupportedOperationException on every gauge read. */
newGauge("max-clean-time-secs",
         new Gauge[Int] {
           def value: Int = {
             val lastCleanTimes = cleaners.map(_.lastStats).map(_.elapsedSecs)
             if (lastCleanTimes.isEmpty) 0 else lastCleanTimes.max.toInt
           }
         })
/* *
* Start the background cleaning
*/
@ -147,6 +173,8 @@ class LogCleaner(val config: CleanerConfig,
@@ -147,6 +173,8 @@ class LogCleaner(val config: CleanerConfig,
time = time ,
checkDone = checkDone )
/* stats of the most recent cleaning, replaced by recordStats after each successful clean and read by
 * the cleaner gauges; starts as a fresh CleanerStats. Marked @volatile, presumably because the gauges
 * are read from a different thread than the one that cleans - confirm against the metrics reporter. */
@volatile var lastStats : CleanerStats = new CleanerStats ( )
private def checkDone ( topicAndPartition : TopicAndPartition ) {
if ( ! isRunning . get ( ) )
throw new ThreadShutdownException
@ -173,7 +201,7 @@ class LogCleaner(val config: CleanerConfig,
@@ -173,7 +201,7 @@ class LogCleaner(val config: CleanerConfig,
var endOffset = cleanable . firstDirtyOffset
try {
endOffset = cleaner . clean ( cleanable )
log Stats( cleaner . id , cleanable . log . name , cleanable . firstDirtyOffset , endOffset , cleaner . stats )
record Stats( cleaner . id , cleanable . log . name , cleanable . firstDirtyOffset , endOffset , cleaner . stats )
} catch {
case pe : LogCleaningAbortedException => // task can be aborted , let it go .
} finally {
@ -185,7 +213,8 @@ class LogCleaner(val config: CleanerConfig,
@@ -185,7 +213,8 @@ class LogCleaner(val config: CleanerConfig,
/* *
* Log out statistics on a single run of the cleaner .
*/
def logStats ( id : Int , name : String , from : Long , to : Long , stats : CleanerStats ) {
def recordStats ( id : Int , name : String , from : Long , to : Long , stats : CleanerStats ) {
this . lastStats = stats
def mb ( bytes : Double ) = bytes / ( 1024 * 1024 )
val message =
"%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n" . format ( id , name , from , to ) +
@ -196,6 +225,7 @@ class LogCleaner(val config: CleanerConfig,
@@ -196,6 +225,7 @@ class LogCleaner(val config: CleanerConfig,
stats . elapsedIndexSecs ,
mb ( stats . mapBytesRead ) / stats . elapsedIndexSecs ,
100 * stats . elapsedIndexSecs . toDouble / stats . elapsedSecs ) +
"\tBuffer utilization: %.1f%%%n" . format ( 100 * stats . bufferUtilization ) +
"\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n" . format ( mb ( stats . bytesRead ) ,
stats . elapsedSecs - stats . elapsedIndexSecs ,
mb ( stats . bytesRead ) / ( stats . elapsedSecs - stats . elapsedIndexSecs ) , 100 * ( stats . elapsedSecs - stats . elapsedIndexSecs ) . toDouble / stats . elapsedSecs ) +
@ -218,7 +248,7 @@ class LogCleaner(val config: CleanerConfig,
@@ -218,7 +248,7 @@ class LogCleaner(val config: CleanerConfig,
* @param time The time instance
*/
private [ log ] class Cleaner ( val id : Int ,
offsetMap : OffsetMap ,
val offsetMap : OffsetMap ,
ioBufferSize : Int ,
maxIoBufferSize : Int ,
dupBufferLoadFactor : Double ,
@ -269,6 +299,9 @@ private[log] class Cleaner(val id: Int,
@@ -269,6 +299,9 @@ private[log] class Cleaner(val id: Int,
info ( "Cleaning log %s (discarding tombstones prior to %s)..." . format ( log . name , new Date ( deleteHorizonMs ) ) )
for ( group <- groupSegmentsBySize ( log . logSegments ( 0 , endOffset ) , log . config . segmentSize , log . config . maxIndexSize ) )
cleanSegments ( log , group , offsetMap , deleteHorizonMs )
// record buffer utilization
stats . bufferUtilization = offsetMap . utilization
stats . allDone ( )
endOffset
@ -504,6 +537,7 @@ private[log] class Cleaner(val id: Int,
@@ -504,6 +537,7 @@ private[log] class Cleaner(val id: Int,
*/
private case class CleanerStats ( time : Time = SystemTime ) {
var startTime , mapCompleteTime , endTime , bytesRead , bytesWritten , mapBytesRead , mapMessagesRead , messagesRead , messagesWritten = 0L
var bufferUtilization = 0.0d
clear ( )
def readMessage ( size : Int ) {
@ -543,6 +577,7 @@ private case class CleanerStats(time: Time = SystemTime) {
@@ -543,6 +577,7 @@ private case class CleanerStats(time: Time = SystemTime) {
mapMessagesRead = 0L
messagesRead = 0L
messagesWritten = 0L
bufferUtilization = 0.0d
}
}