diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 558c703f26d..da55a348f37 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -21,8 +21,6 @@ import java.util.Properties import org.apache.kafka.common.utils.Utils import scala.collection._ import org.apache.kafka.common.config.ConfigDef -import kafka.common._ -import scala.collection.JavaConversions._ import kafka.message.BrokerCompressionCodec object Defaults { @@ -93,7 +91,7 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, props.put(FlushMessagesProp, flushInterval.toString) props.put(FlushMsProp, flushMs.toString) props.put(RetentionBytesProp, retentionSize.toString) - props.put(RententionMsProp, retentionMs.toString) + props.put(RetentionMsProp, retentionMs.toString) props.put(MaxMessageBytesProp, maxMessageSize.toString) props.put(IndexIntervalBytesProp, indexInterval.toString) props.put(DeleteRetentionMsProp, deleteRetentionMs.toString) @@ -122,7 +120,7 @@ object LogConfig { val FlushMessagesProp = "flush.messages" val FlushMsProp = "flush.ms" val RetentionBytesProp = "retention.bytes" - val RententionMsProp = "retention.ms" + val RetentionMsProp = "retention.ms" val MaxMessageBytesProp = "max.message.bytes" val IndexIntervalBytesProp = "index.interval.bytes" val DeleteRetentionMsProp = "delete.retention.ms" @@ -172,7 +170,8 @@ object LogConfig { .define(FlushMsProp, LONG, Defaults.FlushMs, atLeast(0), MEDIUM, FlushMsDoc) // can be negative. See kafka.log.LogManager.cleanupSegmentsToMaintainSize .define(RetentionBytesProp, LONG, Defaults.RetentionSize, MEDIUM, RetentionSizeDoc) - .define(RententionMsProp, LONG, Defaults.RetentionMs, atLeast(0), MEDIUM, RetentionMsDoc) + // can be negative. See kafka.log.LogManager.cleanupExpiredSegments + .define(RetentionMsProp, LONG, Defaults.RetentionMs, MEDIUM, RetentionMsDoc) .define(MaxMessageBytesProp, INT, Defaults.MaxMessageSize, atLeast(0), MEDIUM, MaxMessageSizeDoc) .define(IndexIntervalBytesProp, INT, Defaults.IndexInterval, atLeast(0), MEDIUM, IndexIntervalDoc) .define(DeleteRetentionMsProp, LONG, Defaults.DeleteRetentionMs, atLeast(0), MEDIUM, DeleteRetentionMsDoc) @@ -206,7 +205,7 @@ object LogConfig { flushInterval = parsed.get(FlushMessagesProp).asInstanceOf[Long], flushMs = parsed.get(FlushMsProp).asInstanceOf[Long], retentionSize = parsed.get(RetentionBytesProp).asInstanceOf[Long], - retentionMs = parsed.get(RententionMsProp).asInstanceOf[Long], + retentionMs = parsed.get(RetentionMsProp).asInstanceOf[Long], maxMessageSize = parsed.get(MaxMessageBytesProp).asInstanceOf[Int], indexInterval = parsed.get(IndexIntervalBytesProp).asInstanceOf[Int], fileDeleteDelayMs = parsed.get(FileDeleteDelayMsProp).asInstanceOf[Long], diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index a7a9b85ba1b..e781ebac267 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -407,6 +407,8 @@ class LogManager(val logDirs: Array[File], * Runs through the log removing segments older than a certain age */ private def cleanupExpiredSegments(log: Log): Int = { + if (log.config.retentionMs < 0) + return 0 val startMs = time.milliseconds log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 69b772c1941..cfbbd2be550 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -417,7 +417,7 @@ object KafkaConfig { .define(LogRetentionTimeMillisProp, LONG, HIGH, LogRetentionTimeMillisDoc, false) .define(LogRetentionTimeMinutesProp, INT, HIGH, LogRetentionTimeMinsDoc, false) - .define(LogRetentionTimeHoursProp, INT, Defaults.LogRetentionHours, atLeast(1), HIGH, LogRetentionTimeHoursDoc) + .define(LogRetentionTimeHoursProp, INT, Defaults.LogRetentionHours, HIGH, LogRetentionTimeHoursDoc) .define(LogRetentionBytesProp, LONG, Defaults.LogRetentionBytes, HIGH, LogRetentionBytesDoc) .define(LogCleanupIntervalMsProp, LONG, Defaults.LogCleanupIntervalMs, atLeast(1), MEDIUM, LogCleanupIntervalMsDoc) @@ -770,12 +770,16 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val millisInMinute = 60L * 1000L val millisInHour = 60L * millisInMinute - _logRetentionTimeMillis.getOrElse( - _logRetentionTimeMins match { - case Some(mins) => millisInMinute * mins - case None => millisInHour * logRetentionTimeHours - } - ) + val millis = { + _logRetentionTimeMillis.getOrElse( + _logRetentionTimeMins match { + case Some(mins) => millisInMinute * mins + case None => millisInHour * logRetentionTimeHours + } + ) + } + if (millis < 0) return -1 + millis } private def getMap(propName: String, propValue: String): Map[String, String] = { @@ -834,8 +838,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id") require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1") require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0") - require(logRetentionTimeMillis >= 1, "log.retention.ms must be equal or greater than 1") - require(_logRetentionTimeMins.forall(_ >= 1), "log.retention.minutes must be equal or greater than 1") + + require(_logRetentionTimeMins.forall(_ >= 1)|| _logRetentionTimeMins.forall(_ .equals(-1)), "log.retention.minutes must be unlimited (-1) or, equal or greater than 1") + require(logRetentionTimeHours >= 1 || logRetentionTimeHours == -1, "log.retention.hours must be unlimited (-1) or, equal or greater than 1") + require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1") require(logDirs.size > 0) require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.") diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index cfe38df577e..4b728a18a43 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -370,7 +370,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { def makeConfig(messageSize: Int, retentionMs: Long) = { var props = new Properties() props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString) - props.setProperty(LogConfig.RententionMsProp, retentionMs.toString) + props.setProperty(LogConfig.RetentionMsProp, retentionMs.toString) props } diff --git a/core/src/test/scala/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala similarity index 93% rename from core/src/test/scala/kafka/log/LogConfigTest.scala rename to core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 9690f141be7..f3546adee49 100644 --- a/core/src/test/scala/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -15,12 +15,14 @@ * limitations under the License. */ -package kafka.log +package unit.kafka.log +import java.util.Properties + +import kafka.log.{Defaults, LogConfig} import org.apache.kafka.common.config.ConfigException -import org.scalatest.junit.JUnit3Suite import org.junit.{Assert, Test} -import java.util.Properties +import org.scalatest.junit.JUnit3Suite class LogConfigTest extends JUnit3Suite { @@ -56,6 +58,7 @@ class LogConfigTest extends JUnit3Suite { case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1)) case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString) case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString) + case LogConfig.RetentionMsProp => expected.setProperty(name, nextLong().toString) case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString) } }) @@ -70,6 +73,7 @@ class LogConfigTest extends JUnit3Suite { name match { case LogConfig.UncleanLeaderElectionEnableProp => return case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number") + case LogConfig.RetentionMsProp => assertPropertyInvalid(name, "not_a_number" ) case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar"); case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2") case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1") diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 40c265aabae..2428dbd7197 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -83,6 +83,59 @@ class KafkaConfigTest extends JUnit3Suite { val cfg = KafkaConfig.fromProps(props) assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis) } + @Test + def testLogRetentionUnlimited() { + val props1 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181) + val props2 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181) + val props3 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181) + val props4 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181) + val props5 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181) + + props1.put("log.retention.ms", "-1") + props2.put("log.retention.minutes", "-1") + props3.put("log.retention.hours", "-1") + + val cfg1 = KafkaConfig.fromProps(props1) + val cfg2 = KafkaConfig.fromProps(props2) + val cfg3 = KafkaConfig.fromProps(props3) + assertEquals("Should be -1", -1, cfg1.logRetentionTimeMillis) + assertEquals("Should be -1", -1, cfg2.logRetentionTimeMillis) + assertEquals("Should be -1", -1, cfg3.logRetentionTimeMillis) + + props4.put("log.retention.ms", "-1") + props4.put("log.retention.minutes", "30") + + val cfg4 = KafkaConfig.fromProps(props4) + assertEquals("Should be -1", -1, cfg4.logRetentionTimeMillis) + + props5.put("log.retention.ms", "0") + + intercept[IllegalArgumentException] { + val cfg5 = KafkaConfig.fromProps(props5) + } + } + + @Test + def testLogRetentionValid { + val props1 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + val props2 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + val props3 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + + props1.put("log.retention.ms", "0") + props2.put("log.retention.minutes", "0") + props3.put("log.retention.hours", "0") + + intercept[IllegalArgumentException] { + val cfg1 = KafkaConfig.fromProps(props1) + } + intercept[IllegalArgumentException] { + val cfg2 = KafkaConfig.fromProps(props2) + } + intercept[IllegalArgumentException] { + val cfg3 = KafkaConfig.fromProps(props3) + } + + } @Test def testAdvertiseDefaults() {