diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 26bfbe9e0fe..508dcd0e685 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -440,6 +440,8 @@ class LogManager(logDirs: Seq[File], CoreUtils.swallow(cleaner.shutdown(), this) } + val localLogsByDir = logsByDir + // close logs in each dir for (dir <- liveLogDirs) { debug(s"Flushing and closing logs at $dir") @@ -447,7 +449,7 @@ class LogManager(logDirs: Seq[File], val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir) threadPools.append(pool) - val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values + val logsInDir = localLogsByDir.getOrElse(dir.toString, Map()).values val jobsForDir = logsInDir map { log => CoreUtils.runnable { @@ -466,7 +468,7 @@ class LogManager(logDirs: Seq[File], // update the last flush point debug(s"Updating recovery points at $dir") - checkpointLogRecoveryOffsetsInDir(dir) + checkpointRecoveryOffsetsAndCleanSnapshot(dir, localLogsByDir.getOrElse(dir.toString, Map()).values.toSeq) debug(s"Updating log start offsets at $dir") checkpointLogStartOffsetsInDir(dir) @@ -495,7 +497,7 @@ class LogManager(logDirs: Seq[File], * @param isFuture True iff the truncation should be performed on the future log of the specified partitions */ def truncateTo(partitionOffsets: Map[TopicPartition, Long], isFuture: Boolean) { - var truncated = false + val affectedLogs = ArrayBuffer.empty[Log] for ((topicPartition, truncateOffset) <- partitionOffsets) { val log = { if (isFuture) @@ -511,7 +513,7 @@ class LogManager(logDirs: Seq[File], cleaner.abortAndPauseCleaning(topicPartition) try { if (log.truncateTo(truncateOffset)) - truncated = true + affectedLogs += log if (needToStopCleaner && !isFuture) cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset) } finally { @@ -523,8 +525,9 @@ class LogManager(logDirs: Seq[File], } } - if (truncated) - checkpointLogRecoveryOffsets() + for ((dir, logs) <- affectedLogs.groupBy(_.dir.getParentFile)) { + checkpointRecoveryOffsetsAndCleanSnapshot(dir, logs) + } } /** @@ -557,7 +560,7 @@ class LogManager(logDirs: Seq[File], info(s"Compaction for partition $topicPartition is resumed") } } - checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile) + checkpointRecoveryOffsetsAndCleanSnapshot(log.dir.getParentFile, Seq(log)) } } @@ -566,7 +569,11 @@ class LogManager(logDirs: Seq[File], * to avoid recovering the whole log on startup. */ def checkpointLogRecoveryOffsets() { - liveLogDirs.foreach(checkpointLogRecoveryOffsetsInDir) + logsByDir.foreach { case (dir, partitionToLogMap) => + liveLogDirs.find(_.getAbsolutePath.equals(dir)).foreach { f => + checkpointRecoveryOffsetsAndCleanSnapshot(f, partitionToLogMap.values.toSeq) + } + } } /** @@ -578,21 +585,29 @@ class LogManager(logDirs: Seq[File], } /** - * Make a checkpoint for all logs in provided directory. - */ + * Write the recovery checkpoint file for all logs in provided directory and clean older snapshots for provided logs. + * + * @param dir the directory in which logs are checkpointed + * @param logsToCleanSnapshot logs whose snapshots need to be cleaned + */ + // Only for testing + private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = { + try { + checkpointLogRecoveryOffsetsInDir(dir) + logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint()) + } catch { + case e: IOException => + logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " + + s"file in directory $dir", e) + } + } + private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = { for { partitionToLog <- logsByDir.get(dir.getAbsolutePath) checkpoint <- recoveryPointCheckpoints.get(dir) } { - try { - checkpoint.write(partitionToLog.mapValues(_.recoveryPoint)) - allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint()) - } catch { - case e: IOException => - logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " + - s"file in directory $dir", e) - } + checkpoint.write(partitionToLog.mapValues(_.recoveryPoint)) } } @@ -802,7 +817,7 @@ class LogManager(logDirs: Seq[File], // Now that replica in source log directory has been successfully renamed for deletion. // Close the log, update checkpoint files, and enqueue this log to be deleted. sourceLog.close() - checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile) + checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.dir.getParentFile, ArrayBuffer.empty) checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile) addLogToBeDeleted(sourceLog) } catch { @@ -840,7 +855,7 @@ class LogManager(logDirs: Seq[File], cleaner.updateCheckpoints(removedLog.dir.getParentFile) } removedLog.renameDir(Log.logDeleteDirName(topicPartition)) - checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile) + checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.dir.getParentFile, ArrayBuffer.empty) checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile) addLogToBeDeleted(removedLog) info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion") @@ -929,9 +944,8 @@ class LogManager(logDirs: Seq[File], * Map of log dir to logs by topic and partitions in that dir */ private def logsByDir: Map[String, Map[TopicPartition, Log]] = { - (this.currentLogs.toList ++ this.futureLogs.toList).groupBy { - case (_, log) => log.dir.getParent - }.mapValues(_.toMap) + (this.currentLogs.toList ++ this.futureLogs.toList).toMap + .groupBy { case (_, log) => log.dir.getParent } } // logDir should be an absolute path diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 2fbb8759441..812dadaff5e 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -375,6 +375,35 @@ class LogManagerTest { assertFalse("Logs not deleted", logManager.hasLogsToBeDeleted) } + @Test + def testCheckpointForOnlyAffectedLogs() { + val tps = Seq( + new TopicPartition("test-a", 0), + new TopicPartition("test-a", 1), + new TopicPartition("test-a", 2), + new TopicPartition("test-b", 0), + new TopicPartition("test-b", 1)) + + val allLogs = tps.map(logManager.getOrCreateLog(_, logConfig)) + allLogs.foreach { log => + for (_ <- 0 until 50) + log.appendAsLeader(TestUtils.singletonRecords("test".getBytes), leaderEpoch = 0) + log.flush() + } + + logManager.checkpointRecoveryOffsetsAndCleanSnapshot(this.logDir, allLogs.filter(_.dir.getName.contains("test-a"))) + + val checkpoints = new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile)).read() + + tps.zip(allLogs).foreach { case (tp, log) => + assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint) + if (tp.topic.equals("test-a")) // should only cleanup old producer snapshots for topic 'test-a' + assertEquals(Some(log.minSnapshotsOffsetToRetain), log.oldestProducerSnapshotOffset) + else + assertNotEquals(Some(log.minSnapshotsOffsetToRetain), log.oldestProducerSnapshotOffset) + } + } + private def readLog(log: Log, offset: Long, maxLength: Int = 1024): FetchDataInfo = { log.read(offset, maxLength, maxOffset = None, minOneMessage = true, includeAbortedTxns = false) }