Browse Source

KAFKA-7441; Allow LogCleanerManager.resumeCleaning() to be used concurrently

Author: Xiongqi Wesley Wu <xiongqi.wu@gmail.com>

Reviewers: Dong Lin <lindong28@gmail.com>

Closes #5694 from xiowu0/fixrace2
pull/5744/head
Xiongqi Wesley Wu 6 years ago committed by Dong Lin
parent
commit
7ea0655711
  1. 72
      core/src/main/scala/kafka/log/LogCleanerManager.scala
  2. 15
      core/src/main/scala/kafka/log/LogManager.scala
  3. 62
      core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala

72
core/src/main/scala/kafka/log/LogCleanerManager.scala

@ -37,16 +37,24 @@ import scala.collection.{Iterable, immutable, mutable}
private[log] sealed trait LogCleaningState private[log] sealed trait LogCleaningState
private[log] case object LogCleaningInProgress extends LogCleaningState private[log] case object LogCleaningInProgress extends LogCleaningState
private[log] case object LogCleaningAborted extends LogCleaningState private[log] case object LogCleaningAborted extends LogCleaningState
private[log] case object LogCleaningPaused extends LogCleaningState private[log] case class LogCleaningPaused(pausedCount: Int) extends LogCleaningState
/** /**
* Manage the state of each partition being cleaned. * This class manages the state of each partition being cleaned.
* If a partition is to be cleaned, it enters the LogCleaningInProgress state. * LogCleaningState defines the cleaning states that a TopicPartition can be in.
* While a partition is being cleaned, it can be requested to be aborted and paused. Then the partition first enters * 1. None : No cleaning state in a TopicPartition. In this state, it can become LogCleaningInProgress
* the LogCleaningAborted state. Once the cleaning task is aborted, the partition enters the LogCleaningPaused state. * or LogCleaningPaused(1). Valid previous state are LogCleaningInProgress and LogCleaningPaused(1)
* While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is * 2. LogCleaningInProgress : The cleaning is currently in progress. In this state, it can become None when log cleaning is finished
* requested to be resumed. * or become LogCleaningAborted. Valid previous state is None.
*/ * 3. LogCleaningAborted : The cleaning abort is requested. In this state, it can become LogCleaningPaused(1).
* Valid previous state is LogCleaningInProgress.
* 4-a. LogCleaningPaused(1) : The cleaning is paused once. No log cleaning can be done in this state.
* In this state, it can become None or LogCleaningPaused(2).
* Valid previous states are None, LogCleaningAborted or LogCleaningPaused(2).
* 4-b. LogCleaningPaused(i) : The cleaning is paused i times where i >= 2. No log cleaning can be done in this state.
* In this state, it can become LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
* Valid previous states are LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
*/
private[log] class LogCleanerManager(val logDirs: Seq[File], private[log] class LogCleanerManager(val logDirs: Seq[File],
val logs: Pool[TopicPartition, Log], val logs: Pool[TopicPartition, Log],
val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup { val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
@ -164,7 +172,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
} }
deletableLogs.foreach { deletableLogs.foreach {
case (topicPartition, _) => inProgress.put(topicPartition, LogCleaningPaused) case (topicPartition, _) => inProgress.put(topicPartition, LogCleaningPaused(1))
} }
deletableLogs deletableLogs
} }
@ -207,22 +215,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
* throws a LogCleaningAbortedException to stop the cleaning task. * throws a LogCleaningAbortedException to stop the cleaning task.
* 4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused. * 4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
* 5. abortAndPauseCleaning() waits until the state of the partition is changed to paused. * 5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
* 6. If the partition is already paused, a new call to this function
* will increase the paused count by one.
*/ */
def abortAndPauseCleaning(topicPartition: TopicPartition) { def abortAndPauseCleaning(topicPartition: TopicPartition) {
inLock(lock) { inLock(lock) {
inProgress.get(topicPartition) match { inProgress.get(topicPartition) match {
case None => case None =>
inProgress.put(topicPartition, LogCleaningPaused) inProgress.put(topicPartition, LogCleaningPaused(1))
case Some(state) => case Some(LogCleaningInProgress) =>
state match { inProgress.put(topicPartition, LogCleaningAborted)
case LogCleaningInProgress => case Some(LogCleaningPaused(count)) =>
inProgress.put(topicPartition, LogCleaningAborted) inProgress.put(topicPartition, LogCleaningPaused(count + 1))
case LogCleaningPaused => case Some(s) =>
case s => throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be aborted and paused since it is in $s state.")
throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be aborted and paused since it is in $s state.")
}
} }
while (!isCleaningInState(topicPartition, LogCleaningPaused))
while(!isCleaningInStatePaused(topicPartition))
pausedCleaningCond.await(100, TimeUnit.MILLISECONDS) pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
} }
info(s"The cleaning for partition $topicPartition is aborted and paused") info(s"The cleaning for partition $topicPartition is aborted and paused")
@ -230,6 +239,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
/** /**
* Resume the cleaning of paused partitions. * Resume the cleaning of paused partitions.
* Each call of this function will undo one pause.
*/ */
def resumeCleaning(topicPartitions: Iterable[TopicPartition]){ def resumeCleaning(topicPartitions: Iterable[TopicPartition]){
inLock(lock) { inLock(lock) {
@ -240,8 +250,10 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is not paused.") throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is not paused.")
case Some(state) => case Some(state) =>
state match { state match {
case LogCleaningPaused => case LogCleaningPaused(count) if count == 1 =>
inProgress.remove(topicPartition) inProgress.remove(topicPartition)
case LogCleaningPaused(count) if count > 1 =>
inProgress.put(topicPartition, LogCleaningPaused(count - 1))
case s => case s =>
throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is in $s state.") throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is in $s state.")
} }
@ -264,6 +276,22 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
} }
} }
/**
 * Tell whether the cleaning of a partition is currently paused, whatever its pause count is.
 * A partition with no recorded cleaning state, or in any non-paused state, is reported as not paused.
 * The caller is expected to hold lock while making the call.
 */
