diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 7fc70dabfd3..d172920336d 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -731,13 +731,14 @@ private[log] class Cleaner(val id: Int, if (record.hasKey) { val key = record.key val foundOffset = map.get(key) - /* two cases in which we can get rid of a message: - * 1) if there exists a message with the same key but higher offset - * 2) if the message is a delete "tombstone" marker and enough time has passed + /* First,the message must have the latest offset for the key + * then there are two cases in which we can retain a message: + * 1) The message has value + * 2) The message doesn't has value but it can't be deleted now. */ - val redundant = foundOffset >= 0 && record.offset < foundOffset - val obsoleteDelete = !retainDeletes && !record.hasValue - !redundant && !obsoleteDelete + val latestOffsetForKey = record.offset() >= foundOffset + val isRetainedValue = record.hasValue || retainDeletes + latestOffsetForKey && isRetainedValue } else { stats.invalidMessage() false