From 41e4e93b5ae8a7d221fce1733e050cb98ac9713c Mon Sep 17 00:00:00 2001 From: huxi Date: Fri, 26 Jan 2018 01:49:50 +0800 Subject: [PATCH] =?UTF-8?q?KAFKA-6429;=20LogCleanerManager.cleanableOffset?= =?UTF-8?q?s=20should=20create=20objects=20=E2=80=A6=20(#4399)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …for dirty non-active segments only when `log.cleaner.min.compaction.lag.ms` is greater than 0 With `log.cleaner.min.compaction.lag.ms`'s default value of 0, there is no need to hold heap objects for those dirty non-active segments. This could reduce the heap size and also avoid the unnecessary monitor lock retrieval. --- core/src/main/scala/kafka/log/LogCleanerManager.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index b48b757a335..c3d3892aef9 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -344,10 +344,7 @@ private[log] object LogCleanerManager extends Logging { offset } } - - // dirty log segments - val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset) - + val compactionLagMs = math.max(log.config.compactionLagMs, 0L) // find first segment that cannot be cleaned @@ -363,6 +360,8 @@ private[log] object LogCleanerManager extends Logging { // the first segment whose largest message timestamp is within a minimum time lag from now if (compactionLagMs > 0) { + // dirty log segments + val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset) dirtyNonActiveSegments.find { s => val isUncleanable = s.largestTimestamp > now - compactionLagMs debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable")