From 75fc5eab35aa33cffd9c09a2070dfe287db0ef4e Mon Sep 17 00:00:00 2001 From: Edward Jay Kreps Date: Fri, 2 Nov 2012 19:01:38 +0000 Subject: [PATCH] KAFKA-188 Support multiple data directories. Patch reviewed by Neha and Jun. git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1405102 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/scala/kafka/cluster/Partition.scala | 5 +- .../src/main/scala/kafka/log/LogManager.scala | 266 +++++++++++------- .../server/HighwaterMarkCheckpoint.scala | 29 +- .../main/scala/kafka/server/KafkaConfig.scala | 5 +- .../main/scala/kafka/server/KafkaServer.scala | 32 +-- .../scala/kafka/server/ReplicaManager.scala | 27 +- .../src/main/scala/kafka/utils/FileLock.scala | 64 +++++ core/src/main/scala/kafka/utils/Utils.scala | 32 ++- .../kafka/utils/VerifiableProperties.scala | 4 + .../integration/KafkaServerTestHarness.scala | 2 +- .../scala/unit/kafka/log/LogManagerTest.scala | 64 ++++- .../unit/kafka/producer/ProducerTest.scala | 4 +- .../server/HighwatermarkPersistenceTest.scala | 52 ++-- .../kafka/server/LeaderElectionTest.scala | 2 +- .../unit/kafka/server/LogRecoveryTest.scala | 20 +- .../kafka/server/ServerShutdownTest.scala | 109 ++++--- .../unit/kafka/server/SimpleFetchTest.scala | 2 - .../scala/unit/kafka/utils/TestUtils.scala | 1 + 18 files changed, 440 insertions(+), 280 deletions(-) create mode 100644 core/src/main/scala/kafka/utils/FileLock.scala diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 87cd0f874a2..9fbfe3079f2 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -37,7 +37,6 @@ class Partition(val topic: String, private val localBrokerId = replicaManager.config.brokerId private val logManager = replicaManager.logManager private val replicaFetcherManager = replicaManager.replicaFetcherManager - private val highwaterMarkCheckpoint = replicaManager.highWatermarkCheckpoint private val zkClient = replicaManager.zkClient var leaderReplicaIdOpt: Option[Int] = None var inSyncReplicas: Set[Replica] = Set.empty[Replica] @@ -69,8 +68,8 @@ class Partition(val topic: String, case None => if (isReplicaLocal(replicaId)) { val log = logManager.getOrCreateLog(topic, partitionId) - val localReplica = new Replica(replicaId, this, time, - highwaterMarkCheckpoint.read(topic, partitionId).min(log.logEndOffset), Some(log)) + val offset = replicaManager.highWatermarkCheckpoints(log.dir.getParent).read(topic, partitionId).min(log.logEndOffset) + val localReplica = new Replica(replicaId, this, time, offset, Some(log)) addReplicaIfNotExists(localReplica) } else { diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 7f5e9aa5daf..7c369653f1d 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -26,61 +26,107 @@ import kafka.server.{HighwaterMarkCheckpoint, KafkaConfig} /** - * The guy who creates and hands out logs + * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. + * All read and write operations are delegated to the individual log instances. + * + * The log manager maintains logs in one or more directories. New logs are created in the data directory + * with the fewest logs. No attempt is made to move partitions after the fact or balance based on + * size or I/O rate. 
+ * + * A background thread handles log retention by periodically truncating excess log segments. */ @threadsafe private[kafka] class LogManager(val config: KafkaConfig, scheduler: KafkaScheduler, - private val time: Time, - val logRollDefaultIntervalMs: Long, - val logCleanupIntervalMs: Long, - val logCleanupDefaultAgeMs: Long, - needRecovery: Boolean) extends Logging { + private val time: Time) extends Logging { - val logDir: File = new File(config.logDir) + val CleanShutdownFile = ".kafka_cleanshutdown" + val LockFile = ".lock" + val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray private val logFileSizeMap = config.logFileSizeMap - private val flushInterval = config.flushInterval - private val logCreationLock = new Object + private val logFlushInterval = config.flushInterval private val logFlushIntervals = config.flushIntervalMap + private val logCreationLock = new Object private val logRetentionSizeMap = config.logRetentionSizeMap private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) - this.logIdent = "[Log Manager on Broker " + config.brokerId + "] " + private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours + private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMinutes + private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours - /* Initialize a log for each subdirectory of the main log directory */ - private val logs = new Pool[String, Pool[Int, Log]]() - if(!logDir.exists()) { - info("No log directory found, creating '" + logDir.getAbsolutePath() + "'") - logDir.mkdirs() - } - if(!logDir.isDirectory() || !logDir.canRead()) - throw new KafkaException(logDir.getAbsolutePath() + " is not a readable log directory.") - val subDirs = logDir.listFiles() - if(subDirs != null) { - for(dir <- subDirs) { - if(dir.getName.equals(HighwaterMarkCheckpoint.highWatermarkFileName)){ - // skip valid metadata file + this.logIdent = "[Log Manager on Broker " + config.brokerId + "] " + private val logs = new Pool[TopicAndPartition, Log]() + + createAndValidateLogDirs(logDirs) + private var dirLocks = lockLogDirs(logDirs) + loadLogs(logDirs) + + /** + * 1. Ensure that there are no duplicates in the directory list + * 2. Create each directory if it doesn't exist + * 3. 
Check that each path is a readable directory + */ + private def createAndValidateLogDirs(dirs: Seq[File]) { + if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size) + throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", ")) + for(dir <- dirs) { + if(!dir.exists) { + info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.") + val created = dir.mkdirs() + if(!created) + throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath) } - else if(!dir.isDirectory()) { - warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?") - } else { - info("Loading log '" + dir.getName() + "'") - val topicPartition = parseTopicPartitionName(dir.getName) - val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs) - val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize) - val log = new Log(dir, - maxLogFileSize, - config.maxMessageSize, - flushInterval, - rollIntervalMs, - needRecovery, - config.logIndexMaxSizeBytes, - config.logIndexIntervalBytes, - time, - config.brokerId) - logs.putIfNotExists(topicPartition.topic, new Pool[Int, Log]()) - val parts = logs.get(topicPartition.topic) - parts.put(topicPartition.partition, log) + if(!dir.isDirectory || !dir.canRead) + throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.") + } + } + + /** + * Lock all the given directories + */ + private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = { + dirs.map { dir => + val lock = new FileLock(new File(dir, LockFile)) + if(!lock.tryLock()) + throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath + + ". A Kafka instance in another process or thread is using this directory.") + lock + } + } + + /** + * Recovery and load all logs in the given data directories + */ + private def loadLogs(dirs: Seq[File]) { + for(dir <- dirs) { + /* check if this set of logs was shut down cleanly */ + val cleanShutDownFile = new File(dir, CleanShutdownFile) + val needsRecovery = cleanShutDownFile.exists + cleanShutDownFile.delete + /* load the logs */ + val subDirs = dir.listFiles() + if(subDirs != null) { + for(dir <- subDirs) { + if(dir.isDirectory){ + info("Loading log '" + dir.getName + "'") + val topicPartition = parseTopicPartitionName(dir.getName) + val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs) + val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize) + val log = new Log(dir, + maxLogFileSize, + config.maxMessageSize, + logFlushInterval, + rollIntervalMs, + needsRecovery, + config.logIndexMaxSizeBytes, + config.logIndexIntervalBytes, + time, + config.brokerId) + val previous = this.logs.put(topicPartition, log) + if(previous != null) + throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) + } + } } } } @@ -99,18 +145,81 @@ private[kafka] class LogManager(val config: KafkaConfig, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false) } } + + /** + * Get the log if it exists + */ + def getLog(topic: String, partition: Int): Option[Log] = { + val topicAndPartiton = TopicAndPartition(topic, partition) + val log = logs.get(topicAndPartiton) + if (log == null) + None + else + Some(log) + } + /** + * Create the log if it does not exist, if it exists just return it + */ + def 
getOrCreateLog(topic: String, partition: Int): Log = { + val topicAndPartition = TopicAndPartition(topic, partition) + logs.get(topicAndPartition) match { + case null => createLogIfNotExists(topicAndPartition) + case log: Log => log + } + } /** * Create a log for the given topic and the given partition + * If the log already exists, just return a copy of the existing log */ - private def createLog(topic: String, partition: Int): Log = { + private def createLogIfNotExists(topicAndPartition: TopicAndPartition): Log = { logCreationLock synchronized { - val d = new File(logDir, topic + "-" + partition) - d.mkdirs() - val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs) - val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize) - new Log(d, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needsRecovery = false, config.logIndexMaxSizeBytes, config.logIndexIntervalBytes, time, config.brokerId) + var log = logs.get(topicAndPartition) + + // check if the log has already been created in another thread + if(log != null) + return log + + // if not, create it + val dataDir = nextLogDir() + val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition) + dir.mkdirs() + val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs) + val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize) + log = new Log(dir, + maxLogFileSize, + config.maxMessageSize, + logFlushInterval, + rollIntervalMs, + needsRecovery = false, + config.logIndexMaxSizeBytes, + config.logIndexIntervalBytes, + time, + config.brokerId) + info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath)) + logs.put(topicAndPartition, log) + log + } + } + + /** + * Choose the next directory in which to create a log. Currently this is done + * by calculating the number of partitions in each directory and then choosing the + * data directory with the fewest partitions. 
+ */ + private def nextLogDir(): File = { + if(logDirs.size == 1) { + logDirs(0) + } else { + // count the number of logs in each parent directory (including 0 for empty directories + val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size) + val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap + var dirCounts = (zeros ++ logCounts).toBuffer + + // choose the directory with the least logs in it + val leastLoaded = dirCounts.sortBy(_._2).head + new File(leastLoaded._1) } } @@ -123,48 +232,8 @@ private[kafka] class LogManager(val config: KafkaConfig, } /** - * Get the log if it exists - */ - def getLog(topic: String, partition: Int): Option[Log] = { - val parts = logs.get(topic) - if (parts == null) None - else { - val log = parts.get(partition) - if(log == null) None - else Some(log) - } - } - - /** - * Create the log if it does not exist, if it exists just return it + * Runs through the log removing segments older than a certain age */ - def getOrCreateLog(topic: String, partition: Int): Log = { - var hasNewTopic = false - var parts = logs.get(topic) - if (parts == null) { - val found = logs.putIfNotExists(topic, new Pool[Int, Log]) - if (found == null) - hasNewTopic = true - parts = logs.get(topic) - } - var log = parts.get(partition) - if(log == null) { - // check if this broker hosts this partition - log = createLog(topic, partition) - val found = parts.putIfNotExists(partition, log) - if(found != null) { - // there was already somebody there - log.close() - log = found - } - else - info("Created log for '" + topic + "'-" + partition) - } - - log - } - - /* Runs through the log removing segments older than a certain age */ private def cleanupExpiredSegments(log: Log): Int = { val startMs = time.milliseconds val topic = parseTopicPartitionName(log.name).topic @@ -216,14 +285,22 @@ private[kafka] class LogManager(val config: KafkaConfig, */ def shutdown() { debug("Shutting down.") - allLogs.foreach(_.close()) + try { + // close the logs + allLogs.foreach(_.close()) + // mark that the shutdown was clean by creating the clean shutdown marker file + logDirs.foreach(dir => Utils.swallow(new File(dir, CleanShutdownFile).createNewFile())) + } finally { + // regardless of whether the close succeeded, we need to unlock the data directories + dirLocks.foreach(_.destroy()) + } debug("Shutdown complete.") } /** * Get all the partition logs */ - def allLogs() = logs.values.flatMap(_.values) + def allLogs(): Iterable[Log] = logs.values /** * Flush any log which has exceeded its flush interval and has unwritten messages. 
@@ -253,12 +330,11 @@ private[kafka] class LogManager(val config: KafkaConfig, } } - - def topics(): Iterable[String] = logs.keys - private def parseTopicPartitionName(name: String): TopicAndPartition = { val index = name.lastIndexOf('-') TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt) } + def topics(): Iterable[String] = logs.keys.map(_.topic) + } diff --git a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala index 9e292dd2b59..c2d953fede5 100644 --- a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala +++ b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala @@ -17,6 +17,7 @@ package kafka.server import kafka.utils.Logging +import kafka.common._ import java.util.concurrent.locks.ReentrantLock import java.io._ @@ -40,7 +41,7 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging { private val hwFileLock = new ReentrantLock() // recover from previous tmp file, if required - def write(highwaterMarksPerPartition: Map[(String, Int), Long]) { + def write(highwaterMarksPerPartition: Map[TopicAndPartition, Long]) { hwFileLock.lock() try { // write to temp file and then swap with the highwatermark file @@ -56,9 +57,7 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging { hwFileWriter.newLine() highwaterMarksPerPartition.foreach { partitionAndHw => - val topic = partitionAndHw._1._1 - val partitionId = partitionAndHw._1._2 - hwFileWriter.write("%s %s %s".format(topic, partitionId, partitionAndHw._2)) + hwFileWriter.write("%s %s %s".format(partitionAndHw._1.topic, partitionAndHw._1.partition, partitionAndHw._2)) hwFileWriter.newLine() } hwFileWriter.flush() @@ -77,9 +76,10 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging { hwFileLock.lock() try { hwFile.length() match { - case 0 => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) + - "partition %d. Returning 0 as the highwatermark".format(partition)) - 0L + case 0 => + warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) + + "partition %d. Returning 0 as the highwatermark".format(partition)) + 0L case _ => val hwFileReader = new BufferedReader(new FileReader(hwFile)) val version = hwFileReader.readLine().toShort @@ -95,17 +95,18 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging { // find the index of partition val partitionIndex = nextHwEntry.indexOf(partitionId) val topic = nextHwEntry.substring(0, partitionIndex-1) - ((topic, partitionId.toInt) -> highwaterMark) + (TopicAndPartition(topic, partitionId.toInt) -> highwaterMark) } hwFileReader.close() - val hwOpt = partitionHighWatermarks.toMap.get((topic, partition)) + val hwOpt = partitionHighWatermarks.toMap.get(TopicAndPartition(topic, partition)) hwOpt match { - case Some(hw) => debug("Read hw %d for topic %s partition %d from highwatermark checkpoint file" - .format(hw, topic, partition)) + case Some(hw) => + debug("Read hw %d for topic %s partition %d from highwatermark checkpoint file".format(hw, topic, partition)) hw - case None => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) + - "partition %d. Returning 0 as the highwatermark".format(partition)) - 0L + case None => + warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) + + "partition %d. 
Returning 0 as the highwatermark".format(partition)) + 0L } case _ => fatal("Unrecognized version of the highwatermark checkpoint file " + version) System.exit(1) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index e25cf810bd8..6a5b4de7c34 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -21,7 +21,7 @@ import java.util.Properties import kafka.message.Message import kafka.consumer.ConsumerConfig import java.net.InetAddress -import kafka.utils.{VerifiableProperties, ZKConfig} +import kafka.utils.{VerifiableProperties, ZKConfig, Utils} /** * Configuration settings for the kafka server @@ -74,7 +74,8 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue)) /* the directories in which the log data is kept */ - val logDir = props.getString("log.dir") + val logDirs = Utils.parseCsvList(props.getString("log.directories", props.getString("log.dir", ""))) + require(logDirs.size > 0) /* the maximum size of a single log file */ val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue)) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index e9bba4cdd0e..10cc841fed4 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -32,7 +32,6 @@ import kafka.controller.{ControllerStat, KafkaController} */ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging { this.logIdent = "[Kafka Server " + config.brokerId + "], " - val CleanShutdownFile = ".kafka_cleanshutdown" private var isShuttingDown = new AtomicBoolean(false) private var shutdownLatch = new CountDownLatch(1) var socketServer: SocketServer = null @@ -53,12 +52,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg info("starting") isShuttingDown = new AtomicBoolean(false) shutdownLatch = new CountDownLatch(1) - var needRecovery = true - val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile) - if (cleanShutDownFile.exists) { - needRecovery = false - cleanShutDownFile.delete - } /* start scheduler */ kafkaScheduler.startup @@ -66,11 +59,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg /* start log manager */ logManager = new LogManager(config, kafkaScheduler, - time, - 1000L * 60 * 60 * config.logRollHours, - 1000L * 60 * config.logCleanupIntervalMinutes, - 1000L * 60 * 60 * config.logRetentionHours, - needRecovery) + time) logManager.startup() socketServer = new SocketServer(config.brokerId, @@ -122,25 +111,22 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val canShutdown = isShuttingDown.compareAndSet(false, true); if (canShutdown) { if(requestHandlerPool != null) - requestHandlerPool.shutdown() - kafkaScheduler.shutdown() + Utils.swallow(requestHandlerPool.shutdown()) + Utils.swallow(kafkaScheduler.shutdown()) if(apis != null) - apis.close() + Utils.swallow(apis.close()) if(kafkaZookeeper != null) - kafkaZookeeper.shutdown() + Utils.swallow(kafkaZookeeper.shutdown()) if(replicaManager != null) - replicaManager.shutdown() + Utils.swallow(replicaManager.shutdown()) if(socketServer != null) - socketServer.shutdown() + Utils.swallow(socketServer.shutdown()) 
if(logManager != null) - logManager.shutdown() + Utils.swallow(logManager.shutdown()) if(kafkaController != null) - kafkaController.shutdown() + Utils.swallow(kafkaController.shutdown()) - val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile) - debug("creating clean shutdown file " + cleanShutDownFile.getAbsolutePath()) - cleanShutDownFile.createNewFile shutdownLatch.countDown() info("shut down completed") } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 55a33f4ab5b..d28dcffdf37 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -25,7 +25,7 @@ import kafka.log.LogManager import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import java.util.concurrent.TimeUnit -import kafka.common.{ReplicaNotAvailableException, UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping} +import kafka.common._ import kafka.api.{PartitionStateInfo, LeaderAndIsrRequest} @@ -44,8 +44,7 @@ class ReplicaManager(val config: KafkaConfig, val replicaFetcherManager = new ReplicaFetcherManager(config, this) this.logIdent = "Replica Manager on Broker " + config.brokerId + ": " private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) - val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir) - info("Created highwatermark file %s".format(highWatermarkCheckpoint.name)) + val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap newGauge( "LeaderCount", @@ -245,22 +244,12 @@ class ReplicaManager(val config: KafkaConfig, * Flushes the highwatermark value for all partitions to the highwatermark file */ def checkpointHighWatermarks() { - val highWaterarksForAllPartitions = allPartitions.map { - partition => - val topic = partition._1._1 - val partitionId = partition._1._2 - val localReplicaOpt = partition._2.getReplica(config.brokerId) - val hw = localReplicaOpt match { - case Some(localReplica) => localReplica.highWatermark - case None => - error("Highwatermark for topic %s partition %d doesn't exist during checkpointing" - .format(topic, partitionId)) - 0L - } - (topic, partitionId) -> hw - }.toMap - highWatermarkCheckpoint.write(highWaterarksForAllPartitions) - trace("Checkpointed high watermark data: %s".format(highWaterarksForAllPartitions)) + val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica} + val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent) + for((dir, reps) <- replicasByDir) { + val hwms = reps.map(r => (TopicAndPartition(r.topic, r.partitionId) -> r.highWatermark)).toMap + highWatermarkCheckpoints(dir).write(hwms) + } } def shutdown() { diff --git a/core/src/main/scala/kafka/utils/FileLock.scala b/core/src/main/scala/kafka/utils/FileLock.scala new file mode 100644 index 00000000000..ad7a59742b8 --- /dev/null +++ b/core/src/main/scala/kafka/utils/FileLock.scala @@ -0,0 +1,64 @@ +package kafka.utils + +import java.io._ +import java.nio.channels._ + +/** + * A file lock a la flock/funlock + * + * The given path will be created and opened if it doesn't exist. 
+ */ +class FileLock(val file: File) extends Logging { + file.createNewFile() + private val channel = new RandomAccessFile(file, "rw").getChannel() + private var flock: java.nio.channels.FileLock = null + + /** + * Lock the file or throw an exception if the lock is already held + */ + def lock() { + this synchronized { + trace("Acquiring lock on " + file.getAbsolutePath) + flock = channel.lock() + } + } + + /** + * Try to lock the file and return true if the locking succeeds + */ + def tryLock(): Boolean = { + this synchronized { + trace("Acquiring lock on " + file.getAbsolutePath) + try { + // weirdly this method will return null if the lock is held by another + // process, but will throw an exception if the lock is held by this process + // so we have to handle both cases + flock = channel.tryLock() + flock != null + } catch { + case e: OverlappingFileLockException => false + } + } + } + + /** + * Unlock the lock if it is held + */ + def unlock() { + this synchronized { + trace("Releasing lock on " + file.getAbsolutePath) + if(flock != null) + flock.release() + } + } + + /** + * Destroy this lock, closing the associated FileChannel + */ + def destroy() = { + this synchronized { + unlock() + channel.close() + } + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 753234e547f..898a5b230d8 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -225,23 +225,29 @@ object Utils extends Logging { */ def rm(file: String): Unit = rm(new File(file)) + /** + * Recursively delete the list of files/directories and any subfiles (if any exist) + * @param a sequence of files to be deleted + */ + def rm(files: Seq[String]): Unit = files.map(f => rm(new File(f))) + /** * Recursively delete the given file/directory and any subfiles (if any exist) * @param file The root file at which to begin deleting */ - def rm(file: File): Unit = { - if(file == null) { - return - } else if(file.isDirectory) { - val files = file.listFiles() - if(files != null) { - for(f <- files) - rm(f) - } - file.delete() - } else { - file.delete() - } + def rm(file: File) { + if(file == null) { + return + } else if(file.isDirectory) { + val files = file.listFiles() + if(files != null) { + for(f <- files) + rm(f) + } + file.delete() + } else { + file.delete() + } } /** diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala index 30c2758fc8b..d694ba98522 100644 --- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala +++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala @@ -22,6 +22,8 @@ import scala.collection._ class VerifiableProperties(val props: Properties) extends Logging { private val referenceSet = mutable.HashSet[String]() + + def this() = this(new Properties) def containsKey(name: String): Boolean = { props.containsKey(name) @@ -185,4 +187,6 @@ class VerifiableProperties(val props: Properties) extends Logging { info("Property %s is overridden to %s".format(key, props.getProperty(key))) } } + + override def toString(): String = props.toString } diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index b4ee2d9d643..194dd70919a 100644 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
@@ -40,7 +40,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { servers.map(server => server.shutdown()) - servers.map(server => Utils.rm(server.config.logDir)) + servers.map(server => server.config.logDirs.map(Utils.rm(_))) super.tearDown } } diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index ab6ef436756..7500dd179f5 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -18,21 +18,23 @@ package kafka.log import java.io._ +import java.nio.channels.OverlappingFileLockException import junit.framework.Assert._ import org.junit.Test import kafka.common.OffsetOutOfRangeException import org.scalatest.junit.JUnit3Suite import kafka.server.KafkaConfig +import kafka.common._ import kafka.utils._ class LogManagerTest extends JUnit3Suite { val time: MockTime = new MockTime() val maxRollInterval = 100 - val maxLogAge = 1000 + val maxLogAgeHours = 10 var logDir: File = null var logManager: LogManager = null - var config:KafkaConfig = null + var config: KafkaConfig = null val name = "kafka" val veryLargeLogFlushInterval = 10000000L val scheduler = new KafkaScheduler(2) @@ -41,12 +43,13 @@ class LogManagerTest extends JUnit3Suite { super.setUp() config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) { override val logFileSize = 1024 - override val flushInterval = 100 + override val flushInterval = 10000 + override val logRetentionHours = maxLogAgeHours } scheduler.startup - logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false) + logManager = new LogManager(config, scheduler, time) logManager.startup - logDir = logManager.logDir + logDir = logManager.logDirs(0) } override def tearDown() { @@ -54,13 +57,14 @@ class LogManagerTest extends JUnit3Suite { if(logManager != null) logManager.shutdown() Utils.rm(logDir) + logManager.logDirs.map(Utils.rm(_)) super.tearDown() } @Test def testCreateLog() { val log = logManager.getOrCreateLog(name, 0) - val logFile = new File(config.logDir, name + "-0") + val logFile = new File(config.logDirs(0), name + "-0") assertTrue(logFile.exists) log.append(TestUtils.singleMessageSet("test".getBytes())) } @@ -68,7 +72,7 @@ class LogManagerTest extends JUnit3Suite { @Test def testGetLog() { val log = logManager.getLog(name, 0) - val logFile = new File(config.logDir, name + "-0") + val logFile = new File(config.logDirs(0), name + "-0") assertTrue(!logFile.exists) } @@ -87,12 +91,13 @@ class LogManagerTest extends JUnit3Suite { // update the last modified time of all log segments val logSegments = log.segments.view - logSegments.foreach(s => s.messageSet.file.setLastModified(time.currentMs)) + logSegments.foreach(_.messageSet.file.setLastModified(time.currentMs)) - time.currentMs += maxLogAge + 3000 + time.currentMs += maxLogAgeHours*60*60*1000 + 1 logManager.cleanupLogs() assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments) assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes) + try { log.read(0, 1024) fail("Should get exception from fetching earlier.") @@ -115,8 +120,9 @@ class LogManagerTest extends JUnit3Suite { override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] override val logRetentionHours = retentionHours override val flushInterval = 100 + override val logRollHours = maxRollInterval } - logManager = 
new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, retentionMs, false) + logManager = new LogManager(config, scheduler, time) logManager.startup // create a log @@ -157,17 +163,47 @@ class LogManagerTest extends JUnit3Suite { override val logFileSize = 1024 *1024 *1024 override val flushSchedulerThreadRate = 50 override val flushInterval = Int.MaxValue + override val logRollHours = maxRollInterval override val flushIntervalMap = Map("timebasedflush" -> 100) } - logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false) + logManager = new LogManager(config, scheduler, time) logManager.startup val log = logManager.getOrCreateLog(name, 0) for(i <- 0 until 200) { var set = TestUtils.singleMessageSet("test".getBytes()) log.append(set) } - println("now = " + System.currentTimeMillis + " last flush = " + log.getLastFlushedTime) - assertTrue("The last flush time has to be within defaultflushInterval of current time ", - (System.currentTimeMillis - log.getLastFlushedTime) < 150) + val ellapsed = System.currentTimeMillis - log.getLastFlushedTime + assertTrue("The last flush time has to be within defaultflushInterval of current time (was %d)".format(ellapsed), + ellapsed < 2*config.flushSchedulerThreadRate) + } + + @Test + def testLeastLoadedAssignment() { + // create a log manager with multiple data directories + val props = TestUtils.createBrokerConfig(0, -1) + val dirs = Seq(TestUtils.tempDir().getAbsolutePath, + TestUtils.tempDir().getAbsolutePath, + TestUtils.tempDir().getAbsolutePath) + props.put("log.directories", dirs.mkString(",")) + logManager.shutdown() + logManager = new LogManager(new KafkaConfig(props), scheduler, time) + + // verify that logs are always assigned to the least loaded partition + for(partition <- 0 until 20) { + logManager.getOrCreateLog("test", partition) + assertEquals("We should have created the right number of logs", partition + 1, logManager.allLogs.size) + val counts = logManager.allLogs.groupBy(_.dir.getParent).values.map(_.size) + assertTrue("Load should balance evenly", counts.max <= counts.min + 1) + } + } + + def testTwoLogManagersUsingSameDirFails() { + try { + new LogManager(logManager.config, scheduler, time) + fail("Should not be able to create a second log manager instance with the same data directory") + } catch { + case e: KafkaException => // this is good + } } } diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 6f27ffc7fda..b444dd87f65 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -79,8 +79,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ server1.awaitShutdown() server2.shutdown server2.awaitShutdown() - Utils.rm(server1.config.logDir) - Utils.rm(server2.config.logDir) + Utils.rm(server1.config.logDirs) + Utils.rm(server2.config.logDirs) super.tearDown() } diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index ca5fcb2539c..214c21be18e 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -16,14 +16,15 @@ */ package kafka.server -import kafka.log.Log +import kafka.log.{Log, LogManager} import org.I0Itec.zkclient.ZkClient import 
org.scalatest.junit.JUnit3Suite import org.easymock.EasyMock +import org.junit._ import org.junit.Assert._ import kafka.common.KafkaException import kafka.cluster.Replica -import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime} +import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils} class HighwatermarkPersistenceTest extends JUnit3Suite { @@ -31,30 +32,37 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { override val defaultFlushIntervalMs = 100 }) val topic = "foo" + val logManagers = configs.map(config => new LogManager(config, new KafkaScheduler(1), new MockTime)) + + @After + def teardown() { + for(manager <- logManagers; dir <- manager.logDirs) + Utils.rm(dir) + } def testHighWatermarkPersistenceSinglePartition() { // mock zkclient val zkClient = EasyMock.createMock(classOf[ZkClient]) EasyMock.replay(zkClient) + // create kafka scheduler val scheduler = new KafkaScheduler(2) scheduler.startup // create replica manager - val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null) + val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0)) replicaManager.startup() replicaManager.checkpointHighWatermarks() - var fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0) + var fooPartition0Hw = hwmFor(replicaManager, topic, 0) assertEquals(0L, fooPartition0Hw) val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1) - // create leader log - val log0 = getMockLog // create leader and follower replicas + val log0 = logManagers(0).getOrCreateLog(topic, 0) val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0)) partition0.addReplicaIfNotExists(leaderReplicaPartition0) val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime) partition0.addReplicaIfNotExists(followerReplicaPartition0) replicaManager.checkpointHighWatermarks() - fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0) + fooPartition0Hw = hwmFor(replicaManager, topic, 0) assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw) try { followerReplicaPartition0.highWatermark @@ -65,10 +73,9 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { // set the highwatermark for local replica partition0.getReplica().get.highWatermark = 5L replicaManager.checkpointHighWatermarks() - fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0) + fooPartition0Hw = hwmFor(replicaManager, topic, 0) assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw) EasyMock.verify(zkClient) - EasyMock.verify(log0) } def testHighWatermarkPersistenceMultiplePartitions() { @@ -81,35 +88,35 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { val scheduler = new KafkaScheduler(2) scheduler.startup // create replica manager - val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null) + val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0)) replicaManager.startup() replicaManager.checkpointHighWatermarks() - var topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0) + var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) assertEquals(0L, topic1Partition0Hw) val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1) // create leader log - val topic1Log0 = getMockLog + val topic1Log0 = logManagers(0).getOrCreateLog(topic1, 0) // create a local 
replica for topic1 val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0)) topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0) replicaManager.checkpointHighWatermarks() - topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0) + topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) assertEquals(leaderReplicaTopic1Partition0.highWatermark, topic1Partition0Hw) // set the highwatermark for local replica topic1Partition0.getReplica().get.highWatermark = 5L replicaManager.checkpointHighWatermarks() - topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0) + topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark) assertEquals(5L, topic1Partition0Hw) // add another partition and set highwatermark val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1) // create leader log - val topic2Log0 = getMockLog + val topic2Log0 = logManagers(0).getOrCreateLog(topic2, 0) // create a local replica for topic2 val leaderReplicaTopic2Partition0 = new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0)) topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0) replicaManager.checkpointHighWatermarks() - var topic2Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic2, 0) + var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0) assertEquals(leaderReplicaTopic2Partition0.highWatermark, topic2Partition0Hw) // set the highwatermark for local replica topic2Partition0.getReplica().get.highWatermark = 15L @@ -119,19 +126,16 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark) replicaManager.checkpointHighWatermarks() // verify checkpointed hw for topic 2 - topic2Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic2, 0) + topic2Partition0Hw = hwmFor(replicaManager, topic2, 0) assertEquals(15L, topic2Partition0Hw) // verify checkpointed hw for topic 1 - topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0) + topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) assertEquals(10L, topic1Partition0Hw) EasyMock.verify(zkClient) - EasyMock.verify(topic1Log0) - EasyMock.verify(topic2Log0) } - private def getMockLog: Log = { - val log = EasyMock.createMock(classOf[kafka.log.Log]) - EasyMock.replay(log) - log + def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = { + replicaManager.highWatermarkCheckpoints(replicaManager.config.logDirs(0)).read(topic, partition) } + } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 8239b642694..3eae29e96a5 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -45,7 +45,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { servers.map(server => server.shutdown()) - servers.map(server => Utils.rm(server.config.logDir)) + servers.map(server => Utils.rm(server.config.logDirs)) super.tearDown() } diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 59efaf4ad83..cce858f012e 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -45,8 +45,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val message = new Message("hello".getBytes()) var producer: Producer[Int, Message] = null - var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDir) - var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDir) + var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDirs(0)) + var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0)) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] def testHWCheckpointNoFailuresSingleLogSegment { @@ -83,7 +83,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(numMessages, leaderHW) val followerHW = hwFile2.read(topic, 0) assertEquals(numMessages, followerHW) - servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDir)}) + servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDirs(0))}) } def testHWCheckpointWithFailuresSingleLogSegment { @@ -148,7 +148,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close() assertEquals(hw, hwFile1.read(topic, 0)) assertEquals(hw, hwFile2.read(topic, 0)) - servers.foreach(server => Utils.rm(server.config.logDir)) + servers.foreach(server => Utils.rm(server.config.logDirs)) } def testHWCheckpointNoFailuresMultipleLogSegments { @@ -165,8 +165,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { server2 = TestUtils.createServer(configs.last) servers ++= List(server1, server2) - hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir) - hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir) + hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0)) + hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0)) val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) producerProps.put("producer.request.timeout.ms", "1000") @@ -193,7 +193,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(hw, leaderHW) val followerHW = hwFile2.read(topic, 0) assertEquals(hw, followerHW) - servers.foreach(server => Utils.rm(server.config.logDir)) + servers.foreach(server => Utils.rm(server.config.logDirs)) } def testHWCheckpointWithFailuresMultipleLogSegments { @@ -210,8 +210,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { server2 = TestUtils.createServer(configs.last) servers ++= List(server1, server2) - hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir) - hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir) + hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0)) + hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0)) val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) producerProps.put("producer.request.timeout.ms", "1000") @@ -263,7 +263,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close() assertEquals(hw, hwFile1.read(topic, 0)) assertEquals(hw, hwFile2.read(topic, 0)) - servers.foreach(server => Utils.rm(server.config.logDir)) + servers.foreach(server => Utils.rm(server.config.logDirs)) } private def sendMessages(n: Int = 1) { diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
index d7380104729..b518a330328 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -31,71 +31,66 @@ import kafka.utils.{TestUtils, Utils} class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { val port = TestUtils.choosePort + val props = TestUtils.createBrokerConfig(0, port) + val config = new KafkaConfig(props) + + val host = "localhost" + val topic = "test" + val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes())) + val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes())) @Test def testCleanShutdown() { - val props = TestUtils.createBrokerConfig(0, port) - val config = new KafkaConfig(props) - - val host = "localhost" - val topic = "test" - val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes())) - val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes())) - - { - val server = new KafkaServer(config) - server.startup() - - // create topic - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0") - - val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000))) - - // send some messages - producer.send(new ProducerData[Int, Message](topic, 0, sent1)) - - // do a clean shutdown - server.shutdown() - val cleanShutDownFile = new File(new File(config.logDir), server.CleanShutdownFile) + var server = new KafkaServer(config) + server.startup() + var producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000))) + + // create topic + CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0") + // send some messages + producer.send(new ProducerData[Int, Message](topic, 0, sent1)) + + // do a clean shutdown and check that the clean shudown file is written out + server.shutdown() + for(logDir <- config.logDirs) { + val cleanShutDownFile = new File(logDir, server.logManager.CleanShutdownFile) assertTrue(cleanShutDownFile.exists) - producer.close() } + producer.close() + + /* now restart the server and check that the written data is still readable and everything still works */ + server = new KafkaServer(config) + server.startup() + + producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000))) + val consumer = new SimpleConsumer(host, + port, + 1000000, + 64*1024) + + waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + + var fetchedMessage: ByteBufferMessageSet = null + while(fetchedMessage == null || fetchedMessage.validBytes == 0) { + val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build()) + fetchedMessage = fetched.messageSet(topic, 0) + } + TestUtils.checkEquals(sent1.iterator, fetchedMessage.map(m => m.message).iterator) + val newOffset = fetchedMessage.last.nextOffset + // send some more messages + producer.send(new ProducerData[Int, Message](topic, 0, sent2)) - { - val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000))) - val consumer = new SimpleConsumer(host, - port, - 1000000, - 64*1024) - - val server = new KafkaServer(config) - server.startup() - - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) - - var fetchedMessage: 
ByteBufferMessageSet = null - while(fetchedMessage == null || fetchedMessage.validBytes == 0) { - val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) - fetchedMessage = fetched.messageSet(topic, 0) - } - TestUtils.checkEquals(sent1.iterator, fetchedMessage.map(m => m.message).iterator) - val newOffset = fetchedMessage.last.nextOffset - - // send some more messages - producer.send(new ProducerData[Int, Message](topic, 0, sent2)) - - fetchedMessage = null - while(fetchedMessage == null || fetchedMessage.validBytes == 0) { - val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build()) - fetchedMessage = fetched.messageSet(topic, 0) - } - TestUtils.checkEquals(sent2.iterator, fetchedMessage.map(m => m.message).iterator) - - server.shutdown() - Utils.rm(server.config.logDir) - producer.close() + fetchedMessage = null + while(fetchedMessage == null || fetchedMessage.validBytes == 0) { + val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build()) + fetchedMessage = fetched.messageSet(topic, 0) } + TestUtils.checkEquals(sent2.iterator, fetchedMessage.map(m => m.message).iterator) + consumer.close() + producer.close() + server.shutdown() + Utils.rm(server.config.logDirs) } } diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index e99aebee670..3aae5cefa3c 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -74,7 +74,6 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.expect(replicaManager.config).andReturn(configs.head) EasyMock.expect(replicaManager.logManager).andReturn(logManager) EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) - EasyMock.expect(replicaManager.highWatermarkCheckpoint).andReturn(EasyMock.createMock(classOf[HighwaterMarkCheckpoint])) EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) EasyMock.replay(replicaManager) @@ -169,7 +168,6 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.expect(replicaManager.config).andReturn(configs.head) EasyMock.expect(replicaManager.logManager).andReturn(logManager) EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) - EasyMock.expect(replicaManager.highWatermarkCheckpoint).andReturn(EasyMock.createMock(classOf[HighwaterMarkCheckpoint])) EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) EasyMock.replay(replicaManager) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index a5c663ce7e0..8dbd85ef764 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -483,6 +483,7 @@ object TestUtils extends Logging { byteBuffer.rewind() byteBuffer } + } object TestZKUtils {
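
Taken together, the change lets a broker spread its logs across a comma-separated "log.directories" list (falling back to the old "log.dir" setting): each data directory is guarded by a ".lock" file, gets its own ".kafka_cleanshutdown" marker and highwatermark checkpoint, and each newly created partition is placed in whichever directory currently holds the fewest logs. The following is only an illustrative sketch, not part of the patch, of driving the new LogManager over several directories; it follows the pattern of testLeastLoadedAssignment above and assumes the same test-scope helpers and imports used in LogManagerTest (TestUtils, KafkaScheduler, MockTime, KafkaConfig, LogManager):

    // build a broker config pointing at three temporary data directories
    val props = TestUtils.createBrokerConfig(0, -1)
    val dirs = Seq(TestUtils.tempDir(), TestUtils.tempDir(), TestUtils.tempDir())
    // "log.directories" is the new comma-separated setting parsed via Utils.parseCsvList
    props.put("log.directories", dirs.map(_.getAbsolutePath).mkString(","))

    val scheduler = new KafkaScheduler(1)
    scheduler.startup
    val logManager = new LogManager(new KafkaConfig(props), scheduler, new MockTime)
    logManager.startup
    // each new log lands in the data directory that currently holds the fewest partitions
    val log = logManager.getOrCreateLog("my-topic", 0)   // "my-topic" is an arbitrary example topic
    // shutdown writes the .kafka_cleanshutdown marker in each directory and releases the .lock files
    logManager.shutdown()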