
KAFKA-1990; Add unlimited time-based log retention; patched by Jeff Holoman; reviewed by Jun Rao

pull/55/merge
Jeff Holoman authored 10 years ago; committed by Jun Rao
parent commit 3139cc31f6
  1. core/src/main/scala/kafka/log/LogConfig.scala (11)
  2. core/src/main/scala/kafka/log/LogManager.scala (2)
  3. core/src/main/scala/kafka/server/KafkaConfig.scala (12)
  4. core/src/test/scala/unit/kafka/admin/AdminTest.scala (2)
  5. core/src/test/scala/unit/kafka/log/LogConfigTest.scala (10)
  6. core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala (53)
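
For orientation (an editorial sketch, not part of the patch): after this change a retention time of -1 means "keep forever" for time-based cleanup. At the broker level the relevant property is log.retention.ms (which takes precedence over log.retention.minutes and log.retention.hours); at the topic level it is retention.ms. A minimal Scala sketch of both, reusing only property names that appear in the diffs below:

    import java.util.Properties

    object UnlimitedRetentionSketch {
      def main(args: Array[String]): Unit = {
        // Broker-level: -1 disables time-based log deletion entirely.
        val brokerProps = new Properties()
        brokerProps.put("log.retention.ms", "-1")

        // Topic-level override: same semantics via LogConfig's "retention.ms".
        val topicProps = new Properties()
        topicProps.put("retention.ms", "-1")

        println(brokerProps)
        println(topicProps)
      }
    }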

core/src/main/scala/kafka/log/LogConfig.scala (11)

@@ -21,8 +21,6 @@ import java.util.Properties
import org.apache.kafka.common.utils.Utils
import scala.collection._
import org.apache.kafka.common.config.ConfigDef
import kafka.common._
import scala.collection.JavaConversions._
import kafka.message.BrokerCompressionCodec
object Defaults {
@@ -93,7 +91,7 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
props.put(FlushMessagesProp, flushInterval.toString)
props.put(FlushMsProp, flushMs.toString)
props.put(RetentionBytesProp, retentionSize.toString)
props.put(RententionMsProp, retentionMs.toString)
props.put(RetentionMsProp, retentionMs.toString)
props.put(MaxMessageBytesProp, maxMessageSize.toString)
props.put(IndexIntervalBytesProp, indexInterval.toString)
props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
@@ -122,7 +120,7 @@ object LogConfig {
val FlushMessagesProp = "flush.messages"
val FlushMsProp = "flush.ms"
val RetentionBytesProp = "retention.bytes"
val RententionMsProp = "retention.ms"
val RetentionMsProp = "retention.ms"
val MaxMessageBytesProp = "max.message.bytes"
val IndexIntervalBytesProp = "index.interval.bytes"
val DeleteRetentionMsProp = "delete.retention.ms"
@@ -172,7 +170,8 @@ object LogConfig {
.define(FlushMsProp, LONG, Defaults.FlushMs, atLeast(0), MEDIUM, FlushMsDoc)
// can be negative. See kafka.log.LogManager.cleanupSegmentsToMaintainSize
.define(RetentionBytesProp, LONG, Defaults.RetentionSize, MEDIUM, RetentionSizeDoc)
.define(RententionMsProp, LONG, Defaults.RetentionMs, atLeast(0), MEDIUM, RetentionMsDoc)
// can be negative. See kafka.log.LogManager.cleanupExpiredSegments
.define(RetentionMsProp, LONG, Defaults.RetentionMs, MEDIUM, RetentionMsDoc)
.define(MaxMessageBytesProp, INT, Defaults.MaxMessageSize, atLeast(0), MEDIUM, MaxMessageSizeDoc)
.define(IndexIntervalBytesProp, INT, Defaults.IndexInterval, atLeast(0), MEDIUM, IndexIntervalDoc)
.define(DeleteRetentionMsProp, LONG, Defaults.DeleteRetentionMs, atLeast(0), MEDIUM, DeleteRetentionMsDoc)
@@ -206,7 +205,7 @@ object LogConfig {
flushInterval = parsed.get(FlushMessagesProp).asInstanceOf[Long],
flushMs = parsed.get(FlushMsProp).asInstanceOf[Long],
retentionSize = parsed.get(RetentionBytesProp).asInstanceOf[Long],
retentionMs = parsed.get(RententionMsProp).asInstanceOf[Long],
retentionMs = parsed.get(RetentionMsProp).asInstanceOf[Long],
maxMessageSize = parsed.get(MaxMessageBytesProp).asInstanceOf[Int],
indexInterval = parsed.get(IndexIntervalBytesProp).asInstanceOf[Int],
fileDeleteDelayMs = parsed.get(FileDeleteDelayMsProp).asInstanceOf[Long],
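
A hedged illustration of the relaxed validator above: with atLeast(0) dropped from RetentionMsProp, a topic-level retention.ms of -1 now parses instead of being rejected. The exact LogConfig.fromProps signature is assumed here (a single Properties argument, matching the "parsed.get(RetentionMsProp)" hunk above); the sketch is illustrative only:

    import java.util.Properties
    import kafka.log.LogConfig

    // Hypothetical topic override; fromProps(Properties) is an assumption, not shown in this diff.
    val overrides = new Properties()
    overrides.put(LogConfig.RetentionMsProp, "-1")
    val cfg = LogConfig.fromProps(overrides)   // no longer fails validation now that atLeast(0) is gone
    assert(cfg.retentionMs == -1L)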

core/src/main/scala/kafka/log/LogManager.scala (2)

@@ -407,6 +407,8 @@ class LogManager(val logDirs: Array[File],
* Runs through the log removing segments older than a certain age
*/
private def cleanupExpiredSegments(log: Log): Int = {
if (log.config.retentionMs < 0)
return 0
val startMs = time.milliseconds
log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}
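
In plain terms, the new guard makes time-based cleanup a no-op for negative retention: cleanupExpiredSegments reports zero deleted segments and never evaluates the age predicate. A condensed, standalone restatement (names are illustrative, not from the patch):

    object ExpiryGuardSketch {
      // Mirrors the patched logic: negative retentionMs means "never expire by age".
      def expiredCount(retentionMs: Long, nowMs: Long, lastModifiedMs: Seq[Long]): Int =
        if (retentionMs < 0) 0
        else lastModifiedMs.count(lm => nowMs - lm > retentionMs)

      def main(args: Array[String]): Unit = {
        val now = System.currentTimeMillis()
        val segments = Seq(now - 10L * 24 * 60 * 60 * 1000, now - 1000L) // a 10-day-old and a fresh segment
        println(expiredCount(-1L, now, segments))                        // 0: unlimited retention
        println(expiredCount(7L * 24 * 60 * 60 * 1000, now, segments))   // 1: only the 10-day-old segment expires
      }
    }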

core/src/main/scala/kafka/server/KafkaConfig.scala (12)

@@ -417,7 +417,7 @@ object KafkaConfig {
.define(LogRetentionTimeMillisProp, LONG, HIGH, LogRetentionTimeMillisDoc, false)
.define(LogRetentionTimeMinutesProp, INT, HIGH, LogRetentionTimeMinsDoc, false)
.define(LogRetentionTimeHoursProp, INT, Defaults.LogRetentionHours, atLeast(1), HIGH, LogRetentionTimeHoursDoc)
.define(LogRetentionTimeHoursProp, INT, Defaults.LogRetentionHours, HIGH, LogRetentionTimeHoursDoc)
.define(LogRetentionBytesProp, LONG, Defaults.LogRetentionBytes, HIGH, LogRetentionBytesDoc)
.define(LogCleanupIntervalMsProp, LONG, Defaults.LogCleanupIntervalMs, atLeast(1), MEDIUM, LogCleanupIntervalMsDoc)
@@ -770,6 +770,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
val millisInMinute = 60L * 1000L
val millisInHour = 60L * millisInMinute
val millis = {
_logRetentionTimeMillis.getOrElse(
_logRetentionTimeMins match {
case Some(mins) => millisInMinute * mins
@@ -777,6 +778,9 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
}
)
}
if (millis < 0) return -1
millis
}
private def getMap(propName: String, propValue: String): Map[String, String] = {
try {
@@ -834,8 +838,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id")
require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0")
require(logRetentionTimeMillis >= 1, "log.retention.ms must be equal or greater than 1")
require(_logRetentionTimeMins.forall(_ >= 1), "log.retention.minutes must be equal or greater than 1")
require(_logRetentionTimeMins.forall(_ >= 1) || _logRetentionTimeMins.forall(_.equals(-1)), "log.retention.minutes must be unlimited (-1) or equal or greater than 1")
require(logRetentionTimeHours >= 1 || logRetentionTimeHours == -1, "log.retention.hours must be unlimited (-1) or equal or greater than 1")
require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or equal or greater than 1")
require(logDirs.size > 0)
require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
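
Putting the KafkaConfig hunks together: log.retention.ms wins when set, otherwise minutes, otherwise hours, and any negative result is normalized to -1 before the require checks run. A hedged standalone sketch of that resolution order (the helper name is mine, not the patch's):

    object RetentionMillisSketch {
      // Same precedence and -1 normalization as the hunk above.
      def resolve(ms: Option[Long], minutes: Option[Int], hours: Int): Long = {
        val millisInMinute = 60L * 1000L
        val millisInHour = 60L * millisInMinute
        val millis = ms.getOrElse(minutes match {
          case Some(m) => millisInMinute * m
          case None    => millisInHour * hours
        })
        if (millis < 0) -1 else millis
      }

      def main(args: Array[String]): Unit = {
        println(resolve(Some(-1L), Some(30), 168)) // -1: ms=-1 overrides minutes/hours (cf. props4 in the test below)
        println(resolve(None, None, -1))           // -1: hours=-1 alone also means unlimited
        println(resolve(None, Some(30), 168))      // 1800000: 30 minutes
      }
    }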

core/src/test/scala/unit/kafka/admin/AdminTest.scala (2)

@@ -370,7 +370,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
def makeConfig(messageSize: Int, retentionMs: Long) = {
var props = new Properties()
props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
props.setProperty(LogConfig.RententionMsProp, retentionMs.toString)
props.setProperty(LogConfig.RetentionMsProp, retentionMs.toString)
props
}

core/src/test/scala/kafka/log/LogConfigTest.scala → core/src/test/scala/unit/kafka/log/LogConfigTest.scala (10)

@@ -15,12 +15,14 @@
* limitations under the License.
*/
package kafka.log
package unit.kafka.log
import java.util.Properties
import kafka.log.{Defaults, LogConfig}
import org.apache.kafka.common.config.ConfigException
import org.scalatest.junit.JUnit3Suite
import org.junit.{Assert, Test}
import java.util.Properties
import org.scalatest.junit.JUnit3Suite
class LogConfigTest extends JUnit3Suite {
@@ -56,6 +58,7 @@ class LogConfigTest extends JUnit3Suite {
case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString)
case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString)
case LogConfig.RetentionMsProp => expected.setProperty(name, nextLong().toString)
case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
}
})
@@ -70,6 +73,7 @@ class LogConfigTest extends JUnit3Suite {
name match {
case LogConfig.UncleanLeaderElectionEnableProp => return
case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number")
case LogConfig.RetentionMsProp => assertPropertyInvalid(name, "not_a_number" )
case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar");
case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala (53)

@@ -83,6 +83,59 @@ class KafkaConfigTest extends JUnit3Suite {
val cfg = KafkaConfig.fromProps(props)
assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
}
@Test
def testLogRetentionUnlimited() {
val props1 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
val props2 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
val props3 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
val props4 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
val props5 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
props1.put("log.retention.ms", "-1")
props2.put("log.retention.minutes", "-1")
props3.put("log.retention.hours", "-1")
val cfg1 = KafkaConfig.fromProps(props1)
val cfg2 = KafkaConfig.fromProps(props2)
val cfg3 = KafkaConfig.fromProps(props3)
assertEquals("Should be -1", -1, cfg1.logRetentionTimeMillis)
assertEquals("Should be -1", -1, cfg2.logRetentionTimeMillis)
assertEquals("Should be -1", -1, cfg3.logRetentionTimeMillis)
props4.put("log.retention.ms", "-1")
props4.put("log.retention.minutes", "30")
val cfg4 = KafkaConfig.fromProps(props4)
assertEquals("Should be -1", -1, cfg4.logRetentionTimeMillis)
props5.put("log.retention.ms", "0")
intercept[IllegalArgumentException] {
val cfg5 = KafkaConfig.fromProps(props5)
}
}
@Test
def testLogRetentionValid {
val props1 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val props2 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val props3 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props1.put("log.retention.ms", "0")
props2.put("log.retention.minutes", "0")
props3.put("log.retention.hours", "0")
intercept[IllegalArgumentException] {
val cfg1 = KafkaConfig.fromProps(props1)
}
intercept[IllegalArgumentException] {
val cfg2 = KafkaConfig.fromProps(props2)
}
intercept[IllegalArgumentException] {
val cfg3 = KafkaConfig.fromProps(props3)
}
}
@Test
def testAdvertiseDefaults() {
