diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index d7f31568836..de3d7a33d0c 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -57,11 +57,12 @@ class LogCleanerIntegrationTest(compressionCodec: String) { cleaner.startup() val firstDirty = log.activeSegment.baseOffset - // wait until we clean up to base_offset of active segment - minDirtyMessages + // wait until we have cleaned up to the base offset of the active segment; note that cleaning only happens when the log's dirty ratio exceeds LogConfig.MinCleanableDirtyRatioProp cleaner.awaitCleaned("log", 0, firstDirty) - + val compactedSize = log.logSegments.map(_.size).sum val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get - assertTrue("log cleaner should have processed up to offset " + firstDirty, lastCleaned >= firstDirty); + assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned", lastCleaned >= firstDirty) + assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize) val read = readFromLog(log) assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap) @@ -73,7 +74,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) { cleaner.awaitCleaned("log", 0, firstDirty2) val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get - assertTrue("log cleaner should have processed up to offset " + firstDirty2, lastCleaned2 >= firstDirty2); + assertTrue(s"log cleaner should have processed up to offset $firstDirty2", lastCleaned2 >= firstDirty2); val read2 = readFromLog(log) assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap) @@ -123,7 +124,7 @@ class LogCleanerIntegrationTest(compressionCodec: 
String) { /* create a cleaner instance and logs with the given parameters */ def makeCleaner(parts: Int, - minDirtyMessages: Int = 0, + minCleanableDirtyRatio: Float = 0.0F, numThreads: Int = 1, defaultPolicy: String = "compact", policyOverrides: Map[String, String] = Map()): LogCleaner = { @@ -138,6 +139,8 @@ class LogCleanerIntegrationTest(compressionCodec: String) { logProps.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer) logProps.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer) logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) + logProps.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float) + val log = new Log(dir = dir, LogConfig(logProps), recoveryPoint = 0L,