private def isCleaningInStatePaused(topicPartition: TopicPartition): Boolean =
  inProgress.get(topicPartition) match {
    // The pause count is irrelevant here: any LogCleaningPaused(n) counts as paused.
    case Some(LogCleaningPaused(_)) => true
    case _ => false
  }
/** /**
* Check if the cleaning for a partition is aborted. If so, throw an exception. * Check if the cleaning for a partition is aborted. If so, throw an exception.
*/ */
@ -337,7 +365,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
updateCheckpoints(dataDir, Option(topicPartition, endOffset)) updateCheckpoints(dataDir, Option(topicPartition, endOffset))
inProgress.remove(topicPartition) inProgress.remove(topicPartition)
case Some(LogCleaningAborted) => case Some(LogCleaningAborted) =>
inProgress.put(topicPartition, LogCleaningPaused) inProgress.put(topicPartition, LogCleaningPaused(1))
pausedCleaningCond.signalAll() pausedCleaningCond.signalAll()
case None => case None =>
throw new IllegalStateException(s"State for partition $topicPartition should exist.") throw new IllegalStateException(s"State for partition $topicPartition should exist.")
@ -355,7 +383,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
case Some(LogCleaningInProgress) => case Some(LogCleaningInProgress) =>
inProgress.remove(topicPartition) inProgress.remove(topicPartition)
case Some(LogCleaningAborted) => case Some(LogCleaningAborted) =>
inProgress.put(topicPartition, LogCleaningPaused) inProgress.put(topicPartition, LogCleaningPaused(1))
pausedCleaningCond.signalAll() pausedCleaningCond.signalAll()
case None => case None =>
throw new IllegalStateException(s"State for partition $topicPartition should exist.") throw new IllegalStateException(s"State for partition $topicPartition should exist.")

15
core/src/main/scala/kafka/log/LogManager.scala

@ -546,11 +546,16 @@ class LogManager(logDirs: Seq[File],
//Abort and pause the cleaning of the log, and resume after truncation is done. //Abort and pause the cleaning of the log, and resume after truncation is done.
if (cleaner != null && !isFuture) if (cleaner != null && !isFuture)
cleaner.abortAndPauseCleaning(topicPartition) cleaner.abortAndPauseCleaning(topicPartition)
log.truncateFullyAndStartAt(newOffset) try {
if (cleaner != null && !isFuture) { log.truncateFullyAndStartAt(newOffset)
cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset) if (cleaner != null && !isFuture) {
cleaner.resumeCleaning(Seq(topicPartition)) cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
info(s"Compaction for partition $topicPartition is resumed") }
} finally {
if (cleaner != null && !isFuture) {
cleaner.resumeCleaning(Seq(topicPartition))
info(s"Compaction for partition $topicPartition is resumed")
}
} }
checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile) checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile)
} }

62
core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala

@ -91,11 +91,10 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
} }
/** /**
* log with retention in progress should not be picked up for compaction and vice versa when log cleanup policy * log under cleanup should be ineligible for compaction
* is changed between "compact" and "delete"
*/ */
@Test @Test
def testLogsWithRetentionInprogressShouldNotPickedUpForCompactionAndViceVersa(): Unit = { def testLogsUnderCleanupIneligibleForCompaction(): Unit = {
val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes) val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete) val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
val cleanerManager: LogCleanerManager = createCleanerManager(log) val cleanerManager: LogCleanerManager = createCleanerManager(log)
@ -105,7 +104,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
log.appendAsLeader(records, leaderEpoch = 0) log.appendAsLeader(records, leaderEpoch = 0)
log.onHighWatermarkIncremented(2L) log.onHighWatermarkIncremented(2L)
// simulate retention thread working on the log partition // simulate cleanup thread working on the log partition
val deletableLog = cleanerManager.pauseCleaningForNonCompactedPartitions() val deletableLog = cleanerManager.pauseCleaningForNonCompactedPartitions()
assertEquals("should have 1 logs ready to be deleted", 1, deletableLog.size) assertEquals("should have 1 logs ready to be deleted", 1, deletableLog.size)
@ -118,11 +117,11 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
val config = LogConfig(logProps) val config = LogConfig(logProps)
log.config = config log.config = config
// log retention inprogress, the log is not available for compaction // log cleanup inprogress, the log is not available for compaction
val cleanable = cleanerManager.grabFilthiestCompactedLog(time) val cleanable = cleanerManager.grabFilthiestCompactedLog(time)
assertEquals("should have 0 logs ready to be compacted", 0, cleanable.size) assertEquals("should have 0 logs ready to be compacted", 0, cleanable.size)
// log retention finished, and log can be picked up for compaction // log cleanup finished, and log can be picked up for compaction
cleanerManager.resumeCleaning(deletableLog.map(_._1)) cleanerManager.resumeCleaning(deletableLog.map(_._1))
val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time) val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time)
assertEquals("should have 1 logs ready to be compacted", 1, cleanable2.size) assertEquals("should have 1 logs ready to be compacted", 1, cleanable2.size)
@ -132,16 +131,55 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
val config2 = LogConfig(logProps) val config2 = LogConfig(logProps)
log.config = config2 log.config = config2
// compaction in progress, should have 0 log eligible for log retention // compaction in progress, should have 0 log eligible for log cleanup
val deletableLog2 = cleanerManager.pauseCleaningForNonCompactedPartitions() val deletableLog2 = cleanerManager.pauseCleaningForNonCompactedPartitions()
assertEquals("should have 0 logs ready to be deleted", 0, deletableLog2.size) assertEquals("should have 0 logs ready to be deleted", 0, deletableLog2.size)
// compaction done, should have 1 log eligible for log retention // compaction done, should have 1 log eligible for log cleanup
cleanerManager.doneDeleting(Seq(cleanable2.get.topicPartition)) cleanerManager.doneDeleting(Seq(cleanable2.get.topicPartition))
val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions() val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions()
assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size) assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
} }
/**
 * A log that log cleanup has already paused must still go through an
 * abort-and-pause / resume cycle (as issued for log truncation) and end up with no cleaning state.
 */
@Test
def testConcurrentLogCleanupAndLogTruncation(): Unit = {
  val singleRecord = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes)
  val deleteLog: Log = createLog(singleRecord.sizeInBytes * 5, LogConfig.Delete)
  val manager: LogCleanerManager = createCleanerManager(deleteLog)

  // Step 1: log cleanup starts and pauses cleaning for the non-compacted partitions
  val pausedByCleanup = manager.pauseCleaningForNonCompactedPartitions()

  // Step 2: log truncation happens due to unclean leader election; it pauses and resumes on its own
  manager.abortAndPauseCleaning(deleteLog.topicPartition)
  manager.resumeCleaning(Seq(deleteLog.topicPartition))

  // Step 3: log cleanup finishes and resumes the partitions it paused in step 1
  manager.resumeCleaning(pausedByCleanup.map(_._1))

  // Every pause has been undone, so no cleaning state should remain
  assertEquals(None, manager.cleaningState(deleteLog.topicPartition))
}
/**
 * A log that log cleanup has already paused must still accept abortCleaning
 * (as issued for topic deletion) and end up with no cleaning state once cleanup resumes it.
 */
@Test
def testConcurrentLogCleanupAndTopicDeletion(): Unit = {
  val singleRecord = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes)
  val deleteLog: Log = createLog(singleRecord.sizeInBytes * 5, LogConfig.Delete)
  val manager: LogCleanerManager = createCleanerManager(deleteLog)

  // Step 1: log cleanup starts and pauses cleaning for the non-compacted partitions
  val pausedByCleanup = manager.pauseCleaningForNonCompactedPartitions()

  // Step 2: broker processes StopReplicaRequest with delete=true
  manager.abortCleaning(deleteLog.topicPartition)

  // Step 3: log cleanup finishes and resumes the partitions it paused in step 1
  manager.resumeCleaning(pausedByCleanup.map(_._1))

  // No cleaning state should remain for the partition
  assertEquals(None, manager.cleaningState(deleteLog.topicPartition))
}
/** /**
* Test computation of cleanable range with no minimum compaction lag settings active * Test computation of cleanable range with no minimum compaction lag settings active
*/ */
@ -280,7 +318,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
val tp = new TopicPartition("log", 0) val tp = new TopicPartition("log", 0)
intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1)) intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1))
cleanerManager.setCleaningState(tp, LogCleaningPaused) cleanerManager.setCleaningState(tp, LogCleaningPaused(1))
intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1)) intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1))
cleanerManager.setCleaningState(tp, LogCleaningInProgress) cleanerManager.setCleaningState(tp, LogCleaningInProgress)
@ -290,7 +328,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
cleanerManager.setCleaningState(tp, LogCleaningAborted) cleanerManager.setCleaningState(tp, LogCleaningAborted)
cleanerManager.doneCleaning(tp, log.dir, 1) cleanerManager.doneCleaning(tp, log.dir, 1)
assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get) assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty) assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty)
} }
@ -304,7 +342,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp))) intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp)))
cleanerManager.setCleaningState(tp, LogCleaningPaused) cleanerManager.setCleaningState(tp, LogCleaningPaused(1))
intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp))) intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp)))
cleanerManager.setCleaningState(tp, LogCleaningInProgress) cleanerManager.setCleaningState(tp, LogCleaningInProgress)
@ -313,7 +351,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
cleanerManager.setCleaningState(tp, LogCleaningAborted) cleanerManager.setCleaningState(tp, LogCleaningAborted)
cleanerManager.doneDeleting(Seq(tp)) cleanerManager.doneDeleting(Seq(tp))
assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get) assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
} }

Loading…
Cancel
Save