diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 680fa94e33e..7a96d8f4d80 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -37,16 +37,24 @@ import scala.collection.{Iterable, immutable, mutable}
 private[log] sealed trait LogCleaningState
 private[log] case object LogCleaningInProgress extends LogCleaningState
 private[log] case object LogCleaningAborted extends LogCleaningState
-private[log] case object LogCleaningPaused extends LogCleaningState
+private[log] case class LogCleaningPaused(pausedCount: Int) extends LogCleaningState
 
 /**
- * Manage the state of each partition being cleaned.
- * If a partition is to be cleaned, it enters the LogCleaningInProgress state.
- * While a partition is being cleaned, it can be requested to be aborted and paused. Then the partition first enters
- * the LogCleaningAborted state. Once the cleaning task is aborted, the partition enters the LogCleaningPaused state.
- * While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
- * requested to be resumed.
- */
+ * This class manages the state of each partition being cleaned.
+ * LogCleaningState defines the cleaning states that a TopicPartition can be in.
+ * 1. None                    : The partition has no cleaning state. From this state it can become LogCleaningInProgress
+ *                              or LogCleaningPaused(1). Valid previous states are LogCleaningInProgress and LogCleaningPaused(1).
+ * 2. LogCleaningInProgress   : Cleaning is currently in progress. From this state it can become None once log cleaning finishes,
+ *                              or become LogCleaningAborted. Valid previous state is None.
+ * 3. LogCleaningAborted      : An abort of the cleaning has been requested. From this state it can become LogCleaningPaused(1).
+ *                              Valid previous state is LogCleaningInProgress.
+ * 4-a. LogCleaningPaused(1)  : Cleaning has been paused once. No log cleaning can be done in this state.
+ *                              From this state it can become None or LogCleaningPaused(2).
+ *                              Valid previous states are None, LogCleaningAborted and LogCleaningPaused(2).
+ * 4-b. LogCleaningPaused(i)  : Cleaning has been paused i times, where i >= 2. No log cleaning can be done in this state.
+ *                              From this state it can become LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ *                              Valid previous states are LogCleaningPaused(i-1) and LogCleaningPaused(i+1).
+ */
 private[log] class LogCleanerManager(val logDirs: Seq[File],
                                      val logs: Pool[TopicPartition, Log],
                                      val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
@@ -164,7 +172,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
 
     deletableLogs.foreach {
-      case (topicPartition, _) => inProgress.put(topicPartition, LogCleaningPaused)
+      case (topicPartition, _) => inProgress.put(topicPartition, LogCleaningPaused(1))
     }
     deletableLogs
   }
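The pause count turns the previously boolean paused state into a reference count. The following standalone sketch is not part of the patch; it only models the transitions documented in the scaladoc above, with illustrative helper names `pause` and `resume`:

```scala
// Standalone model of the cleaning-state transitions documented above.
// The state shapes mirror the patch, but this object is illustrative only.
sealed trait LogCleaningState
case object LogCleaningInProgress extends LogCleaningState
case object LogCleaningAborted extends LogCleaningState
case class LogCleaningPaused(pausedCount: Int) extends LogCleaningState

object CleaningStateModel {
  // A pause request either starts a new pause or deepens an existing one.
  def pause(state: Option[LogCleaningState]): Option[LogCleaningState] = state match {
    case None                           => Some(LogCleaningPaused(1))
    case Some(LogCleaningInProgress)    => Some(LogCleaningAborted)  // the cleaner later moves this to Paused(1)
    case Some(LogCleaningPaused(count)) => Some(LogCleaningPaused(count + 1))
    case Some(other)                    => throw new IllegalStateException(s"Cannot pause from $other")
  }

  // A resume request undoes exactly one pause.
  def resume(state: Option[LogCleaningState]): Option[LogCleaningState] = state match {
    case Some(LogCleaningPaused(1))                  => None
    case Some(LogCleaningPaused(count)) if count > 1 => Some(LogCleaningPaused(count - 1))
    case other                                       => throw new IllegalStateException(s"Cannot resume from $other")
  }

  def main(args: Array[String]): Unit = {
    val pausedTwice = pause(pause(None))   // LogCleaningPaused(2)
    val pausedOnce  = resume(pausedTwice)  // LogCleaningPaused(1)
    assert(resume(pausedOnce).isEmpty)     // back to no cleaning state
  }
}
```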
@@ -207,22 +215,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
    *    throws a LogCleaningAbortedException to stop the cleaning task.
    * 4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
    * 5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
+   * 6. If the partition is already paused, a new call to this function
+   *    increases the paused count by one.
    */
   def abortAndPauseCleaning(topicPartition: TopicPartition) {
     inLock(lock) {
       inProgress.get(topicPartition) match {
         case None =>
-          inProgress.put(topicPartition, LogCleaningPaused)
-        case Some(state) =>
-          state match {
-            case LogCleaningInProgress =>
-              inProgress.put(topicPartition, LogCleaningAborted)
-            case LogCleaningPaused =>
-            case s =>
-              throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be aborted and paused since it is in $s state.")
-          }
+          inProgress.put(topicPartition, LogCleaningPaused(1))
+        case Some(LogCleaningInProgress) =>
+          inProgress.put(topicPartition, LogCleaningAborted)
+        case Some(LogCleaningPaused(count)) =>
+          inProgress.put(topicPartition, LogCleaningPaused(count + 1))
+        case Some(s) =>
+          throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be aborted and paused since it is in $s state.")
       }
-      while (!isCleaningInState(topicPartition, LogCleaningPaused))
+
+      while (!isCleaningInStatePaused(topicPartition))
         pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
     }
     info(s"The cleaning for partition $topicPartition is aborted and paused")
@@ -230,6 +239,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
 
   /**
    * Resume the cleaning of paused partitions.
+   * Each call to this function undoes one pause.
    */
   def resumeCleaning(topicPartitions: Iterable[TopicPartition]){
     inLock(lock) {
@@ -240,8 +250,10 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
             throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is not paused.")
           case Some(state) =>
             state match {
-              case LogCleaningPaused =>
+              case LogCleaningPaused(count) if count == 1 =>
                 inProgress.remove(topicPartition)
+              case LogCleaningPaused(count) if count > 1 =>
+                inProgress.put(topicPartition, LogCleaningPaused(count - 1))
               case s =>
                 throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is in $s state.")
             }
@@ -264,6 +276,22 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
+  /**
+   * Check if the cleaning for a partition is paused. The caller is expected to hold lock while making the call.
+   */
+  private def isCleaningInStatePaused(topicPartition: TopicPartition): Boolean = {
+    inProgress.get(topicPartition) match {
+      case None => false
+      case Some(state) =>
+        state match {
+          case _: LogCleaningPaused =>
+            true
+          case _ =>
+            false
+        }
+    }
+  }
+
   /**
    * Check if the cleaning for a partition is aborted. If so, throw an exception.
    */
@@ -337,7 +365,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         updateCheckpoints(dataDir, Option(topicPartition, endOffset))
         inProgress.remove(topicPartition)
       case Some(LogCleaningAborted) =>
-        inProgress.put(topicPartition, LogCleaningPaused)
+        inProgress.put(topicPartition, LogCleaningPaused(1))
         pausedCleaningCond.signalAll()
       case None =>
         throw new IllegalStateException(s"State for partition $topicPartition should exist.")
@@ -355,7 +383,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       case Some(LogCleaningInProgress) =>
        inProgress.remove(topicPartition)
      case Some(LogCleaningAborted) =>
-        inProgress.put(topicPartition, LogCleaningPaused)
+        inProgress.put(topicPartition, LogCleaningPaused(1))
         pausedCleaningCond.signalAll()
      case None =>
        throw new IllegalStateException(s"State for partition $topicPartition should exist.")
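To see why a counted pause matters, the sketch below continues the illustrative model from earlier (again not part of the patch): two independent pause/resume pairs, such as a cleanup pass and a truncation hitting the same partition, only re-enable cleaning after the last resume.

```scala
// Interleaving two independent pause/resume pairs against the model above.
// With a counted pause, the partition stays paused until the LAST caller resumes;
// with the old boolean state, the first resume would have re-enabled cleaning too early.
object NestedPauseExample {
  import CleaningStateModel._

  def main(args: Array[String]): Unit = {
    var state: Option[LogCleaningState] = None

    state = pause(state)   // caller A (e.g. cleanup) pauses     -> Paused(1)
    state = pause(state)   // caller B (e.g. truncation) pauses  -> Paused(2)

    state = resume(state)  // caller B resumes -> Paused(1): still paused for caller A
    assert(state.contains(LogCleaningPaused(1)))

    state = resume(state)  // caller A resumes -> None: cleaning may be scheduled again
    assert(state.isEmpty)
  }
}
```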
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index eab85098474..bcf380154a4 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -546,11 +546,16 @@ class LogManager(logDirs: Seq[File],
       //Abort and pause the cleaning of the log, and resume after truncation is done.
       if (cleaner != null && !isFuture)
         cleaner.abortAndPauseCleaning(topicPartition)
-      log.truncateFullyAndStartAt(newOffset)
-      if (cleaner != null && !isFuture) {
-        cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
-        cleaner.resumeCleaning(Seq(topicPartition))
-        info(s"Compaction for partition $topicPartition is resumed")
+      try {
+        log.truncateFullyAndStartAt(newOffset)
+        if (cleaner != null && !isFuture) {
+          cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
+        }
+      } finally {
+        if (cleaner != null && !isFuture) {
+          cleaner.resumeCleaning(Seq(topicPartition))
+          info(s"Compaction for partition $topicPartition is resumed")
+        }
       }
       checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile)
     }
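The LogManager change wraps truncation in try/finally so the paused cleaner is resumed on every exit path, even when truncateFullyAndStartAt throws; otherwise the partition would stay paused and never be cleaned again. A minimal sketch of the same guard pattern, where `withCleaningPaused`, `pause` and `resume` are hypothetical stand-ins rather than LogManager APIs:

```scala
// Generic shape of the fix: whoever pauses the cleaner must resume it on every
// exit path, otherwise the partition would remain paused forever.
object PauseResumeGuard {
  def withCleaningPaused[T](pause: () => Unit, resume: () => Unit)(body: => T): T = {
    pause()
    try {
      body        // e.g. truncate the log and maintain the cleaner checkpoint
    } finally {
      resume()    // runs whether body succeeded or threw
    }
  }
}
```

In LogManager terms, the body would roughly correspond to `log.truncateFullyAndStartAt(newOffset)` plus the checkpoint maintenance, with `cleaner.abortAndPauseCleaning` and `cleaner.resumeCleaning` as the pause and resume actions.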
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 3653e282383..2a4869098e7 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -91,11 +91,10 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   }
 
   /**
-   * log with retention in progress should not be picked up for compaction and vice versa when log cleanup policy
-   * is changed between "compact" and "delete"
+   * log under cleanup should be ineligible for compaction
    */
   @Test
-  def testLogsWithRetentionInprogressShouldNotPickedUpForCompactionAndViceVersa(): Unit = {
+  def testLogsUnderCleanupIneligibleForCompaction(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
     val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
@@ -105,7 +104,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     log.appendAsLeader(records, leaderEpoch = 0)
     log.onHighWatermarkIncremented(2L)
 
-    // simulate retention thread working on the log partition
+    // simulate cleanup thread working on the log partition
     val deletableLog = cleanerManager.pauseCleaningForNonCompactedPartitions()
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog.size)
 
@@ -118,11 +117,11 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val config = LogConfig(logProps)
     log.config = config
 
-    // log retention inprogress, the log is not available for compaction
+    // log cleanup in progress, the log is not available for compaction
     val cleanable = cleanerManager.grabFilthiestCompactedLog(time)
     assertEquals("should have 0 logs ready to be compacted", 0, cleanable.size)
 
-    // log retention finished, and log can be picked up for compaction
+    // log cleanup finished, and log can be picked up for compaction
     cleanerManager.resumeCleaning(deletableLog.map(_._1))
     val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time)
     assertEquals("should have 1 logs ready to be compacted", 1, cleanable2.size)
@@ -132,16 +131,55 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val config2 = LogConfig(logProps)
     log.config = config2
 
-    // compaction in progress, should have 0 log eligible for log retention
+    // compaction in progress, should have 0 log eligible for log cleanup
     val deletableLog2 = cleanerManager.pauseCleaningForNonCompactedPartitions()
     assertEquals("should have 0 logs ready to be deleted", 0, deletableLog2.size)
 
-    // compaction done, should have 1 log eligible for log retention
+    // compaction done, should have 1 log eligible for log cleanup
     cleanerManager.doneDeleting(Seq(cleanable2.get.topicPartition))
     val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions()
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  /**
+   * log under cleanup should still be eligible for log truncation
+   */
+  @Test
+  def testConcurrentLogCleanupAndLogTruncation(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // log cleanup starts
+    val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
+    // Log truncation happens due to unclean leader election
+    cleanerManager.abortAndPauseCleaning(log.topicPartition)
+    cleanerManager.resumeCleaning(Seq(log.topicPartition))
+    // log cleanup finishes and pausedPartitions are resumed
+    cleanerManager.resumeCleaning(pausedPartitions.map(_._1))
+
+    assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
+  }
+
+  /**
+   * log under cleanup should still be eligible for topic deletion
+   */
+  @Test
+  def testConcurrentLogCleanupAndTopicDeletion(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // log cleanup starts
+    val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
+    // Broker processes StopReplicaRequest with delete=true
+    cleanerManager.abortCleaning(log.topicPartition)
+    // log cleanup finishes and pausedPartitions are resumed
+    cleanerManager.resumeCleaning(pausedPartitions.map(_._1))
+
+    assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
+  }
+
   /**
    * Test computation of cleanable range with no minimum compaction lag settings active
    */
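The two new tests exercise the pause count indirectly: a cleanup pass pauses the partition, then truncation or topic deletion intervenes. A companion test could pin down the nesting behaviour explicitly; the sketch below is not part of the patch and assumes it would live inside LogCleanerManagerTest, reusing the same createLog and createCleanerManager helpers shown above.

```scala
  // Hypothetical companion test (not in the patch): pausing twice requires resuming twice.
  @Test
  def testAbortAndPauseCleaningIsRefCounted(): Unit = {
    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
    val cleanerManager: LogCleanerManager = createCleanerManager(log)

    cleanerManager.abortAndPauseCleaning(log.topicPartition)  // pause #1 -> LogCleaningPaused(1)
    cleanerManager.abortAndPauseCleaning(log.topicPartition)  // pause #2 -> LogCleaningPaused(2)

    cleanerManager.resumeCleaning(Seq(log.topicPartition))    // undo pause #2
    assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(log.topicPartition).get)

    cleanerManager.resumeCleaning(Seq(log.topicPartition))    // undo pause #1
    assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
  }
```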
@@ -280,7 +318,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val tp = new TopicPartition("log", 0)
 
     intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1))
 
-    cleanerManager.setCleaningState(tp, LogCleaningPaused)
+    cleanerManager.setCleaningState(tp, LogCleaningPaused(1))
     intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1))
 
     cleanerManager.setCleaningState(tp, LogCleaningInProgress)
@@ -290,7 +328,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     cleanerManager.setCleaningState(tp, LogCleaningAborted)
     cleanerManager.doneCleaning(tp, log.dir, 1)
-    assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get)
+    assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
     assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty)
   }
 
@@ -304,7 +342,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp)))
 
-    cleanerManager.setCleaningState(tp, LogCleaningPaused)
+    cleanerManager.setCleaningState(tp, LogCleaningPaused(1))
     intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp)))
 
     cleanerManager.setCleaningState(tp, LogCleaningInProgress)
@@ -313,7 +351,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     cleanerManager.setCleaningState(tp, LogCleaningAborted)
     cleanerManager.doneDeleting(Seq(tp))
-    assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get)
+    assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
   }