From f771005f40af160e604b65b1f376c3ef2aff5cd1 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Fri, 11 Oct 2019 23:10:29 +0100 Subject: [PATCH] KAFKA-8725; Improve LogCleanerManager#grabFilthiestLog error handling (#7475) KAFKA-7215 improved the log cleaner error handling to mitigate thread death but missed one case. Exceptions in grabFilthiestCompactedLog still cause the thread to die. This patch improves handling to ensure that errors in that function still mark a partition as uncleanable and do not crash the thread. Reviewers: Jason Gustafson --- .../src/main/scala/kafka/log/LogCleaner.scala | 76 ++++++++++--------- .../scala/kafka/log/LogCleanerManager.scala | 21 +++-- .../kafka/log/LogCleanerManagerTest.scala | 54 +++++++++++-- 3 files changed, 103 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index d3a3ba0df03..219f49716f0 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -310,53 +310,59 @@ class LogCleaner(initialConfig: CleanerConfig, * Clean a log if there is a dirty log available, otherwise sleep for a bit */ override def doWork(): Unit = { - val cleaned = cleanFilthiestLog() + val cleaned = tryCleanFilthiestLog() if (!cleaned) pause(config.backOffMs, TimeUnit.MILLISECONDS) } /** - * Cleans a log if there is a dirty log available - * @return whether a log was cleaned - */ - private def cleanFilthiestLog(): Boolean = { - var currentLog: Option[Log] = None - + * Cleans a log if there is a dirty log available + * @return whether a log was cleaned + */ + private def tryCleanFilthiestLog(): Boolean = { try { - val preCleanStats = new PreCleanStats() - val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match { - case None => - false - case Some(cleanable) => - this.lastPreCleanStats = preCleanStats - // there's a log, clean it - currentLog = Some(cleanable.log) + 
cleanFilthiestLog() + } catch { + case e: LogCleaningException => + warn(s"Unexpected exception thrown when cleaning log ${e.log}. Marking its partition (${e.log.topicPartition}) as uncleanable", e) + cleanerManager.markPartitionUncleanable(e.log.dir.getParent, e.log.topicPartition) + + false + } + } + + @throws(classOf[LogCleaningException]) + private def cleanFilthiestLog(): Boolean = { + val preCleanStats = new PreCleanStats() + val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match { + case None => + false + case Some(cleanable) => + // there's a log, clean it + this.lastPreCleanStats = preCleanStats + try { cleanLog(cleanable) true - } - val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs() - try { - deletable.foreach { case (_, log) => - currentLog = Some(log) + } catch { + case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e + case e: Exception => throw new LogCleaningException(cleanable.log, e.getMessage, e) + } + } + val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs() + try { + deletable.foreach { case (_, log) => + try { log.deleteOldSegments() + } catch { + case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e + case e: Exception => throw new LogCleaningException(log, e.getMessage, e) } - } finally { - cleanerManager.doneDeleting(deletable.map(_._1)) } - - cleaned - } catch { - case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e - case e: Exception => - if (currentLog.isEmpty) { - throw new IllegalStateException("currentLog cannot be empty on an unexpected exception", e) - } - val erroneousLog = currentLog.get - warn(s"Unexpected exception thrown when cleaning log $erroneousLog. 
Marking its partition (${erroneousLog.topicPartition}) as uncleanable", e) - cleanerManager.markPartitionUncleanable(erroneousLog.dir.getParent, erroneousLog.topicPartition) - - false + } finally { + cleanerManager.doneDeleting(deletable.map(_._1)) } + + cleaned } private def cleanLog(cleanable: LogToClean): Unit = { diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index bde30b01141..a5cfed5c094 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import com.yammer.metrics.core.Gauge -import kafka.common.LogCleaningAbortedException +import kafka.common.{KafkaException, LogCleaningAbortedException} import kafka.metrics.KafkaMetricsGroup import kafka.server.LogDirFailureChannel import kafka.server.checkpoints.OffsetCheckpointFile @@ -39,6 +39,10 @@ private[log] case object LogCleaningInProgress extends LogCleaningState private[log] case object LogCleaningAborted extends LogCleaningState private[log] case class LogCleaningPaused(pausedCount: Int) extends LogCleaningState +private[log] class LogCleaningException(val log: Log, + private val message: String, + private val cause: Throwable) extends KafkaException(message, cause) + /** * This class manages the state of each partition being cleaned. * LogCleaningState defines the cleaning states that a TopicPartition can be in. 
@@ -180,11 +184,16 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition) }.map { case (topicPartition, log) => // create a LogToClean instance for each - val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, topicPartition, lastClean, now) - val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now) - preCleanStats.updateMaxCompactionDelay(compactionDelayMs) - - LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0) + try { + val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, topicPartition, lastClean, now) + val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now) + preCleanStats.updateMaxCompactionDelay(compactionDelayMs) + + LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0) + } catch { + case e: Throwable => throw new LogCleaningException(log, + s"Failed to calculate log cleaning stats for partition $topicPartition", e) + } }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0 diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 6a74874be8d..db86c75c6ad 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -77,6 +77,42 @@ class LogCleanerManagerTest extends Logging { logs } + @Test + def testGrabFilthiestCompactedLogThrowsException(): Unit = { + val tp = new TopicPartition("A", 1) + val logSegmentSize = TestUtils.singletonRecords("test".getBytes).sizeInBytes * 10 + val logSegmentsCount = 2 + val tpDir = new File(logDir, "A-1") + + // the exception should be caught and the partition that caused it marked as uncleanable + 
class LogMock(dir: File, config: LogConfig) extends Log(dir, config, 0L, 0L, + time.scheduler, new BrokerTopicStats, time, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs, + topicPartition, new ProducerStateManager(tp, tpDir, 60 * 60 * 1000), new LogDirFailureChannel(10)) { + + // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog() + override def getFirstBatchTimestampForSegments(segments: Iterable[LogSegment]): Iterable[Long] = + throw new IllegalStateException("Error!") + } + + val log: Log = new LogMock(tpDir, createLowRetentionLogConfig(logSegmentSize, LogConfig.Compact)) + writeRecords(log = log, + numBatches = logSegmentsCount * 2, + recordsPerBatch = 10, + batchesPerSegment = 2 + ) + + val logsPool = new Pool[TopicPartition, Log]() + logsPool.put(tp, log) + val cleanerManager = createCleanerManagerMock(logsPool) + cleanerCheckpoints.put(tp, 1) + + val thrownException = intercept[LogCleaningException] { + cleanerManager.grabFilthiestCompactedLog(time).get + } + assertEquals(log, thrownException.log) + assertTrue(thrownException.getCause.isInstanceOf[IllegalStateException]) + } + @Test def testGrabFilthiestCompactedLogReturnsLogWithDirtiestRatio(): Unit = { val tp0 = new TopicPartition("wishing-well", 0) @@ -505,13 +541,7 @@ class LogCleanerManagerTest extends Logging { private def createLog(segmentSize: Int, cleanupPolicy: String, topicPartition: TopicPartition = new TopicPartition("log", 0)): Log = { - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer) - logProps.put(LogConfig.RetentionMsProp, 1: Integer) - logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy) - logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.05: java.lang.Double) // small for easier and clearer tests - - val config = LogConfig(logProps) + val config = createLowRetentionLogConfig(segmentSize, cleanupPolicy) val partitionDir = new File(logDir, Log.logDirName(topicPartition)) 
Log(partitionDir, @@ -526,6 +556,16 @@ class LogCleanerManagerTest extends Logging { logDirFailureChannel = new LogDirFailureChannel(10)) } + private def createLowRetentionLogConfig(segmentSize: Int, cleanupPolicy: String): LogConfig = { + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer) + logProps.put(LogConfig.RetentionMsProp, 1: Integer) + logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy) + logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.05: java.lang.Double) // small for easier and clearer tests + + LogConfig(logProps) + } + private def writeRecords(log: Log, numBatches: Int, recordsPerBatch: Int,