
KAFKA-7557: optimize LogManager.truncateFullyAndStartAt() (#5848)

Instead of calling deleteSnapshotsAfterRecoveryPointCheckpoint for allLogs, invoke it only for the logs being truncated.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
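
For context, a minimal, self-contained sketch of the idea behind this change: the "before" path cleans producer-state snapshots for every log on the broker on each checkpoint pass, the "after" path cleans only the logs that were actually affected, grouped by their parent log directory. The Log case class, SnapshotCleanupSketch object, and checkpointDir callback below are simplified stand-ins, not the real kafka.log types.

    // Simplified stand-ins for the real kafka.log types; only what the sketch needs.
    import java.io.File

    final case class Log(dir: File) {
      def deleteSnapshotsAfterRecoveryPointCheckpoint(): Unit =
        println(s"cleaning producer-state snapshots under ${dir.getPath}")
    }

    object SnapshotCleanupSketch {
      // Before: every checkpoint pass cleaned snapshots for every log on the broker.
      def cleanAll(allLogs: Seq[Log]): Unit =
        allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())

      // After: only the affected (e.g. just-truncated) logs are cleaned, grouped by
      // their parent log directory so each directory is checkpointed once.
      def cleanAffected(affectedLogs: Seq[Log])(checkpointDir: File => Unit): Unit =
        for ((dir, logs) <- affectedLogs.groupBy(_.dir.getParentFile)) {
          checkpointDir(dir)
          logs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
        }
    }

In the actual patch this shows up as the new checkpointRecoveryOffsetsAndCleanSnapshot(dir, logsToCleanSnapshot) helper and the affectedLogs bookkeeping in truncateTo, as the diff below shows.
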
Branch: pull/5818/head
Author: huxi, 6 years ago, committed by Jun Rao
Commit: 3eaf44ba8e

Changed files:
  core/src/main/scala/kafka/log/LogManager.scala (56 changed lines)
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala (29 changed lines)

core/src/main/scala/kafka/log/LogManager.scala (56 changed lines)

@@ -440,6 +440,8 @@ class LogManager(logDirs: Seq[File],
       CoreUtils.swallow(cleaner.shutdown(), this)
     }
 
+    val localLogsByDir = logsByDir
+
     // close logs in each dir
     for (dir <- liveLogDirs) {
       debug(s"Flushing and closing logs at $dir")
@@ -447,7 +449,7 @@ class LogManager(logDirs: Seq[File],
       val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
       threadPools.append(pool)
 
-      val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values
+      val logsInDir = localLogsByDir.getOrElse(dir.toString, Map()).values
 
       val jobsForDir = logsInDir map { log =>
         CoreUtils.runnable {
@@ -466,7 +468,7 @@ class LogManager(logDirs: Seq[File],
 
       // update the last flush point
       debug(s"Updating recovery points at $dir")
-      checkpointLogRecoveryOffsetsInDir(dir)
+      checkpointRecoveryOffsetsAndCleanSnapshot(dir, localLogsByDir.getOrElse(dir.toString, Map()).values.toSeq)
 
       debug(s"Updating log start offsets at $dir")
       checkpointLogStartOffsetsInDir(dir)
@@ -495,7 +497,7 @@ class LogManager(logDirs: Seq[File],
    * @param isFuture True iff the truncation should be performed on the future log of the specified partitions
    */
   def truncateTo(partitionOffsets: Map[TopicPartition, Long], isFuture: Boolean) {
-    var truncated = false
+    val affectedLogs = ArrayBuffer.empty[Log]
     for ((topicPartition, truncateOffset) <- partitionOffsets) {
       val log = {
         if (isFuture)
@@ -511,7 +513,7 @@ class LogManager(logDirs: Seq[File],
           cleaner.abortAndPauseCleaning(topicPartition)
         try {
           if (log.truncateTo(truncateOffset))
-            truncated = true
+            affectedLogs += log
           if (needToStopCleaner && !isFuture)
             cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
         } finally {
@@ -523,8 +525,9 @@ class LogManager(logDirs: Seq[File],
       }
     }
-    if (truncated)
-      checkpointLogRecoveryOffsets()
+    for ((dir, logs) <- affectedLogs.groupBy(_.dir.getParentFile)) {
+      checkpointRecoveryOffsetsAndCleanSnapshot(dir, logs)
+    }
   }
 
   /**
@@ -557,7 +560,7 @@ class LogManager(logDirs: Seq[File],
           info(s"Compaction for partition $topicPartition is resumed")
         }
       }
-      checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile)
+      checkpointRecoveryOffsetsAndCleanSnapshot(log.dir.getParentFile, Seq(log))
     }
   }
@@ -566,7 +569,11 @@ class LogManager(logDirs: Seq[File],
    * to avoid recovering the whole log on startup.
    */
   def checkpointLogRecoveryOffsets() {
-    liveLogDirs.foreach(checkpointLogRecoveryOffsetsInDir)
+    logsByDir.foreach { case (dir, partitionToLogMap) =>
+      liveLogDirs.find(_.getAbsolutePath.equals(dir)).foreach { f =>
+        checkpointRecoveryOffsetsAndCleanSnapshot(f, partitionToLogMap.values.toSeq)
+      }
+    }
   }
 
   /**
@@ -578,22 +585,30 @@ class LogManager(logDirs: Seq[File],
   }
 
   /**
-   * Make a checkpoint for all logs in provided directory.
+   * Write the recovery checkpoint file for all logs in provided directory and clean older snapshots for provided logs.
+   *
+   * @param dir the directory in which logs are checkpointed
+   * @param logsToCleanSnapshot logs whose snapshots need to be cleaned
    */
-  private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
-    for {
-      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
-      checkpoint <- recoveryPointCheckpoints.get(dir)
-    } {
+  // Only for testing
+  private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
     try {
-      checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
-      allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
+      checkpointLogRecoveryOffsetsInDir(dir)
+      logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
     } catch {
       case e: IOException =>
         logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
           s"file in directory $dir", e)
     }
   }
 
+  private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
+    for {
+      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
+      checkpoint <- recoveryPointCheckpoints.get(dir)
+    } {
+      checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
+    }
+  }
+
   /**
@@ -802,7 +817,7 @@ class LogManager(logDirs: Seq[File],
         // Now that replica in source log directory has been successfully renamed for deletion.
         // Close the log, update checkpoint files, and enqueue this log to be deleted.
         sourceLog.close()
-        checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile)
+        checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.dir.getParentFile, ArrayBuffer.empty)
         checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile)
         addLogToBeDeleted(sourceLog)
       } catch {
@@ -840,7 +855,7 @@ class LogManager(logDirs: Seq[File],
           cleaner.updateCheckpoints(removedLog.dir.getParentFile)
         }
         removedLog.renameDir(Log.logDeleteDirName(topicPartition))
-        checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
+        checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.dir.getParentFile, ArrayBuffer.empty)
         checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
         addLogToBeDeleted(removedLog)
         info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
@@ -929,9 +944,8 @@ class LogManager(logDirs: Seq[File],
    * Map of log dir to logs by topic and partitions in that dir
    */
   private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
-    (this.currentLogs.toList ++ this.futureLogs.toList).groupBy {
-      case (_, log) => log.dir.getParent
-    }.mapValues(_.toMap)
+    (this.currentLogs.toList ++ this.futureLogs.toList).toMap
+      .groupBy { case (_, log) => log.dir.getParent }
   }
 
   // logDir should be an absolute path

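A side note on the last hunk above: logsByDir now builds one Map up front and relies on groupBy over a Map returning Map-valued groups, instead of grouping a list of pairs and converting each group with mapValues(_.toMap). A minimal sketch of why the two shapes line up; GroupByShapeSketch, dirOf, and the String/Int pairs are plain stand-ins for the real TopicPartition and Log types:

    // Plain stand-ins for TopicPartition -> Log entries; dirOf plays the role of
    // log.dir.getParent in the real code.
    object GroupByShapeSketch extends App {
      val currentLogs = List("test-a-0" -> 1, "test-b-0" -> 2)
      val futureLogs  = List("test-a-1" -> 3)
      val dirOf = Map("test-a-0" -> "/d1", "test-a-1" -> "/d1", "test-b-0" -> "/d2")

      // Old shape: group a list of pairs, then turn each group into a Map.
      val oldShape: Map[String, Map[String, Int]] =
        (currentLogs ++ futureLogs).groupBy { case (tp, _) => dirOf(tp) }
          .mapValues(_.toMap).toMap

      // New shape: build one Map first; groupBy on a Map already yields Map-valued groups.
      val newShape: Map[String, Map[String, Int]] =
        (currentLogs ++ futureLogs).toMap.groupBy { case (tp, _) => dirOf(tp) }

      assert(oldShape == newShape)  // same result for non-colliding keys, one conversion fewer
      println(newShape)
    }

One behavioural nuance worth noting: the up-front toMap keeps a single value per key, so a partition that has both a current and a future log appears to contribute only one entry after this change; the sketch uses distinct keys, where the two forms agree exactly.
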
core/src/test/scala/unit/kafka/log/LogManagerTest.scala (29 changed lines)

@@ -375,6 +375,35 @@ class LogManagerTest {
     assertFalse("Logs not deleted", logManager.hasLogsToBeDeleted)
   }
 
+  @Test
+  def testCheckpointForOnlyAffectedLogs() {
+    val tps = Seq(
+      new TopicPartition("test-a", 0),
+      new TopicPartition("test-a", 1),
+      new TopicPartition("test-a", 2),
+      new TopicPartition("test-b", 0),
+      new TopicPartition("test-b", 1))
+
+    val allLogs = tps.map(logManager.getOrCreateLog(_, logConfig))
+    allLogs.foreach { log =>
+      for (_ <- 0 until 50)
+        log.appendAsLeader(TestUtils.singletonRecords("test".getBytes), leaderEpoch = 0)
+      log.flush()
+    }
+
+    logManager.checkpointRecoveryOffsetsAndCleanSnapshot(this.logDir, allLogs.filter(_.dir.getName.contains("test-a")))
+
+    val checkpoints = new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile)).read()
+
+    tps.zip(allLogs).foreach { case (tp, log) =>
+      assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint)
+      if (tp.topic.equals("test-a")) // should only cleanup old producer snapshots for topic 'test-a'
+        assertEquals(Some(log.minSnapshotsOffsetToRetain), log.oldestProducerSnapshotOffset)
+      else
+        assertNotEquals(Some(log.minSnapshotsOffsetToRetain), log.oldestProducerSnapshotOffset)
+    }
+  }
+
   private def readLog(log: Log, offset: Long, maxLength: Int = 1024): FetchDataInfo = {
     log.read(offset, maxLength, maxOffset = None, minOneMessage = true, includeAbortedTxns = false)
   }
