diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d3a3ba0df03..219f49716f0 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -310,53 +310,59 @@ class LogCleaner(initialConfig: CleanerConfig,
      * Clean a log if there is a dirty log available, otherwise sleep for a bit
      */
     override def doWork(): Unit = {
-      val cleaned = cleanFilthiestLog()
+      val cleaned = tryCleanFilthiestLog()
       if (!cleaned)
         pause(config.backOffMs, TimeUnit.MILLISECONDS)
     }
 
     /**
-    * Cleans a log if there is a dirty log available
-    * @return whether a log was cleaned
-    */
-    private def cleanFilthiestLog(): Boolean = {
-      var currentLog: Option[Log] = None
-
+     * Cleans a log if there is a dirty log available
+     * @return whether a log was cleaned
+     */
+    private def tryCleanFilthiestLog(): Boolean = {
       try {
-        val preCleanStats = new PreCleanStats()
-        val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
-          case None =>
-            false
-          case Some(cleanable) =>
-            this.lastPreCleanStats = preCleanStats
-            // there's a log, clean it
-            currentLog = Some(cleanable.log)
+        cleanFilthiestLog()
+      } catch {
+        case e: LogCleaningException =>
+          warn(s"Unexpected exception thrown when cleaning log ${e.log}. Marking its partition (${e.log.topicPartition}) as uncleanable", e)
+          cleanerManager.markPartitionUncleanable(e.log.dir.getParent, e.log.topicPartition)
+
+          false
+      }
+    }
+
+    @throws(classOf[LogCleaningException])
+    private def cleanFilthiestLog(): Boolean = {
+      val preCleanStats = new PreCleanStats()
+      val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
+        case None =>
+          false
+        case Some(cleanable) =>
+          // there's a log, clean it
+          this.lastPreCleanStats = preCleanStats
+          try {
             cleanLog(cleanable)
             true
-        }
-        val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
-        try {
-          deletable.foreach { case (_, log) =>
-            currentLog = Some(log)
+          } catch {
+            case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
+            case e: Exception => throw new LogCleaningException(cleanable.log, e.getMessage, e)
+          }
+      }
+      val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
+      try {
+        deletable.foreach { case (_, log) =>
+          try {
             log.deleteOldSegments()
+          } catch {
+            case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
+            case e: Exception => throw new LogCleaningException(log, e.getMessage, e)
           }
-        } finally {
-          cleanerManager.doneDeleting(deletable.map(_._1))
         }
-
-        cleaned
-      } catch {
-        case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
-        case e: Exception =>
-          if (currentLog.isEmpty) {
-            throw new IllegalStateException("currentLog cannot be empty on an unexpected exception", e)
-          }
-          val erroneousLog = currentLog.get
-          warn(s"Unexpected exception thrown when cleaning log $erroneousLog. Marking its partition (${erroneousLog.topicPartition}) as uncleanable", e)
-          cleanerManager.markPartitionUncleanable(erroneousLog.dir.getParent, erroneousLog.topicPartition)
-
-          false
+      } finally {
+        cleanerManager.doneDeleting(deletable.map(_._1))
       }
+
+      cleaned
     }
 
     private def cleanLog(cleanable: LogToClean): Unit = {
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index bde30b01141..a5cfed5c094 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
-import kafka.common.LogCleaningAbortedException
+import kafka.common.{KafkaException, LogCleaningAbortedException}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.LogDirFailureChannel
 import kafka.server.checkpoints.OffsetCheckpointFile
@@ -39,6 +39,10 @@ private[log] case object LogCleaningInProgress extends LogCleaningState
 private[log] case object LogCleaningAborted extends LogCleaningState
 private[log] case class LogCleaningPaused(pausedCount: Int) extends LogCleaningState
 
+private[log] class LogCleaningException(val log: Log,
+                                        private val message: String,
+                                        private val cause: Throwable) extends KafkaException(message, cause)
+
 /**
  * This class manages the state of each partition being cleaned.
  * LogCleaningState defines the cleaning states that a TopicPartition can be in.
@@ -180,11 +184,16 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
           inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
       }.map {
         case (topicPartition, log) => // create a LogToClean instance for each
-          val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, topicPartition, lastClean, now)
-          val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now)
-          preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
-
-          LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0)
+          try {
+            val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, topicPartition, lastClean, now)
+            val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now)
+            preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
+
+            LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0)
+          } catch {
+            case e: Throwable => throw new LogCleaningException(log,
+              s"Failed to calculate log cleaning stats for partition $topicPartition", e)
+          }
       }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
 
     this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 6a74874be8d..db86c75c6ad 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -77,6 +77,42 @@ class LogCleanerManagerTest extends Logging {
     logs
   }
 
+  @Test
+  def testGrabFilthiestCompactedLogThrowsException(): Unit = {
+    val tp = new TopicPartition("A", 1)
+    val logSegmentSize = TestUtils.singletonRecords("test".getBytes).sizeInBytes * 10
+    val logSegmentsCount = 2
+    val tpDir = new File(logDir, "A-1")
+
+    // the exception should be caught and the partition that caused it marked as uncleanable
+    class LogMock(dir: File, config: LogConfig) extends Log(dir, config, 0L, 0L,
+      time.scheduler, new BrokerTopicStats, time, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs,
+      topicPartition, new ProducerStateManager(tp, tpDir, 60 * 60 * 1000), new LogDirFailureChannel(10)) {
+
+      // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestCompactedLog()
+      override def getFirstBatchTimestampForSegments(segments: Iterable[LogSegment]): Iterable[Long] =
+        throw new IllegalStateException("Error!")
+    }
+
+    val log: Log = new LogMock(tpDir, createLowRetentionLogConfig(logSegmentSize, LogConfig.Compact))
+    writeRecords(log = log,
+      numBatches = logSegmentsCount * 2,
+      recordsPerBatch = 10,
+      batchesPerSegment = 2
+    )
+
+    val logsPool = new Pool[TopicPartition, Log]()
+    logsPool.put(tp, log)
+    val cleanerManager = createCleanerManagerMock(logsPool)
+    cleanerCheckpoints.put(tp, 1)
+
+    val thrownException = intercept[LogCleaningException] {
+      cleanerManager.grabFilthiestCompactedLog(time).get
+    }
+    assertEquals(log, thrownException.log)
+    assertTrue(thrownException.getCause.isInstanceOf[IllegalStateException])
+  }
+
   @Test
   def testGrabFilthiestCompactedLogReturnsLogWithDirtiestRatio(): Unit = {
     val tp0 = new TopicPartition("wishing-well", 0)
@@ -505,13 +541,7 @@ class LogCleanerManagerTest extends Logging {
   private def createLog(segmentSize: Int,
                         cleanupPolicy: String,
                         topicPartition: TopicPartition = new TopicPartition("log", 0)): Log = {
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
-    logProps.put(LogConfig.RetentionMsProp, 1: Integer)
-    logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
-    logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.05: java.lang.Double) // small for easier and clearer tests
-
-    val config = LogConfig(logProps)
+    val config = createLowRetentionLogConfig(segmentSize, cleanupPolicy)
     val partitionDir = new File(logDir, Log.logDirName(topicPartition))
 
     Log(partitionDir,
@@ -526,6 +556,16 @@ class LogCleanerManagerTest extends Logging {
       logDirFailureChannel = new LogDirFailureChannel(10))
   }
 
+  private def createLowRetentionLogConfig(segmentSize: Int, cleanupPolicy: String): LogConfig = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
+    logProps.put(LogConfig.RetentionMsProp, 1: Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
+    logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.05: java.lang.Double) // small for easier and clearer tests
+
+    LogConfig(logProps)
+  }
+
   private def writeRecords(log: Log,
                            numBatches: Int,
                            recordsPerBatch: Int,
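
Taken together, the patch replaces the cleaner thread's mutable currentLog bookkeeping with an exception type (LogCleaningException) that carries the failing log, so a failure anywhere in filthiest-log selection, cleaning, or deletion can be translated into marking that one partition uncleanable instead of killing the thread. Below is a minimal, self-contained sketch of that control flow under stated assumptions: FakeLog, FakeCleaningException, markUncleanable, and CleanerFlowSketch are illustrative stand-ins invented for this sketch, not the actual kafka.log API.

// Illustrative sketch only: simplified stand-ins mirroring the patch's error-handling shape.
import scala.collection.mutable

final case class FakeLog(topicPartition: String, parentDir: String)

// Like LogCleaningException in the patch: carries the log whose cleaning failed.
class FakeCleaningException(val log: FakeLog, message: String, cause: Throwable)
  extends RuntimeException(message, cause)

object CleanerFlowSketch extends App {
  val uncleanable = mutable.Set.empty[(String, String)]
  def markUncleanable(dir: String, tp: String): Unit = uncleanable += (dir -> tp)

  // Stand-in for cleanFilthiestLog(): pretend the stats calculation blew up for one partition.
  def cleanFilthiestLog(): Boolean =
    throw new FakeCleaningException(FakeLog("topic-0", "/tmp/kafka-logs"),
      "Failed to calculate log cleaning stats for partition topic-0",
      new IllegalStateException("Error!"))

  // Mirrors tryCleanFilthiestLog(): a cleaning failure marks the offending partition
  // uncleanable and returns false instead of propagating and killing the cleaner thread.
  val cleaned =
    try cleanFilthiestLog()
    catch {
      case e: FakeCleaningException =>
        markUncleanable(e.log.parentDir, e.log.topicPartition)
        false
    }

  println(s"cleaned=$cleaned, uncleanable=$uncleanable") // cleaned=false, one partition marked uncleanable
}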