|
|
|
@ -57,11 +57,12 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
@@ -57,11 +57,12 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
|
|
|
|
|
cleaner.startup() |
|
|
|
|
|
|
|
|
|
val firstDirty = log.activeSegment.baseOffset |
|
|
|
|
// wait until we clean up to base_offset of active segment - minDirtyMessages |
|
|
|
|
// wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than LogConfig.MinCleanableDirtyRatioProp |
|
|
|
|
cleaner.awaitCleaned("log", 0, firstDirty) |
|
|
|
|
|
|
|
|
|
val compactedSize = log.logSegments.map(_.size).sum |
|
|
|
|
val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get |
|
|
|
|
assertTrue("log cleaner should have processed up to offset " + firstDirty, lastCleaned >= firstDirty); |
|
|
|
|
assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned", lastCleaned >= firstDirty) |
|
|
|
|
assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize) |
|
|
|
|
|
|
|
|
|
val read = readFromLog(log) |
|
|
|
|
assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap) |
|
|
|
@ -73,7 +74,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
@@ -73,7 +74,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
|
|
|
|
|
cleaner.awaitCleaned("log", 0, firstDirty2) |
|
|
|
|
|
|
|
|
|
val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get |
|
|
|
|
assertTrue("log cleaner should have processed up to offset " + firstDirty2, lastCleaned2 >= firstDirty2); |
|
|
|
|
assertTrue(s"log cleaner should have processed up to offset $firstDirty2", lastCleaned2 >= firstDirty2); |
|
|
|
|
|
|
|
|
|
val read2 = readFromLog(log) |
|
|
|
|
assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap) |
|
|
|
@ -123,7 +124,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
@@ -123,7 +124,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
|
|
|
|
|
|
|
|
|
|
/* create a cleaner instance and logs with the given parameters */ |
|
|
|
|
def makeCleaner(parts: Int, |
|
|
|
|
minDirtyMessages: Int = 0, |
|
|
|
|
minCleanableDirtyRatio: Float = 0.0F, |
|
|
|
|
numThreads: Int = 1, |
|
|
|
|
defaultPolicy: String = "compact", |
|
|
|
|
policyOverrides: Map[String, String] = Map()): LogCleaner = { |
|
|
|
@ -138,6 +139,8 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
@@ -138,6 +139,8 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
|
|
|
|
|
logProps.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer) |
|
|
|
|
logProps.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer) |
|
|
|
|
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) |
|
|
|
|
logProps.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float) |
|
|
|
|
|
|
|
|
|
val log = new Log(dir = dir, |
|
|
|
|
LogConfig(logProps), |
|
|
|
|
recoveryPoint = 0L, |
|
|
|
|