Browse Source

kafka-2235; LogCleaner offset map overflow; patched by Ivan Simoneko; reviewed by Jun Rao

pull/72/head
Ivan Simoneko 10 years ago committed by Jun Rao
parent
commit
dc54055d05
  1. 12
      core/src/main/scala/kafka/log/LogCleaner.scala

12
core/src/main/scala/kafka/log/LogCleaner.scala

@ -559,11 +559,17 @@ private[log] class Cleaner(val id: Int, @@ -559,11 +559,17 @@ private[log] class Cleaner(val id: Int,
// but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
var offset = dirty.head.baseOffset
require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
val minStopOffset = (start + map.slots * this.dupBufferLoadFactor).toLong
for (segment <- dirty) {
val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
var full = false
for (segment <- dirty if !full) {
checkDone(log.topicAndPartition)
if(segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor)
val segmentSize = segment.nextOffset() - segment.baseOffset
require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
if (map.size + segmentSize <= maxDesiredMapSize)
offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
else
full = true
}
info("Offset map for log %s complete.".format(log.name))
offset

Loading…
Cancel
Save