
Time based log segment rollout; patched by Swapnil Ghike; reviewed by Jun Rao, Neha Narkhede; KAFKA-475

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1377093 13f79535-47bb-0310-9956-ffa450edef68
Jun Rao, 12 years ago
commit 21326cb9f2
  1. core/src/main/scala/kafka/log/Log.scala (41)
  2. core/src/main/scala/kafka/log/LogManager.scala (32)
  3. core/src/main/scala/kafka/server/KafkaConfig.scala (26)
  4. core/src/main/scala/kafka/server/KafkaServer.scala (1)
  5. core/src/main/scala/kafka/utils/Utils.scala (22)
  6. core/src/test/scala/other/kafka/TestLogPerformance.scala (4)
  7. core/src/test/scala/unit/kafka/log/LogManagerTest.scala (17)
  8. core/src/test/scala/unit/kafka/log/LogTest.scala (72)
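This patch adds a time limit on how long an active log segment may accumulate messages before it is rolled, alongside the existing size limit, and introduces per-topic overrides for the roll interval, segment file size, and retention size. A minimal sketch of how the new settings might be supplied to the broker, assuming only the property names visible in the KafkaConfig.scala diff below (all topic names, values, and the standalone Properties usage are illustrative; in practice these keys would sit in server.properties next to the existing broker settings):

import java.util.Properties

val props = new Properties()
props.put("log.roll.hours", "168")                         // broker-wide default: roll a segment after 7 days
props.put("topic.log.roll.hours", "topic1:12,topic2:24")   // per-topic roll interval overrides, in hours
props.put("topic.log.file.size", "topic1:536870912")       // per-topic segment size override, in bytes
props.put("topic.log.retention.size", "topic1:1073741824") // per-topic retention size override, in bytes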

core/src/main/scala/kafka/log/Log.scala (41)

@@ -90,9 +90,23 @@ private[log] object Log {
/**
* A segment file in the log directory. Each log segment consists of an open message set, a start offset and a size
*/
private[log] class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long) extends Range {
private[log] class LogSegment(val file: File, val time: Time, val messageSet: FileMessageSet, val start: Long) extends Range {
var firstAppendTime: Option[Long] = None
@volatile var deleted = false
def size: Long = messageSet.highWaterMark
private def updateFirstAppendTime() {
if (firstAppendTime.isEmpty)
firstAppendTime = Some(time.milliseconds)
}
def append(messages: ByteBufferMessageSet) {
if (messages.sizeInBytes > 0) {
messageSet.append(messages)
updateFirstAppendTime()
}
}
override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")"
}
@@ -101,9 +115,8 @@ private[log] class LogSegment(val file: File, val messageSet: FileMessageSet, va
* An append-only log for storing messages.
*/
@threadsafe
private[log] class Log(val dir: File, val maxSize: Long, val maxMessageSize: Int,
val flushInterval: Int, val needRecovery: Boolean) extends Logging {
private[log] class Log(val dir: File, val time: Time, val maxSize: Long, val maxMessageSize: Int,
val flushInterval: Int, val rollIntervalMs: Long, val needRecovery: Boolean) extends Logging {
/* A lock that guards all modifications to the log */
private val lock = new Object
@@ -121,7 +134,7 @@ private[log] class Log(val dir: File, val maxSize: Long, val maxMessageSize: Int
private val logStats = new LogStats(this)
Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
/* Load the log segments from the log files on disk */
private def loadSegments(): SegmentList[LogSegment] = {
@@ -135,7 +148,7 @@ private[log] class Log(val dir: File, val maxSize: Long, val maxMessageSize: Int
val filename = file.getName()
val start = filename.substring(0, filename.length - Log.FileSuffix.length).toLong
val messageSet = new FileMessageSet(file, false)
accum.add(new LogSegment(file, messageSet, start))
accum.add(new LogSegment(file, time, messageSet, start))
}
}
@@ -143,7 +156,7 @@ private[log] class Log(val dir: File, val maxSize: Long, val maxMessageSize: Int
// no existing segments, create a new mutable segment
val newFile = new File(dir, Log.nameFromOffset(0))
val set = new FileMessageSet(newFile, true)
accum.add(new LogSegment(newFile, set, 0))
accum.add(new LogSegment(newFile, time, set, 0))
} else {
// there is at least one existing segment, validate and recover them/it
// sort segments into ascending order for fast searching
@@ -160,7 +173,7 @@ private[log] class Log(val dir: File, val maxSize: Long, val maxMessageSize: Int
val last = accum.remove(accum.size - 1)
last.messageSet.close()
info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
val mutable = new LogSegment(last.file, time, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
accum.add(mutable)
}
new SegmentList(accum.toArray(new Array[LogSegment](accum.size)))
@@ -227,10 +240,11 @@ private[log] class Log(val dir: File, val maxSize: Long, val maxMessageSize: Int
// they are valid, insert them in the log
lock synchronized {
try {
val segment = segments.view.last
segment.messageSet.append(validMessages)
maybeFlush(numberOfMessages)
var segment = segments.view.last
maybeRoll(segment)
segment = segments.view.last
segment.append(validMessages)
maybeFlush(numberOfMessages)
}
catch {
case e: IOException =>
@@ -301,7 +315,8 @@ private[log] class Log(val dir: File, val maxSize: Long, val maxMessageSize: Int
* Roll the log over if necessary
*/
private def maybeRoll(segment: LogSegment) {
if(segment.messageSet.sizeInBytes > maxSize)
if((segment.messageSet.sizeInBytes > maxSize) ||
((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)))
roll()
}
@@ -317,7 +332,7 @@ private[log] class Log(val dir: File, val maxSize: Long, val maxMessageSize: Int
newFile.delete()
}
debug("Rolling log '" + name + "' to " + newFile.getName())
segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
segments.append(new LogSegment(newFile, time, new FileMessageSet(newFile, true), newOffset))
}
}
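Restated outside the diff, the condition added to Log.maybeRoll above rolls the active segment either when it exceeds maxSize bytes or when rollIntervalMs has elapsed since its first append; a simplified sketch of that predicate (not code from the patch):

// A segment with no appends yet (firstAppendTime empty) is never rolled on time alone.
def shouldRoll(segmentSizeInBytes: Long, maxSize: Long,
               firstAppendTime: Option[Long], nowMs: Long, rollIntervalMs: Long): Boolean =
  segmentSizeInBytes > maxSize ||
    (firstAppendTime.isDefined && nowMs - firstAppendTime.get > rollIntervalMs)

This is also why the blank-append case in LogTest.testTimeBasedLogRoll below does not produce a new segment: an empty segment never gets a firstAppendTime.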

core/src/main/scala/kafka/log/LogManager.scala (32)

@@ -33,13 +33,14 @@ import kafka.api.OffsetRequest
private[kafka] class LogManager(val config: KafkaConfig,
private val scheduler: KafkaScheduler,
private val time: Time,
val logRollDefaultIntervalMs: Long,
val logCleanupIntervalMs: Long,
val logCleanupDefaultAgeMs: Long,
needRecovery: Boolean) extends Logging {
val logDir: File = new File(config.logDir)
private val numPartitions = config.numPartitions
private val maxSize: Long = config.logFileSize
private val logFileSizeMap = config.logFileSizeMap
private val flushInterval = config.flushInterval
private val topicPartitionsMap = config.topicPartitionsMap
private val logCreationLock = new Object
@@ -49,8 +50,9 @@ private[kafka] class LogManager(val config: KafkaConfig,
private val startupLatch: CountDownLatch = if (config.enableZookeeper) new CountDownLatch(1) else null
private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
private val logFlushIntervalMap = config.flushIntervalMap
private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
private val logRetentionSize = config.logRetentionSize
private val logRetentionSizeMap = config.logRetentionSizeMap
private val logRetentionMsMap = getMsMap(config.logRetentionHoursMap)
private val logRollMsMap = getMsMap(config.logRollHoursMap)
/* Initialize a log for each subdirectory of the main log directory */
private val logs = new Pool[String, Pool[Int, Log]]()
@@ -67,7 +69,10 @@ private[kafka] class LogManager(val config: KafkaConfig,
warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
} else {
info("Loading log '" + dir.getName() + "'")
val log = new Log(dir, maxSize, config.maxMessageSize, flushInterval, needRecovery)
val topic = Utils.getTopicPartition(dir.getName)._1
val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
val log = new Log(dir, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needRecovery)
val topicPartion = Utils.getTopicPartition(dir.getName)
logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
val parts = logs.get(topicPartion._1)
@@ -108,10 +113,11 @@ private[kafka] class LogManager(val config: KafkaConfig,
case object StopActor
private def getLogRetentionMSMap(logRetentionHourMap: Map[String, Int]) : Map[String, Long] = {
private def getMsMap(hoursMap: Map[String, Int]) : Map[String, Long] = {
var ret = new mutable.HashMap[String, Long]
for ( (topic, hour) <- logRetentionHourMap )
for ( (topic, hour) <- hoursMap ) {
ret.put(topic, hour * 60 * 60 * 1000L)
}
ret
}
@@ -146,7 +152,9 @@ private[kafka] class LogManager(val config: KafkaConfig,
logCreationLock synchronized {
val d = new File(logDir, topic + "-" + partition)
d.mkdirs()
new Log(d, maxSize, config.maxMessageSize, flushInterval, false)
val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
new Log(d, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, false)
}
}
@@ -236,7 +244,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
private def cleanupExpiredSegments(log: Log): Int = {
val startMs = time.milliseconds
val topic = Utils.getTopicPartition(log.dir.getName)._1
val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
val logCleanupThresholdMS = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
val total = deleteSegments(log, toBeDeleted)
total
@@ -247,8 +255,10 @@ private[kafka] class LogManager(val config: KafkaConfig,
* is at least logRetentionSize bytes in size
*/
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
if(logRetentionSize < 0 || log.size < logRetentionSize) return 0
var diff = log.size - logRetentionSize
val topic = Utils.getTopicPartition(log.dir.getName)._1
val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize)
if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0
var diff = log.size - maxLogRetentionSize
def shouldDelete(segment: LogSegment) = {
if(diff - segment.size >= 0) {
diff -= segment.size
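Every per-topic setting in LogManager is resolved the same way: hour-based maps are converted to milliseconds once, and each lookup falls back to the broker-wide default when a topic has no override. A small sketch of that pattern (the function names here are illustrative, not part of the patch):

// Mirrors getMsMap above: per-topic hours converted to milliseconds.
def toMsMap(hoursMap: Map[String, Int]): Map[String, Long] =
  hoursMap.map { case (topic, hours) => topic -> hours * 60 * 60 * 1000L }

// Mirrors the lookups above when loading an existing log or creating a new one:
// use the per-topic value if present, otherwise the broker-wide default.
def effectiveRollMs(topic: String, rollMsMap: Map[String, Long], defaultRollMs: Long): Long =
  rollMsMap.get(topic).getOrElse(defaultRollMs)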

core/src/main/scala/kafka/server/KafkaConfig.scala (26)

@@ -60,18 +60,27 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) {
/* the maximum size of a single log file */
val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
/* the number of messages accumulated on a log partition before messages are flushed to disk */
val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
/* the maximum size of a single log file for some specific topic */
val logFileSizeMap = Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size", ""))
/* the maximum time before a new log segment is rolled out */
val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))
/* the number of hours before rolling out a new log segment for some specific topic */
val logRollHoursMap = Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", ""))
/* the number of hours to keep a log file before deleting it */
val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24*7, (1, Int.MaxValue))
/* the number of hours to keep a log file before deleting it for some specific topic*/
val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
/* the maximum size of the log before deleting it */
val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)
/* the number of hours to keep a log file before deleting it for some specific topic*/
val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
/* the maximum size of the log for some specific topic before deleting it */
val logRetentionSizeMap = Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size", ""))
/* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
@@ -79,6 +88,9 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) {
/* enable zookeeper registration in the server */
val enableZookeeper = Utils.getBoolean(props, "enable.zookeeper", true)
/* the number of messages accumulated on a log partition before messages are flushed to disk */
val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
/* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
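All of the new per-topic settings share one wire format: a comma-separated list of topic:value pairs, parsed by the new helpers in Utils.scala below. For example (topic names and values are hypothetical):

import kafka.utils.Utils

// topic1 rolls a new segment every 12 hours, topic2 every 24 hours.
val rollHours = Utils.getTopicRollHours("topic1:12,topic2:24")
// Per-topic segment file size and retention size use the same key:value CSV form, in bytes.
val fileSizes = Utils.getTopicFileSize("topic1:536870912")
val retentionSizes = Utils.getTopicRetentionSize("topic1:1073741824")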

core/src/main/scala/kafka/server/KafkaServer.scala (1)

@@ -58,6 +58,7 @@ class KafkaServer(val config: KafkaConfig) extends Logging {
logManager = new LogManager(config,
scheduler,
SystemTime,
1000L * 60 * 60 * config.logRollHours,
1000L * 60 * config.logCleanupIntervalMinutes,
1000L * 60 * 60 * config.logRetentionHours,
needRecovery)

core/src/main/scala/kafka/utils/Utils.scala (22)

@@ -566,7 +566,7 @@ object Utils extends Logging {
}
/**
* This method gets comma seperated values which contains key,value pairs and returns a map of
* This method gets comma separated values which contains key,value pairs and returns a map of
* key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
*/
private def getCSVMap[K, V](allCSVals: String, exceptionMsg:String, successMsg:String) :Map[K, V] = {
@@ -595,12 +595,30 @@ object Utils extends Logging {
}
}
def getTopicRentionHours(retentionHours: String) : Map[String, Int] = {
def getTopicRetentionHours(retentionHours: String) : Map[String, Int] = {
val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: "
val successMsg = "The retention hour for "
getCSVMap(retentionHours, exceptionMsg, successMsg)
}
def getTopicRollHours(rollHours: String) : Map[String, Int] = {
val exceptionMsg = "Malformed token for topic.log.roll.hours in server.properties: "
val successMsg = "The roll hour for "
getCSVMap(rollHours, exceptionMsg, successMsg)
}
def getTopicFileSize(fileSizes: String): Map[String, Int] = {
val exceptionMsg = "Malformed token for topic.log.file.size in server.properties: "
val successMsg = "The roll hour for "
getCSVMap(fileSizes, exceptionMsg, successMsg)
}
def getTopicRetentionSize(retentionSizes: String): Map[String, Long] = {
val exceptionMsg = "Malformed token for topic.log.retention.size in server.properties: "
val successMsg = "The roll hour for "
getCSVMap(retentionSizes, exceptionMsg, successMsg)
}
def getTopicFlushIntervals(allIntervals: String) : Map[String, Int] = {
val exceptionMsg = "Malformed token for topic.flush.Intervals.ms in server.properties: "
val successMsg = "The flush interval for "

core/src/test/scala/other/kafka/TestLogPerformance.scala (4)

@@ -18,7 +18,7 @@
package kafka.log
import kafka.message._
import kafka.utils.{TestUtils, Utils}
import kafka.utils.{TestUtils, Utils, SystemTime}
import kafka.server.KafkaConfig
object TestLogPerformance {
@@ -33,7 +33,7 @@ object TestLogPerformance {
val batchSize = args(2).toInt
val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
val dir = TestUtils.tempDir()
val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, false)
val log = new Log(dir, SystemTime, 50*1024*1024, config.maxMessageSize, 5000000, 24*7*60*60*1000L, false)
val bytes = new Array[Byte](messageSize)
new java.util.Random().nextBytes(bytes)
val message = new Message(bytes)
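The call above uses the updated Log constructor positionally; for readability, a sketch of the same call with each argument labeled (compiles only inside package kafka.log, since the class is private[log]; the values mirror the test above and are otherwise illustrative):

package kafka.log

import java.io.File
import kafka.utils.SystemTime

object LogConstructorSketch {
  def make(dir: File): Log =
    new Log(dir,                       // directory holding this partition's segment files
            SystemTime,                // time source (tests pass MockTime instead)
            50L * 1024 * 1024,         // maxSize: roll once the active segment exceeds 50 MB
            1000000,                   // maxMessageSize (the test uses config.maxMessageSize)
            5000000,                   // flushInterval, in messages
            24L * 7 * 60 * 60 * 1000,  // rollIntervalMs: roll once the first append is a week old
            false)                     // needRecovery
}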

core/src/test/scala/unit/kafka/log/LogManagerTest.scala (17)

@@ -28,6 +28,7 @@ import kafka.common.OffsetOutOfRangeException
class LogManagerTest extends JUnitSuite {
val time: MockTime = new MockTime()
val maxSegAge = 100
val maxLogAge = 1000
var logDir: File = null
var logManager: LogManager = null
@@ -41,7 +42,7 @@ class LogManagerTest extends JUnitSuite {
override val enableZookeeper = false
override val flushInterval = 100
}
logManager = new LogManager(config, null, time, -1, maxLogAge, false)
logManager = new LogManager(config, null, time, maxSegAge, -1, maxLogAge, false)
logManager.startup
logDir = logManager.logDir
}
@@ -111,11 +112,11 @@ class LogManagerTest extends JUnitSuite {
config = new KafkaConfig(props) {
override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
override val enableZookeeper = false
override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] // keep exactly 6 segments + 1 roll over
override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
override val logRetentionHours = retentionHours
override val flushInterval = 100
}
logManager = new LogManager(config, null, time, -1, retentionMs, false)
logManager = new LogManager(config, null, time, maxSegAge, -1, retentionMs, false)
logManager.startup
// create a log
@@ -132,12 +133,12 @@ class LogManagerTest extends JUnitSuite {
log.flush
Thread.sleep(2000)
// should be exactly 100 full segments + 1 new empty one
assertEquals("There should be example 101 segments.", 100 + 1, log.numberOfSegments)
// should be exactly 100 full segments
assertEquals("There should be example 100 segments.", 100, log.numberOfSegments)
// this cleanup shouldn't find any expired segments but should delete some to reduce size
logManager.cleanupLogs()
assertEquals("Now there should be exactly 7 segments", 6 + 1, log.numberOfSegments)
assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
try {
log.read(0, 1024)
@@ -161,7 +162,7 @@ class LogManagerTest extends JUnitSuite {
override val flushInterval = Int.MaxValue
override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
}
logManager = new LogManager(config, null, time, -1, maxLogAge, false)
logManager = new LogManager(config, null, time, maxSegAge, -1, maxLogAge, false)
logManager.startup
val log = logManager.getOrCreateLog("timebasedflush", 0)
for(i <- 0 until 200) {
@@ -185,7 +186,7 @@ class LogManagerTest extends JUnitSuite {
override val flushInterval = 100
}
logManager = new LogManager(config, null, time, -1, maxLogAge, false)
logManager = new LogManager(config, null, time, maxSegAge, -1, maxLogAge, false)
logManager.startup
for(i <- 0 until 2) {

core/src/test/scala/unit/kafka/log/LogTest.scala (72)

@@ -22,7 +22,7 @@ import java.util.ArrayList
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.utils.{Utils, TestUtils, Range}
import kafka.utils.{Utils, TestUtils, Range, SystemTime, MockTime}
import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import kafka.server.KafkaConfig
@@ -47,18 +47,70 @@ class LogTest extends JUnitSuite {
for(offset <- offsets)
new File(dir, Integer.toString(offset) + Log.FileSuffix).createNewFile()
}
/** Test that the size and time based log segment rollout works. */
@Test
def testTimeBasedLogRoll() {
val set = TestUtils.singleMessageSet("test".getBytes())
val rollMs = 1 * 60 * 60L
val time: MockTime = new MockTime()
// create a log
val log = new Log(logDir, time, 1000, config.maxMessageSize, 1000, rollMs, false)
time.currentMs += rollMs + 1
// segment age is less than its limit
log.append(set)
assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
log.append(set)
assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
// segment expires in age
time.currentMs += rollMs + 1
log.append(set)
assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
time.currentMs += rollMs + 1
val blank = Array[Message]()
log.append(new ByteBufferMessageSet(blank:_*))
assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
time.currentMs += rollMs + 1
// the last segment expired in age, but was blank. So new segment should not be generated
log.append(set)
assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
}
@Test
def testSizeBasedLogRoll() {
val set = TestUtils.singleMessageSet("test".getBytes())
val setSize = set.sizeInBytes
val msgPerSeg = 10
val segSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be 10 messages
// create a log
val log = new Log(logDir, SystemTime, segSize, config.maxMessageSize, 1000, 10000, false)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
// segments expire in size
for (i<- 1 to (msgPerSeg + 1)) {
log.append(set)
}
assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
}
@Test
def testLoadEmptyLog() {
createEmptyLogs(logDir, 0)
new Log(logDir, 1024, config.maxMessageSize, 1000, false)
new Log(logDir, SystemTime, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
}
@Test
def testLoadInvalidLogsFails() {
createEmptyLogs(logDir, 0, 15)
try {
new Log(logDir, 1024, config.maxMessageSize, 1000, false)
new Log(logDir, SystemTime, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
fail("Allowed load of corrupt logs without complaint.")
} catch {
case e: IllegalStateException => "This is good"
@@ -67,7 +119,7 @@ class LogTest extends JUnitSuite {
@Test
def testAppendAndRead() {
val log = new Log(logDir, 1024, config.maxMessageSize, 1000, false)
val log = new Log(logDir, SystemTime, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
val message = new Message(Integer.toString(42).getBytes())
for(i <- 0 until 10)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -84,7 +136,7 @@ class LogTest extends JUnitSuite {
@Test
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
val log = new Log(logDir, 1024, config.maxMessageSize, 1000, false)
val log = new Log(logDir, SystemTime, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
try {
log.read(0, 1024)
@@ -104,7 +156,7 @@ class LogTest extends JUnitSuite {
@Test
def testLogRolls() {
/* create a multipart log with 100 messages */
val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
val log = new Log(logDir, SystemTime, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
val numMessages = 100
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -159,7 +211,7 @@ class LogTest extends JUnitSuite {
def testEdgeLogRolls() {
{
// first test a log segment starting at 0
val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
val log = new Log(logDir, SystemTime, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
val curOffset = log.nextAppendOffset
assertEquals(curOffset, 0)
@@ -172,7 +224,7 @@ class LogTest extends JUnitSuite {
{
// second test an empty log segment starting at none-zero
val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
val log = new Log(logDir, SystemTime, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
val numMessages = 1
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -201,7 +253,7 @@ class LogTest extends JUnitSuite {
val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes()))
// append messages to log
val log = new Log(logDir, 100, 5, 1000, false)
val log = new Log(logDir, SystemTime, 100, 5, 1000, 24*7*60*60*1000L, false)
var ret =
try {
