diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index b48b757a335..c3d3892aef9 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -344,10 +344,7 @@ private[log] object LogCleanerManager extends Logging { offset } } - - // dirty log segments - val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset) - + val compactionLagMs = math.max(log.config.compactionLagMs, 0L) // find first segment that cannot be cleaned @@ -363,6 +360,8 @@ private[log] object LogCleanerManager extends Logging { // the first segment whose largest message timestamp is within a minimum time lag from now if (compactionLagMs > 0) { + // dirty log segments + val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset) dirtyNonActiveSegments.find { s => val isUncleanable = s.largestTimestamp > now - compactionLagMs debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable")