From ad4a7c343620bdeb4395ff80b0fae7fbb585df7c Mon Sep 17 00:00:00 2001 From: Lysss Date: Thu, 25 Apr 2019 22:53:47 +0800 Subject: [PATCH] MINOR: Make LogCleaner.shouldRetainRecord more readable (#6590) Reviewers: Bob Barrett , Jason Gustafson --- core/src/main/scala/kafka/log/LogCleaner.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 7fc70dabfd3..d172920336d 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -731,13 +731,14 @@ private[log] class Cleaner(val id: Int, if (record.hasKey) { val key = record.key val foundOffset = map.get(key) - /* two cases in which we can get rid of a message: - * 1) if there exists a message with the same key but higher offset - * 2) if the message is a delete "tombstone" marker and enough time has passed + /* First,the message must have the latest offset for the key + * then there are two cases in which we can retain a message: + * 1) The message has value + * 2) The message doesn't has value but it can't be deleted now. */ - val redundant = foundOffset >= 0 && record.offset < foundOffset - val obsoleteDelete = !retainDeletes && !record.hasValue - !redundant && !obsoleteDelete + val latestOffsetForKey = record.offset() >= foundOffset + val isRetainedValue = record.hasValue || retainDeletes + latestOffsetForKey && isRetainedValue } else { stats.invalidMessage() false