
KAFKA-6429; LogCleanerManager.cleanableOffsets should create objects for dirty non-active segments only when `log.cleaner.min.compaction.lag.ms` is greater than 0 (#4399)

With `log.cleaner.min.compaction.lag.ms` at its default value of 0, there is no need to hold heap objects for the dirty non-active segments. Creating them only when the lag is positive reduces heap usage and also avoids an unnecessary acquisition of the log's monitor lock.
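The pattern is simply to move an eagerly computed value into the only branch that consumes it. Below is a minimal, self-contained sketch of that idea; all names here (LazySegmentScanSketch, Segment, segmentsInRange, firstUncleanableOffset) are hypothetical stand-ins for illustration, not the actual Kafka internals:

object LazySegmentScanSketch {

  // Minimal stand-in for a log segment; the real LogSegment is far richer.
  final case class Segment(baseOffset: Long, largestTimestamp: Long)

  // Stand-in for log.logSegments(from, to), which materializes segment
  // objects for the requested offset range.
  def segmentsInRange(all: Seq[Segment], from: Long, to: Long): Iterable[Segment] =
    all.filter(s => s.baseOffset >= from && s.baseOffset < to)

  // Returns the base offset of the first segment still inside the lag window.
  // With the default lag of 0 the segment collection is never built at all.
  def firstUncleanableOffset(all: Seq[Segment],
                             firstDirtyOffset: Long,
                             activeBaseOffset: Long,
                             compactionLagMs: Long,
                             now: Long): Option[Long] =
    if (compactionLagMs > 0) {
      // Only the lag-enabled path pays for materializing the dirty segments.
      val dirtyNonActiveSegments = segmentsInRange(all, firstDirtyOffset, activeBaseOffset)
      dirtyNonActiveSegments
        .find(s => s.largestTimestamp > now - compactionLagMs)
        .map(_.baseOffset)
    } else None

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    val segs = Seq(Segment(0L, now - 60000L), Segment(100L, now - 1000L))
    println(firstUncleanableOffset(segs, 0L, 200L, compactionLagMs = 30000L, now)) // Some(100)
    println(firstUncleanableOffset(segs, 0L, 200L, compactionLagMs = 0L, now))     // None
  }
}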
Author: huxi (7 years ago); committed by Jason Gustafson
Commit: 41e4e93b5a
1 changed file: core/src/main/scala/kafka/log/LogCleanerManager.scala (7 changed lines)
@@ -344,10 +344,7 @@ private[log] object LogCleanerManager extends Logging {
         offset
       }
     }
 
-    // dirty log segments
-    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
-
     val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
 
     // find first segment that cannot be cleaned
@@ -363,6 +360,8 @@ private[log] object LogCleanerManager extends Logging {
       // the first segment whose largest message timestamp is within a minimum time lag from now
       if (compactionLagMs > 0) {
+        // dirty log segments
+        val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
         dirtyNonActiveSegments.find { s =>
           val isUncleanable = s.largestTimestamp > now - compactionLagMs
           debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable")
