Browse Source

KAFKA-8725; Improve LogCleanerManager#grabFilthiestLog error handling (#7475)

KAFKA-7215 improved the log cleaner error handling to mitigate thread death but missed one case. Exceptions in grabFilthiestCompactedLog still cause the thread to die.

This patch improves handling to ensure that errors in that function still mark a partition as uncleanable and do not crash the thread.

Reviewers: Jason Gustafson <jason@confluent.io>
pull/7502/head
Stanislav Kozlovski 5 years ago committed by Jason Gustafson
parent
commit
f771005f40
  1. 76
      core/src/main/scala/kafka/log/LogCleaner.scala
  2. 21
      core/src/main/scala/kafka/log/LogCleanerManager.scala
  3. 54
      core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala

76
core/src/main/scala/kafka/log/LogCleaner.scala

@ -310,53 +310,59 @@ class LogCleaner(initialConfig: CleanerConfig, @@ -310,53 +310,59 @@ class LogCleaner(initialConfig: CleanerConfig,
* Clean a log if there is a dirty log available, otherwise sleep for a bit
*/
override def doWork(): Unit = {
val cleaned = cleanFilthiestLog()
val cleaned = tryCleanFilthiestLog()
if (!cleaned)
pause(config.backOffMs, TimeUnit.MILLISECONDS)
}
/**
* Cleans a log if there is a dirty log available
* @return whether a log was cleaned
*/
private def cleanFilthiestLog(): Boolean = {
var currentLog: Option[Log] = None
* Cleans a log if there is a dirty log available
* @return whether a log was cleaned
*/
private def tryCleanFilthiestLog(): Boolean = {
try {
val preCleanStats = new PreCleanStats()
val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
case None =>
false
case Some(cleanable) =>
this.lastPreCleanStats = preCleanStats
// there's a log, clean it
currentLog = Some(cleanable.log)
cleanFilthiestLog()
} catch {
case e: LogCleaningException =>
warn(s"Unexpected exception thrown when cleaning log ${e.log}. Marking its partition (${e.log.topicPartition}) as uncleanable", e)
cleanerManager.markPartitionUncleanable(e.log.dir.getParent, e.log.topicPartition)
false
}
}
@throws(classOf[LogCleaningException])
private def cleanFilthiestLog(): Boolean = {
val preCleanStats = new PreCleanStats()
val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
case None =>
false
case Some(cleanable) =>
// there's a log, clean it
this.lastPreCleanStats = preCleanStats
try {
cleanLog(cleanable)
true
}
val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
try {
deletable.foreach { case (_, log) =>
currentLog = Some(log)
} catch {
case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
case e: Exception => throw new LogCleaningException(cleanable.log, e.getMessage, e)
}
}
val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
try {
deletable.foreach { case (_, log) =>
try {
log.deleteOldSegments()
} catch {
case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
case e: Exception => throw new LogCleaningException(log, e.getMessage, e)
}
} finally {
cleanerManager.doneDeleting(deletable.map(_._1))
}
cleaned
} catch {
case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
case e: Exception =>
if (currentLog.isEmpty) {
throw new IllegalStateException("currentLog cannot be empty on an unexpected exception", e)
}
val erroneousLog = currentLog.get
warn(s"Unexpected exception thrown when cleaning log $erroneousLog. Marking its partition (${erroneousLog.topicPartition}) as uncleanable", e)
cleanerManager.markPartitionUncleanable(erroneousLog.dir.getParent, erroneousLog.topicPartition)
false
} finally {
cleanerManager.doneDeleting(deletable.map(_._1))
}
cleaned
}
private def cleanLog(cleanable: LogToClean): Unit = {

21
core/src/main/scala/kafka/log/LogCleanerManager.scala

@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
import kafka.common.LogCleaningAbortedException
import kafka.common.{KafkaException, LogCleaningAbortedException}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.LogDirFailureChannel
import kafka.server.checkpoints.OffsetCheckpointFile
@ -39,6 +39,10 @@ private[log] case object LogCleaningInProgress extends LogCleaningState @@ -39,6 +39,10 @@ private[log] case object LogCleaningInProgress extends LogCleaningState
private[log] case object LogCleaningAborted extends LogCleaningState
private[log] case class LogCleaningPaused(pausedCount: Int) extends LogCleaningState
private[log] class LogCleaningException(val log: Log,
private val message: String,
private val cause: Throwable) extends KafkaException(message, cause)
/**
* This class manages the state of each partition being cleaned.
* LogCleaningState defines the cleaning states that a TopicPartition can be in.
@ -180,11 +184,16 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], @@ -180,11 +184,16 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
}.map {
case (topicPartition, log) => // create a LogToClean instance for each
val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, topicPartition, lastClean, now)
val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now)
preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0)
try {
val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, topicPartition, lastClean, now)
val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now)
preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0)
} catch {
case e: Throwable => throw new LogCleaningException(log,
s"Failed to calculate log cleaning stats for partition $topicPartition", e)
}
}.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0

54
core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala

@ -77,6 +77,42 @@ class LogCleanerManagerTest extends Logging { @@ -77,6 +77,42 @@ class LogCleanerManagerTest extends Logging {
logs
}
@Test
def testGrabFilthiestCompactedLogThrowsException(): Unit = {
val tp = new TopicPartition("A", 1)
val logSegmentSize = TestUtils.singletonRecords("test".getBytes).sizeInBytes * 10
val logSegmentsCount = 2
val tpDir = new File(logDir, "A-1")
// the exception should be catched and the partition that caused it marked as uncleanable
class LogMock(dir: File, config: LogConfig) extends Log(dir, config, 0L, 0L,
time.scheduler, new BrokerTopicStats, time, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs,
topicPartition, new ProducerStateManager(tp, tpDir, 60 * 60 * 1000), new LogDirFailureChannel(10)) {
// Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog()
override def getFirstBatchTimestampForSegments(segments: Iterable[LogSegment]): Iterable[Long] =
throw new IllegalStateException("Error!")
}
val log: Log = new LogMock(tpDir, createLowRetentionLogConfig(logSegmentSize, LogConfig.Compact))
writeRecords(log = log,
numBatches = logSegmentsCount * 2,
recordsPerBatch = 10,
batchesPerSegment = 2
)
val logsPool = new Pool[TopicPartition, Log]()
logsPool.put(tp, log)
val cleanerManager = createCleanerManagerMock(logsPool)
cleanerCheckpoints.put(tp, 1)
val thrownException = intercept[LogCleaningException] {
cleanerManager.grabFilthiestCompactedLog(time).get
}
assertEquals(log, thrownException.log)
assertTrue(thrownException.getCause.isInstanceOf[IllegalStateException])
}
@Test
def testGrabFilthiestCompactedLogReturnsLogWithDirtiestRatio(): Unit = {
val tp0 = new TopicPartition("wishing-well", 0)
@ -505,13 +541,7 @@ class LogCleanerManagerTest extends Logging { @@ -505,13 +541,7 @@ class LogCleanerManagerTest extends Logging {
private def createLog(segmentSize: Int,
cleanupPolicy: String,
topicPartition: TopicPartition = new TopicPartition("log", 0)): Log = {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
logProps.put(LogConfig.RetentionMsProp, 1: Integer)
logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.05: java.lang.Double) // small for easier and clearer tests
val config = LogConfig(logProps)
val config = createLowRetentionLogConfig(segmentSize, cleanupPolicy)
val partitionDir = new File(logDir, Log.logDirName(topicPartition))
Log(partitionDir,
@ -526,6 +556,16 @@ class LogCleanerManagerTest extends Logging { @@ -526,6 +556,16 @@ class LogCleanerManagerTest extends Logging {
logDirFailureChannel = new LogDirFailureChannel(10))
}
private def createLowRetentionLogConfig(segmentSize: Int, cleanupPolicy: String): LogConfig = {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
logProps.put(LogConfig.RetentionMsProp, 1: Integer)
logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.05: java.lang.Double) // small for easier and clearer tests
LogConfig(logProps)
}
private def writeRecords(log: Log,
numBatches: Int,
recordsPerBatch: Int,

Loading…
Cancel
Save