@@ -26,61 +26,107 @@ import kafka.server.{HighwaterMarkCheckpoint, KafkaConfig}
/**
 * The guy who creates and hands out logs
 * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
 * All read and write operations are delegated to the individual log instances.
 *
 * The log manager maintains logs in one or more directories. New logs are created in the data directory
 * with the fewest logs. No attempt is made to move partitions after the fact or balance based on
 * size or I/O rate.
 *
 * A background thread handles log retention by periodically truncating excess log segments.
 */
@threadsafe
private[kafka] class LogManager(val config: KafkaConfig,
                                scheduler: KafkaScheduler,
                                private val time: Time,
                                val logRollDefaultIntervalMs: Long,
                                val logCleanupIntervalMs: Long,
                                val logCleanupDefaultAgeMs: Long,
                                needRecovery: Boolean) extends Logging {
                                private val time: Time) extends Logging {

  val logDir: File = new File(config.logDir)
  val CleanShutdownFile = ".kafka_cleanshutdown"
  val LockFile = ".lock"
  val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray
  private val logFileSizeMap = config.logFileSizeMap
  private val flushInterval = config.flushInterval
  private val logCreationLock = new Object
  private val logFlushInterval = config.flushInterval
  private val logFlushIntervals = config.flushIntervalMap
  private val logCreationLock = new Object
  private val logRetentionSizeMap = config.logRetentionSizeMap
  private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
  private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
  this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
  private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours
  private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMinutes
  private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours
  /* Initialize a log for each subdirectory of the main log directory */
  private val logs = new Pool[String, Pool[Int, Log]]()
  if(!logDir.exists()) {
    info("No log directory found, creating '" + logDir.getAbsolutePath() + "'")
    logDir.mkdirs()
  }
  if(!logDir.isDirectory() || !logDir.canRead())
    throw new KafkaException(logDir.getAbsolutePath() + " is not a readable log directory.")
  val subDirs = logDir.listFiles()
  if(subDirs != null) {
    for(dir <- subDirs) {
      if(dir.getName.equals(HighwaterMarkCheckpoint.highWatermarkFileName)) {
        // skip valid metadata file
  this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
  private val logs = new Pool[TopicAndPartition, Log]()

  createAndValidateLogDirs(logDirs)
  private var dirLocks = lockLogDirs(logDirs)
  loadLogs(logDirs)
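  // Startup ordering note: directories are validated (and created if missing) first, then locked,
  // and only then are existing logs loaded, so log segments are only ever read from directories
  // this broker has successfully locked.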
  /**
   * 1. Ensure that there are no duplicates in the directory list
   * 2. Create each directory if it doesn't exist
   * 3. Check that each path is a readable directory
   */
  private def createAndValidateLogDirs(dirs: Seq[File]) {
    if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
      throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", "))
    for(dir <- dirs) {
      if(!dir.exists) {
        info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
        val created = dir.mkdirs()
        if(!created)
          throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath)
      }
      else if(!dir.isDirectory()) {
        warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
      } else {
        info("Loading log '" + dir.getName() + "'")
        val topicPartition = parseTopicPartitionName(dir.getName)
        val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
        val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
        val log = new Log(dir,
                          maxLogFileSize,
                          config.maxMessageSize,
                          flushInterval,
                          rollIntervalMs,
                          needRecovery,
                          config.logIndexMaxSizeBytes,
                          config.logIndexIntervalBytes,
                          time,
                          config.brokerId)
        logs.putIfNotExists(topicPartition.topic, new Pool[Int, Log]())
        val parts = logs.get(topicPartition.topic)
        parts.put(topicPartition.partition, log)
      if(!dir.isDirectory || !dir.canRead)
        throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.")
    }
  }
  /**
   * Lock all the given directories
   */
  private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
    dirs.map { dir =>
      val lock = new FileLock(new File(dir, LockFile))
      if(!lock.tryLock())
        throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath +
                                 ". A Kafka instance in another process or thread is using this directory.")
      lock
    }
  }
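  // Locking note: each data directory is guarded by a ".lock" file via FileLock. If tryLock() fails,
  // the directory is presumed to be owned by another broker process (or another LogManager in this
  // process), and startup aborts rather than letting two brokers write into the same logs.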
  /**
   * Recover and load all logs in the given data directories
   */
  private def loadLogs(dirs: Seq[File]) {
    for(dir <- dirs) {
      /* check if this set of logs was shut down cleanly */
      val cleanShutDownFile = new File(dir, CleanShutdownFile)
      // recovery is only needed if the previous shutdown did not leave a clean-shutdown marker
      val needsRecovery = !cleanShutDownFile.exists
      cleanShutDownFile.delete
val subDirs = dir . listFiles ( )
if ( subDirs != null ) {
for ( dir <- subDirs ) {
if ( dir . isDirectory ) {
info ( "Loading log '" + dir . getName + "'" )
val topicPartition = parseTopicPartitionName ( dir . getName )
val rollIntervalMs = logRollMsMap . get ( topicPartition . topic ) . getOrElse ( this . logRollDefaultIntervalMs )
val maxLogFileSize = logFileSizeMap . get ( topicPartition . topic ) . getOrElse ( config . logFileSize )
val log = new Log ( dir ,
maxLogFileSize ,
config . maxMessageSize ,
logFlushInterval ,
rollIntervalMs ,
needsRecovery ,
config . logIndexMaxSizeBytes ,
config . logIndexIntervalBytes ,
time ,
config . brokerId )
val previous = this . logs . put ( topicPartition , log )
if ( previous != null )
throw new IllegalArgumentException ( "Duplicate log directories found: %s, %s!" . format ( log . dir . getAbsolutePath , previous . dir . getAbsolutePath ) )
}
}
}
}
}
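  // Recovery note: each data directory carries its own ".kafka_cleanshutdown" marker, so the
  // needs-recovery decision is made per directory. The marker is deleted as soon as it is read;
  // if the broker crashes before shutdown() rewrites it, the next startup will run recovery.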
@@ -99,18 +145,81 @@ private[kafka] class LogManager(val config: KafkaConfig,
                                 config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
    }
  }
  /**
   * Get the log if it exists
   */
  def getLog(topic: String, partition: Int): Option[Log] = {
    val topicAndPartition = TopicAndPartition(topic, partition)
    val log = logs.get(topicAndPartition)
    if(log == null)
      None
    else
      Some(log)
  }
  /**
   * Create the log if it does not exist; if it exists, just return it
   */
  def getOrCreateLog(topic: String, partition: Int): Log = {
    val topicAndPartition = TopicAndPartition(topic, partition)
    logs.get(topicAndPartition) match {
      case null => createLogIfNotExists(topicAndPartition)
      case log: Log => log
    }
  }
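  // Usage sketch (hypothetical call site, not part of this change): a produce path would do
  //   val log = logManager.getOrCreateLog("my-topic", 0)
  //   log.append(messages)
  // where append() is assumed from the Log API and is not shown in this hunk.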
  /**
   * Create a log for the given topic and the given partition
   * If the log already exists, just return a copy of the existing log
   */
  private def createLog(topic: String, partition: Int): Log = {
  private def createLogIfNotExists(topicAndPartition: TopicAndPartition): Log = {
    logCreationLock synchronized {
      val d = new File(logDir, topic + "-" + partition)
      d.mkdirs()
      val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
      val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
      new Log(d, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needsRecovery = false, config.logIndexMaxSizeBytes, config.logIndexIntervalBytes, time, config.brokerId)
      var log = logs.get(topicAndPartition)
      // check if the log has already been created in another thread
      if(log != null)
        return log
      // if not, create it
      val dataDir = nextLogDir()
      val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
      dir.mkdirs()
      val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
      val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize)
      log = new Log(dir,
                    maxLogFileSize,
                    config.maxMessageSize,
                    logFlushInterval,
                    rollIntervalMs,
                    needsRecovery = false,
                    config.logIndexMaxSizeBytes,
                    config.logIndexIntervalBytes,
                    time,
                    config.brokerId)
      info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath))
      logs.put(topicAndPartition, log)
      log
    }
  }
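  // Concurrency note: this is a double-checked creation pattern. getOrCreateLog() does a lock-free
  // Pool lookup first; only on a miss does it call this method, which re-checks under
  // logCreationLock so that two racing callers end up sharing a single Log instance.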
  /**
   * Choose the next directory in which to create a log. Currently this is done
   * by calculating the number of partitions in each directory and then choosing the
   * data directory with the fewest partitions.
   */
  private def nextLogDir(): File = {
    if(logDirs.size == 1) {
      logDirs(0)
    } else {
      // count the number of logs in each parent directory (including 0 for empty directories)
      val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
      val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
      var dirCounts = (zeros ++ logCounts).toBuffer
      // choose the directory with the least logs in it
      val leastLoaded = dirCounts.sortBy(_._2).head
      new File(leastLoaded._1)
    }
  }
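  // Example (illustrative paths): if /data1 holds 3 partition directories, /data2 holds 1, and
  // /data3 is empty, the merged counts are (/data1 -> 3, /data2 -> 1, /data3 -> 0), so /data3 is
  // chosen for the new log.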
@@ -123,48 +232,8 @@ private[kafka] class LogManager(val config: KafkaConfig,
}
  /**
   * Get the log if it exists
   */
  def getLog(topic: String, partition: Int): Option[Log] = {
    val parts = logs.get(topic)
    if(parts == null) None
    else {
      val log = parts.get(partition)
      if(log == null) None
      else Some(log)
    }
  }
  /**
   * Create the log if it does not exist, if it exists just return it
   * Runs through the log removing segments older than a certain age
   */
  def getOrCreateLog(topic: String, partition: Int): Log = {
    var hasNewTopic = false
    var parts = logs.get(topic)
    if(parts == null) {
      val found = logs.putIfNotExists(topic, new Pool[Int, Log])
      if(found == null)
        hasNewTopic = true
      parts = logs.get(topic)
    }
    var log = parts.get(partition)
    if(log == null) {
      // check if this broker hosts this partition
      log = createLog(topic, partition)
      val found = parts.putIfNotExists(partition, log)
      if(found != null) {
        // there was already somebody there
        log.close()
        log = found
      }
      else
        info("Created log for '" + topic + "'-" + partition)
    }
    log
  }
  /* Runs through the log removing segments older than a certain age */
  private def cleanupExpiredSegments(log: Log): Int = {
    val startMs = time.milliseconds
    val topic = parseTopicPartitionName(log.name).topic
@@ -216,14 +285,22 @@ private[kafka] class LogManager(val config: KafkaConfig,
   */
  def shutdown() {
    debug("Shutting down.")
    allLogs.foreach(_.close())
    try {
      // close the logs
      allLogs.foreach(_.close())
      // mark that the shutdown was clean by creating the clean shutdown marker file
      logDirs.foreach(dir => Utils.swallow(new File(dir, CleanShutdownFile).createNewFile()))
    } finally {
      // regardless of whether the close succeeded, we need to unlock the data directories
      dirLocks.foreach(_.destroy())
    }
    debug("Shutdown complete.")
  }
  /**
   * Get all the partition logs
   */
  def allLogs() = logs.values.flatMap(_.values)
  def allLogs(): Iterable[Log] = logs.values
  /**
   * Flush any log which has exceeded its flush interval and has unwritten messages.
@@ -253,12 +330,11 @@ private[kafka] class LogManager(val config: KafkaConfig,
    }
  }

  def topics(): Iterable[String] = logs.keys

  private def parseTopicPartitionName(name: String): TopicAndPartition = {
    val index = name.lastIndexOf('-')
    TopicAndPartition(name.substring(0, index), name.substring(index + 1).toInt)
  }
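  // Example: a directory named "my-topic-0" parses to TopicAndPartition("my-topic", 0). Using
  // lastIndexOf('-') means topic names that themselves contain dashes still parse correctly,
  // since only the final "-<partition>" suffix is split off.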
  def topics(): Iterable[String] = logs.keys.map(_.topic)
}