Jay Kreps
12 years ago
40 changed files with 2280 additions and 547 deletions
@@ -0,0 +1,23 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.common

/**
 * Thrown when an optimistic locking attempt receives concurrent modifications
 */
class OptimisticLockFailureException(message: String) extends RuntimeException(message)
@@ -0,0 +1,41 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.log

/**
 * Configuration parameters for the log cleaner
 *
 * @param numThreads The number of cleaner threads to run
 * @param dedupeBufferSize The total memory used for log deduplication
 * @param dedupeBufferLoadFactor The maximum percent full for the deduplication buffer
 * @param maxMessageSize The maximum size of a message that can appear in the log
 * @param maxIoBytesPerSecond The maximum read and write I/O that all cleaner threads are allowed to do
 * @param backOffMs The amount of time to wait before rechecking if no logs are eligible for cleaning
 * @param enableCleaner Allows completely disabling the log cleaner
 * @param hashAlgorithm The hash algorithm to use in key comparison.
 */
case class CleanerConfig(val numThreads: Int = 1,
                         val dedupeBufferSize: Int = 4*1024*1024,
                         val dedupeBufferLoadFactor: Double = 0.75,
                         val ioBufferSize: Int = 1024*1024,
                         val maxMessageSize: Int = 32*1024*1024,
                         val maxIoBytesPerSecond: Double = Double.MaxValue,
                         val backOffMs: Long = 60 * 1000,
                         val enableCleaner: Boolean = true,
                         val hashAlgorithm: String = "MD5") {
}
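
A minimal usage sketch (not part of the commit) showing how these defaults can be overridden when wiring up the cleaner; the values below are illustrative only.

// Hypothetical example: two cleaner threads, a 32 MB dedupe buffer, and an
// I/O cap of roughly 10 MB/sec; all other settings keep the defaults above.
val cleanerConfig = CleanerConfig(numThreads = 2,
                                  dedupeBufferSize = 32*1024*1024,
                                  maxIoBytesPerSecond = 10*1024*1024)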
@@ -0,0 +1,557 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.log

import scala.collection._
import scala.math
import java.nio._
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic._
import java.io.File
import kafka.common._
import kafka.message._
import kafka.server.OffsetCheckpoint
import kafka.utils._

/**
 * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
 * A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
 *
 * Each log can be thought of as being split into two sections of segments: a "clean" section which has previously been cleaned followed by a
 * "dirty" section that has not yet been cleaned. The active log segment is always excluded from cleaning.
 *
 * The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "dedupe" retention policy
 * and cleans that. The dirtiness of the log is estimated by taking the ratio of bytes in the dirty section of the log to the total bytes in the log.
 *
 * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. For memory efficiency this mapping
 * is approximate. That is, it is allowed to lose some key=>offset pairs, but never to return a wrong answer. See kafka.log.OffsetMap for details of
 * the implementation of the mapping.
 *
 * Once the key=>offset map is built, the log is cleaned by recopying each log segment but omitting any key that appears in the offset map with a
 * higher offset than what is found in the segment (i.e. messages with a key that appears in the dirty section of the log).
 *
 * To avoid segments shrinking to very small sizes with repeated cleanings we implement a rule by which we will merge successive segments when
 * doing a cleaning if their log and index size are less than the maximum log and index size prior to the clean beginning.
 *
 * Cleaned segments are swapped into the log as they become available.
 *
 * One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned the cleaning of that log is aborted.
 *
 * @param config Configuration parameters for the cleaner
 * @param logDirs The directories where offset checkpoints reside
 * @param logs The pool of logs
 * @param time A way to control the passage of time
 */
class LogCleaner(val config: CleanerConfig,
                 val logDirs: Array[File],
                 val logs: Pool[TopicAndPartition, Log],
                 time: Time = SystemTime) extends Logging {

  /* the offset checkpoints holding the last cleaned point for each log */
  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap

  /* the set of logs currently being cleaned */
  private val inProgress = mutable.HashSet[TopicAndPartition]()

  /* a global lock used to control all access to the in-progress set and the offset checkpoints */
  private val lock = new Object

  /* a counter for creating unique thread names */
  private val threadId = new AtomicInteger(0)

  /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
  private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
                                        checkIntervalMs = 300,
                                        throttleDown = true,
                                        time = time)

  /* the threads */
  private val cleaners = (0 until config.numThreads).map(_ => new CleanerThread())

  /* a hook for testing to synchronize on log cleaning completions */
  private val cleaned = new Semaphore(0)

  /**
   * Start the background cleaning
   */
  def startup() {
    info("Starting the log cleaner")
    cleaners.foreach(_.start())
  }

  /**
   * Stop the background cleaning
   */
  def shutdown() {
    info("Shutting down the log cleaner.")
    cleaners.foreach(_.interrupt())
    cleaners.foreach(_.join())
  }

  /**
   * For testing, a way to know when work has completed. This method blocks until the
   * cleaner has processed up to the given offset on the specified topic/partition
   */
  def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit = {
    while(!allCleanerCheckpoints.contains(TopicAndPartition(topic, part)))
      cleaned.tryAcquire(timeout, TimeUnit.MILLISECONDS)
  }

  /**
   * @return the position processed for all logs.
   */
  def allCleanerCheckpoints(): Map[TopicAndPartition, Long] =
    checkpoints.values.flatMap(_.read()).toMap

  /**
   * Choose the log to clean next and add it to the in-progress set. We recompute this
   * every time off the full set of logs to allow logs to be dynamically added to the pool of logs
   * the log manager maintains.
   */
  private def grabFilthiestLog(): Option[LogToClean] = {
    lock synchronized {
      val lastClean = allCleanerCheckpoints()
      val cleanableLogs = logs.filter(l => l._2.config.dedupe)                                // skip any logs marked for delete rather than dedupe
                              .filterNot(l => inProgress.contains(l._1))                      // skip any logs already in-progress
                              .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each
      val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0)                             // must have some bytes
                                   .filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
      if(dirtyLogs.isEmpty) {
        None
      } else {
        val filthiest = dirtyLogs.max
        inProgress += filthiest.topicPartition
        Some(filthiest)
      }
    }
  }

  /**
   * Save out the endOffset and remove the given log from the in-progress set.
   */
  private def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) {
    lock synchronized {
      val checkpoint = checkpoints(dataDir)
      val offsets = checkpoint.read() + ((topicAndPartition, endOffset))
      checkpoint.write(offsets)
      inProgress -= topicAndPartition
    }
    cleaned.release()
  }

  /**
   * The cleaner threads do the actual log cleaning. Each thread does its cleaning repeatedly by
   * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
   */
  private class CleanerThread extends Thread {
    val cleaner = new Cleaner(id = threadId.getAndIncrement(),
                              offsetMap = new SkimpyOffsetMap(memory = config.dedupeBufferSize / config.numThreads,
                                                              maxLoadFactor = config.dedupeBufferLoadFactor,
                                                              hashAlgorithm = config.hashAlgorithm),
                              ioBufferSize = config.ioBufferSize / config.numThreads / 2,
                              maxIoBufferSize = config.maxMessageSize,
                              throttler = throttler,
                              time = time)

    setName("kafka-log-cleaner-thread-" + cleaner.id)
    setDaemon(false)

    /**
     * The main loop for the cleaner thread
     */
    override def run() {
      info("Starting cleaner thread %d...".format(cleaner.id))
      try {
        while(!isInterrupted) {
          cleanOrSleep()
        }
      } catch {
        case e: InterruptedException => // all done
        case e: Exception =>
          error("Error in cleaner thread %d:".format(cleaner.id), e)
      }
      info("Shutting down cleaner thread %d.".format(cleaner.id))
    }

    /**
     * Clean a log if there is a dirty log available, otherwise sleep for a bit
     */
    private def cleanOrSleep() {
      grabFilthiestLog() match {
        case None =>
          // there are no cleanable logs, sleep a while
          time.sleep(config.backOffMs)
        case Some(cleanable) =>
          // there's a log, clean it
          var endOffset = cleanable.firstDirtyOffset
          try {
            endOffset = cleaner.clean(cleanable)
            logStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats)
          } catch {
            case e: OptimisticLockFailureException =>
              info("Cleaning of log was aborted due to colliding truncate operation.")
          } finally {
            doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
          }
      }
    }

    /**
     * Log out statistics on a single run of the cleaner.
     */
    def logStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
      def mb(bytes: Double) = bytes / (1024*1024)
      val message =
        "%n\tLog cleaner %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
        "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead),
                                                                                stats.elapsedSecs,
                                                                                mb(stats.bytesRead/stats.elapsedSecs)) +
        "\tIndexed %,.1f MB in %.1f seconds (%,.1f MB/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead),
                                                                                           stats.elapsedIndexSecs,
                                                                                           mb(stats.mapBytesRead)/stats.elapsedIndexSecs,
                                                                                           100 * stats.elapsedIndexSecs.toDouble/stats.elapsedSecs) +
        "\tCleaned %,.1f MB in %.1f seconds (%,.1f MB/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead),
                                                                                           stats.elapsedSecs - stats.elapsedIndexSecs,
                                                                                           mb(stats.bytesRead)/(stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs).toDouble/stats.elapsedSecs) +
        "\tStart size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesRead), stats.messagesRead) +
        "\tEnd size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesWritten), stats.messagesWritten) +
        "\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead),
                                                                   100.0 * (1.0 - stats.messagesWritten.toDouble/stats.messagesRead))
      info(message)
    }

  }
}

/**
 * This class holds the actual logic for cleaning a log
 * @param id An identifier used for logging
 * @param offsetMap The map used for deduplication
 * @param ioBufferSize The size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer.
 * @param throttler The throttler instance to use for limiting I/O rate.
 * @param time The time instance
 */
private[log] class Cleaner(val id: Int,
                           offsetMap: OffsetMap,
                           ioBufferSize: Int,
                           maxIoBufferSize: Int,
                           throttler: Throttler,
                           time: Time) extends Logging {

  this.logIdent = "Cleaner " + id + ":"
  val stats = new CleanerStats(time)
  private var readBuffer = ByteBuffer.allocate(ioBufferSize)  // buffer for disk read I/O
  private var writeBuffer = ByteBuffer.allocate(ioBufferSize) // buffer for disk write I/O

  /**
   * Clean the given log
   *
   * @param cleanable The log to be cleaned
   *
   * @return The first offset not cleaned
   */
  private[log] def clean(cleanable: LogToClean): Long = {
    stats.clear()
    val topic = cleanable.topicPartition.topic
    val part = cleanable.topicPartition.partition
    info("Beginning cleaning of %s-%d.".format(topic, part))
    val log = cleanable.log
    val truncateCount = log.numberOfTruncates

    // build the offset map
    val upperBoundOffset = math.min(log.activeSegment.baseOffset, cleanable.firstDirtyOffset + offsetMap.capacity)
    val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + 1
    stats.indexDone()

    // group the segments and clean the groups
    for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) {
      info("Cleaning segments %s for log %s...".format(group.map(_.baseOffset).mkString(","), log.name))
      cleanSegments(log, group, offsetMap, truncateCount)
    }
    stats.allDone()
    endOffset
  }

  /**
   * Clean a group of segments into a single replacement segment
   *
   * @param log The log being cleaned
   * @param segments The group of segments being cleaned
   * @param map The offset map to use for cleaning segments
   * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
   */
  private[log] def cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, expectedTruncateCount: Int) {
    // create a new segment with the suffix .cleaned appended to both the log and index name
    val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
    logFile.delete()
    val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
    indexFile.delete()
    val messages = new FileMessageSet(logFile)
    val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
    val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, time)

    // clean segments into the new destination segment
    for (old <- segments)
      cleanInto(old, cleaned, map)

    // trim excess index
    index.trimToValidSize()

    // flush new segment to disk before swap
    cleaned.flush()

    // swap in new segment
    info("Swapping in cleaned segment %d for %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
    try {
      log.replaceSegments(cleaned, segments, expectedTruncateCount)
    } catch {
      case e: OptimisticLockFailureException =>
        cleaned.delete()
        throw e
    }
  }

  /**
   * Clean the given source log segment into the destination segment using the key=>offset mapping
   * provided
   *
   * @param source The dirty log segment
   * @param dest The cleaned log segment
   * @param map The key=>offset mapping
   *
   * TODO: Implement proper compression support
   */
  private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap) {
    var position = 0
    while (position < source.log.sizeInBytes) {
      checkDone()
      // read a chunk of messages and copy any that are to be retained to the write buffer to be written out
      readBuffer.clear()
      writeBuffer.clear()
      val messages = new ByteBufferMessageSet(source.log.readInto(readBuffer, position))
      throttler.maybeThrottle(messages.sizeInBytes)
      // check each message to see if it is to be retained
      var messagesRead = 0
      for (entry <- messages) {
        messagesRead += 1
        val size = MessageSet.entrySize(entry.message)
        position += size
        stats.readMessage(size)
        val key = entry.message.key
        require(key != null, "Found null key in log segment %s which is marked as dedupe.".format(source.log.file.getAbsolutePath))
        val lastOffset = map.get(key)
        /* retain the record if it isn't present in the map OR it is present but this offset is the highest (and it's not a delete) */
        val retainRecord = lastOffset < 0 || (entry.offset >= lastOffset && entry.message.payload != null)
        if (retainRecord) {
          ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
          stats.recopyMessage(size)
        }
      }
      // if any messages are to be retained, write them out
      if(writeBuffer.position > 0) {
        writeBuffer.flip()
        val retained = new ByteBufferMessageSet(writeBuffer)
        dest.append(retained.head.offset, retained)
        throttler.maybeThrottle(writeBuffer.limit)
      }

      // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
      if(readBuffer.limit > 0 && messagesRead == 0)
        growBuffers()
    }
    restoreBuffers()
  }

  /**
   * Double the I/O buffer capacity
   */
  def growBuffers() {
    if(readBuffer.capacity >= maxIoBufferSize || writeBuffer.capacity >= maxIoBufferSize)
      throw new IllegalStateException("This log contains a message larger than maximum allowable size of %s.".format(maxIoBufferSize))
    val newSize = math.min(this.readBuffer.capacity * 2, maxIoBufferSize)
    info("Growing cleaner I/O buffers from " + readBuffer.capacity + " bytes to " + newSize + " bytes.")
    this.readBuffer = ByteBuffer.allocate(newSize)
    this.writeBuffer = ByteBuffer.allocate(newSize)
  }

  /**
   * Restore the I/O buffer capacity to its original size
   */
  def restoreBuffers() {
    if(this.readBuffer.capacity > this.ioBufferSize)
      this.readBuffer = ByteBuffer.allocate(this.ioBufferSize)
    if(this.writeBuffer.capacity > this.ioBufferSize)
      this.writeBuffer = ByteBuffer.allocate(this.ioBufferSize)
  }

  /**
   * Group the segments in a log into groups totaling less than a given size. The size is enforced separately for the log data and the index data.
   * We collect a group of such segments together into a single
   * destination segment. This prevents segment sizes from shrinking too much.
   *
   * @param segments The log segments to group
   * @param maxSize The maximum size in bytes for the total of all log data in a group
   * @param maxIndexSize The maximum size in bytes for the total of all index data in a group
   *
   * @return A list of grouped segments
   */
  private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
    var grouped = List[List[LogSegment]]()
    var segs = segments.toList
    while(!segs.isEmpty) {
      var group = List(segs.head)
      var logSize = segs.head.size
      var indexSize = segs.head.index.sizeInBytes
      segs = segs.tail
      while(!segs.isEmpty &&
            logSize + segs.head.size < maxSize &&
            indexSize + segs.head.index.sizeInBytes < maxIndexSize) {
        group = segs.head :: group
        logSize += segs.head.size
        indexSize += segs.head.index.sizeInBytes
        segs = segs.tail
      }
      grouped ::= group.reverse
    }
    grouped.reverse
  }

  /**
   * Build a map of key_hash => offset for the keys in the dirty portion of the log to use in cleaning.
   * @param log The log to use
   * @param start The offset at which dirty messages begin
   * @param end The ending offset for the map that is being built
   * @param map The map in which to store the mappings
   *
   * @return The final offset the map covers
   */
  private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap): Long = {
    map.clear()
    val segments = log.logSegments(start, end)
    info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, segments.size, start, end))
    var offset = segments.head.baseOffset
    require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
    for (segment <- segments) {
      checkDone()
      offset = buildOffsetMap(segment, map)
    }
    info("Offset map for log %s complete.".format(log.name))
    offset
  }

  /**
   * Add the messages in the given segment to the offset map
   *
   * @param segment The segment to index
   * @param map The map in which to store the key=>offset mapping
   *
   * @return The final offset covered by the map
   */
  private def buildOffsetMap(segment: LogSegment, map: OffsetMap): Long = {
    var position = 0
    var offset = segment.baseOffset
    while (position < segment.log.sizeInBytes) {
      checkDone()
      readBuffer.clear()
      val messages = new ByteBufferMessageSet(segment.log.readInto(readBuffer, position))
      throttler.maybeThrottle(messages.sizeInBytes)
      val startPosition = position
      for (entry <- messages) {
        val message = entry.message
        require(message.hasKey)
        val size = MessageSet.entrySize(message)
        position += size
        map.put(message.key, entry.offset)
        offset = entry.offset
        stats.indexMessage(size)
      }
      // if we didn't read even one complete message, our read buffer may be too small
      if(position == startPosition)
        growBuffers()
    }
    restoreBuffers()
    offset
  }

  /**
   * If we aren't running any more, throw an InterruptedException
   */
  private def checkDone() {
    if (Thread.currentThread.isInterrupted)
      throw new InterruptedException
  }
}

/**
 * A simple struct for collecting stats about log cleaning
 */
private case class CleanerStats(time: Time = SystemTime) {
  var startTime, mapCompleteTime, endTime, bytesRead, bytesWritten, mapBytesRead, mapMessagesRead, messagesRead, messagesWritten = 0L
  clear()

  def readMessage(size: Int) {
    messagesRead += 1
    bytesRead += size
  }

  def recopyMessage(size: Int) {
    messagesWritten += 1
    bytesWritten += size
  }

  def indexMessage(size: Int) {
    mapMessagesRead += 1
    mapBytesRead += size
  }

  def indexDone() {
    mapCompleteTime = time.milliseconds
  }

  def allDone() {
    endTime = time.milliseconds
  }

  def elapsedSecs = (endTime - startTime)/1000.0

  def elapsedIndexSecs = (mapCompleteTime - startTime)/1000.0

  def clear() {
    startTime = time.milliseconds
    mapCompleteTime = -1L
    endTime = -1L
    bytesRead = 0L
    bytesWritten = 0L
    mapBytesRead = 0L
    mapMessagesRead = 0L
    messagesRead = 0L
    messagesWritten = 0L
  }
}

/**
 * Helper class for a log, its topic/partition, and the last clean position
 */
private case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long) extends Ordered[LogToClean] {
  val cleanBytes = log.logSegments(-1, firstDirtyOffset-1).map(_.size).sum
  val dirtyBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, log.activeSegment.baseOffset)).map(_.size).sum
  val cleanableRatio = dirtyBytes / totalBytes.toDouble
  def totalBytes = cleanBytes + dirtyBytes
  override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
}
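
A standalone sketch (not part of the commit) of the retention rule that cleanInto applies: for each key only the record at the highest offset recorded in the key=>offset map survives, and a record whose payload is null (a delete) is not recopied. The Entry type and compact helper below are illustrative stand-ins; the real code works on ByteBufferMessageSet entries and an OffsetMap.

// Illustrative stand-in for a keyed log entry; payload None models a delete.
case class Entry(key: String, offset: Long, payload: Option[String])

def compact(entries: Seq[Entry]): Seq[Entry] = {
  // analogous to buildOffsetMap: the last offset seen for each key
  val lastOffset = entries.groupBy(_.key).map { case (k, es) => k -> es.map(_.offset).max }
  // analogous to cleanInto's retainRecord test
  entries.filter(e => e.offset >= lastOffset(e.key) && e.payload.isDefined)
}

val log = Seq(Entry("k1", 0, Some("a")), Entry("k2", 1, Some("b")),
              Entry("k1", 2, Some("c")), Entry("k2", 3, None))
println(compact(log)) // List(Entry(k1,2,Some(c))) -- k2's newest record is a delete, so nothing for k2 is recopied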
@@ -0,0 +1,51 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.log

import java.io.File
import scala.collection._
import kafka.common._

/**
 * Configuration settings for a log
 * @param segmentSize The soft maximum for the size of a segment file in the log
 * @param segmentMs The soft maximum on the amount of time before a new log segment is rolled
 * @param flushInterval The number of messages that can be written to the log before a flush is forced
 * @param flushMs The amount of time the log can have dirty data before a flush is forced
 * @param retentionSize The approximate total number of bytes this log can use
 * @param retentionMs The approximate maximum age of the last segment that is retained
 * @param maxIndexSize The maximum size of an index file
 * @param indexInterval The approximate number of bytes between index entries
 * @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
 * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
 * @param dedupe Should old segments in this log be deleted or deduplicated?
 */
case class LogConfig(val segmentSize: Int = 1024*1024,
                     val segmentMs: Long = Long.MaxValue,
                     val flushInterval: Long = Long.MaxValue,
                     val flushMs: Long = Long.MaxValue,
                     val retentionSize: Long = Long.MaxValue,
                     val retentionMs: Long = Long.MaxValue,
                     val maxMessageSize: Int = Int.MaxValue,
                     val maxIndexSize: Int = 1024*1024,
                     val indexInterval: Int = 4096,
                     val fileDeleteDelayMs: Long = 60*1000,
                     val minCleanableRatio: Double = 0.5,
                     val dedupe: Boolean = false)

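A small usage sketch (not part of the commit): a log configured for the dedupe retention policy that the cleaner above operates on; the values are illustrative only.

// Hypothetical example: a compacted ("dedupe") log with 100 MB segments and a
// low cleanable-ratio threshold; everything else keeps the defaults above.
val compactedLogConfig = LogConfig(segmentSize = 100*1024*1024,
                                   minCleanableRatio = 0.1,
                                   dedupe = true)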
@@ -0,0 +1,136 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.log

import java.util.Arrays
import java.security.MessageDigest
import java.nio.ByteBuffer
import kafka.utils._

trait OffsetMap {
  def capacity: Int
  def put(key: ByteBuffer, offset: Long)
  def get(key: ByteBuffer): Long
  def clear()
  def size: Int
  def utilization: Double = size.toDouble / capacity
}

/**
 * An approximate map used for deduplicating the log.
 * @param memory The amount of memory this map can use
 * @param maxLoadFactor The maximum percent full this offset map can be
 * @param hashAlgorithm The hash algorithm instance to use: MD2, MD5, SHA-1, SHA-256, SHA-384, SHA-512
 */
@nonthreadsafe
class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgorithm: String = "MD5") extends OffsetMap {
  private val bytes = ByteBuffer.allocate(memory)

  /* the hash algorithm instance to use, default is MD5 */
  private val digest = MessageDigest.getInstance(hashAlgorithm)

  /* the number of bytes for this hash algorithm */
  private val hashSize = digest.getDigestLength

  /* create some hash buffers to avoid reallocating each time */
  private val hash1 = new Array[Byte](hashSize)
  private val hash2 = new Array[Byte](hashSize)

  /* number of entries put into the map */
  private var entries = 0

  /* a byte added as a prefix to all keys to make collisions non-static in repeated uses. Changed in clear(). */
  private var salt: Byte = 0

  /**
   * The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
   */
  val bytesPerEntry = hashSize + 8

  /**
   * The maximum number of entries this map can contain before it exceeds the max load factor
   */
  override val capacity: Int = (maxLoadFactor * memory / bytesPerEntry).toInt

  /**
   * Associate an offset with a key.
   * @param key The key
   * @param offset The offset
   */
  override def put(key: ByteBuffer, offset: Long) {
    if(size + 1 > capacity)
      throw new IllegalStateException("Attempt to add to a full offset map with a maximum capacity of %d.".format(capacity))
    hash(key, hash1)
    bytes.position(offsetFor(hash1))
    bytes.put(hash1)
    bytes.putLong(offset)
    entries += 1
  }

  /**
   * Get the offset associated with this key. This method is approximate:
   * it may not find an offset previously stored, but cannot give a wrong offset.
   * @param key The key
   * @return The offset associated with this key or -1 if the key is not found
   */
  override def get(key: ByteBuffer): Long = {
    hash(key, hash1)
    bytes.position(offsetFor(hash1))
    bytes.get(hash2)
    // if the computed hash equals the stored hash return the stored offset
    if(Arrays.equals(hash1, hash2))
      bytes.getLong()
    else
      -1L
  }

  /**
   * Change the salt used for key hashing, making all existing keys unfindable.
   * Doesn't actually zero out the array.
   */
  override def clear() {
    this.entries = 0
    this.salt = (this.salt + 1).toByte
    Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit, 0.toByte)
  }

  /**
   * The number of entries put into the map (note that not all may remain)
   */
  override def size: Int = entries

  /**
   * Choose a slot in the array for this hash
   */
  private def offsetFor(hash: Array[Byte]): Int =
    bytesPerEntry * (Utils.abs(Utils.readInt(hash, 0)) % capacity)

  /**
   * Compute the hash of the given key and store it in the given buffer
   * @param key The key to hash
   * @param buffer The buffer to store the hash into
   */
  private def hash(key: ByteBuffer, buffer: Array[Byte]) {
    key.mark()
    digest.update(salt)
    digest.update(key)
    key.reset()
    digest.digest(buffer, 0, hashSize)
  }

}
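
A usage sketch (not part of the commit) of the put/get contract described above; the key bytes and offsets are illustrative.

import java.nio.ByteBuffer

// 1 MB of memory with MD5 gives roughly 1024*1024 * 0.75 / 24 ~= 32k entries.
val offsets = new SkimpyOffsetMap(memory = 1024*1024, maxLoadFactor = 0.75)
val key = ByteBuffer.wrap("user-42".getBytes("UTF-8"))
offsets.put(key, 100L)
offsets.put(key, 250L)        // the same key hashes to the same slot, so the offset is overwritten
println(offsets.get(key))     // 250
val other = ByteBuffer.wrap("unseen-key".getBytes("UTF-8"))
println(offsets.get(other))   // -1 (not found), barring an unlucky slot collision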
@@ -1,118 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.server

import kafka.utils.Logging
import kafka.common._
import java.util.concurrent.locks.ReentrantLock
import java.io._

/**
 * This class handles the read/write to the highwaterMark checkpoint file. The file stores the high watermark value for
 * all topics and partitions that this broker hosts. The format of this file is as follows -
 * version
 * number of entries
 * topic partition highwatermark
 */

object HighwaterMarkCheckpoint {
  val highWatermarkFileName = "replication-offset-checkpoint"
  val currentHighwaterMarkFileVersion = 0
}

class HighwaterMarkCheckpoint(val path: String) extends Logging {
  /* create the highwatermark file handle for all partitions */
  val name = path + "/" + HighwaterMarkCheckpoint.highWatermarkFileName
  private val hwFile = new File(name)
  private val hwFileLock = new ReentrantLock()
  // recover from previous tmp file, if required

  def write(highwaterMarksPerPartition: Map[TopicAndPartition, Long]) {
    hwFileLock.lock()
    try {
      // write to temp file and then swap with the highwatermark file
      val tempHwFile = new File(hwFile + ".tmp")

      val hwFileWriter = new BufferedWriter(new FileWriter(tempHwFile))
      // checkpoint highwatermark for all partitions
      // write the current version
      hwFileWriter.write(HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion.toString)
      hwFileWriter.newLine()
      // write the number of entries in the highwatermark file
      hwFileWriter.write(highwaterMarksPerPartition.size.toString)
      hwFileWriter.newLine()

      highwaterMarksPerPartition.foreach { partitionAndHw =>
        hwFileWriter.write("%s %s %s".format(partitionAndHw._1.topic, partitionAndHw._1.partition, partitionAndHw._2))
        hwFileWriter.newLine()
      }
      hwFileWriter.flush()
      hwFileWriter.close()
      // swap new high watermark file with previous one
      if(!tempHwFile.renameTo(hwFile)) {
        fatal("Attempt to swap the new high watermark file with the old one failed")
        System.exit(1)
      }
    } finally {
      hwFileLock.unlock()
    }
  }

  def read(topic: String, partition: Int): Long = {
    hwFileLock.lock()
    try {
      hwFile.length() match {
        case 0 =>
          warn("No highwatermark file is found. Returning 0 as the highwatermark for topic %s partition %d."
               .format(topic, partition))
          0L
        case _ =>
          val hwFileReader = new BufferedReader(new FileReader(hwFile))
          val version = hwFileReader.readLine().toShort
          version match {
            case HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion =>
              val numberOfHighWatermarks = hwFileReader.readLine().toInt
              val partitionHighWatermarks =
                for(i <- 0 until numberOfHighWatermarks) yield {
                  val nextHwEntry = hwFileReader.readLine()
                  val partitionHwInfo = nextHwEntry.split(" ")
                  val topic = partitionHwInfo(0)
                  val partitionId = partitionHwInfo(1).toInt
                  val highWatermark = partitionHwInfo(2).toLong
                  (TopicAndPartition(topic, partitionId) -> highWatermark)
                }
              hwFileReader.close()
              val hwOpt = partitionHighWatermarks.toMap.get(TopicAndPartition(topic, partition))
              hwOpt match {
                case Some(hw) =>
                  debug("Read hw %d for topic %s partition %d from highwatermark checkpoint file".format(hw, topic, partition))
                  hw
                case None =>
                  warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
                       "partition %d. Returning 0 as the highwatermark".format(partition))
                  0L
              }
            case _ => fatal("Unrecognized version of the highwatermark checkpoint file " + version)
              System.exit(1)
              -1L
          }
      }
    } finally {
      hwFileLock.unlock()
    }
  }
}
@@ -0,0 +1,103 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.server

import scala.collection._
import kafka.utils.Logging
import kafka.common._
import java.util.concurrent.locks.ReentrantLock
import java.io._

/**
 * This class saves out a map of topic/partition=>offsets to a file
 */
class OffsetCheckpoint(val file: File) extends Logging {
  private val lock = new Object()
  new File(file + ".tmp").delete() // try to delete any existing temp files for cleanliness
  file.createNewFile() // in case the file doesn't exist

  def write(offsets: Map[TopicAndPartition, Long]) {
    lock synchronized {
      // write to temp file and then swap with the existing file
      val temp = new File(file.getAbsolutePath + ".tmp")

      val writer = new BufferedWriter(new FileWriter(temp))
      try {
        // write the current version
        writer.write(0.toString)
        writer.newLine()

        // write the number of entries
        writer.write(offsets.size.toString)
        writer.newLine()

        // write the entries
        offsets.foreach { case (topicPart, offset) =>
          writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, offset))
          writer.newLine()
        }

        // flush and overwrite old file
        writer.flush()
        if(!temp.renameTo(file))
          throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath))
      } finally {
        writer.close()
      }
    }
  }

  def read(): Map[TopicAndPartition, Long] = {
    lock synchronized {
      val reader = new BufferedReader(new FileReader(file))
      try {
        var line = reader.readLine()
        if(line == null)
          return Map.empty
        val version = line.toInt
        version match {
          case 0 =>
            line = reader.readLine()
            if(line == null)
              return Map.empty
            val expectedSize = line.toInt
            var offsets = Map[TopicAndPartition, Long]()
            line = reader.readLine()
            while(line != null) {
              val pieces = line.split("\\s+")
              if(pieces.length != 3)
                throw new IOException("Malformed line in offset checkpoint file: '%s'.".format(line))

              val topic = pieces(0)
              val partition = pieces(1).toInt
              val offset = pieces(2).toLong
              offsets += (TopicAndPartition(topic, partition) -> offset)
              line = reader.readLine()
            }
            if(offsets.size != expectedSize)
              throw new IOException("Expected %d entries but found only %d".format(expectedSize, offsets.size))
            offsets
          case _ =>
            throw new IOException("Unrecognized version of the offset checkpoint file: " + version)
        }
      } finally {
        reader.close()
      }
    }
  }

}
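
A usage sketch (not part of the commit) of the plain-text format this class writes: a version line, an entry count, then one "topic partition offset" line per entry. The path and values below are placeholders.

import java.io.File
import kafka.common.TopicAndPartition

val checkpoint = new OffsetCheckpoint(new File("/tmp/cleaner-offset-checkpoint"))
checkpoint.write(Map(TopicAndPartition("my-topic", 0) -> 42L,
                     TopicAndPartition("my-topic", 1) -> 17L))
// The file then contains something like (entry order may vary):
//   0
//   2
//   my-topic 0 42
//   my-topic 1 17
val restored = checkpoint.read()   // a Map with the same two entries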
@ -0,0 +1,216 @@ |
|||||||
|
/** |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package kafka |
||||||
|
|
||||||
|
import joptsimple.OptionParser |
||||||
|
import java.util.Properties |
||||||
|
import java.util.Random |
||||||
|
import java.io._ |
||||||
|
import scala.io.Source |
||||||
|
import scala.io.BufferedSource |
||||||
|
import kafka.producer._ |
||||||
|
import kafka.consumer._ |
||||||
|
import kafka.serializer._ |
||||||
|
import kafka.utils._ |
||||||
|
|
||||||
|
/** |
||||||
|
* This is a torture test that runs against an existing broker. Here is how it works: |
||||||
|
* |
||||||
|
* It produces a series of specially formatted messages to one or more partitions. Each message it produces |
||||||
|
* it logs out to a text file. The messages have a limited set of keys, so there is duplication in the key space. |
||||||
|
* |
||||||
|
* The broker will clean its log as the test runs. |
||||||
|
* |
||||||
|
* When the specified number of messages have been produced we create a consumer and consume all the messages in the topic |
||||||
|
* and write that out to another text file. |
||||||
|
* |
||||||
|
* Using a stable unix sort we sort both the producer log of what was sent and the consumer log of what was retrieved by the message key. |
||||||
|
* Then we compare the final message in both logs for each key. If this final message is not the same for all keys we |
||||||
|
* print an error and exit with exit code 1, otherwise we print the size reduction and exit with exit code 0. |
||||||
|
*/ |
||||||
|
object TestLogCleaning { |
||||||
|
|
||||||
|
def main(args: Array[String]) { |
||||||
|
val parser = new OptionParser |
||||||
|
val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.") |
||||||
|
.withRequiredArg |
||||||
|
.describedAs("count") |
||||||
|
.ofType(classOf[java.lang.Long]) |
||||||
|
.defaultsTo(Long.MaxValue) |
||||||
|
val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.") |
||||||
|
.withRequiredArg |
||||||
|
.describedAs("count") |
||||||
|
.ofType(classOf[java.lang.Integer]) |
||||||
|
.defaultsTo(5) |
||||||
|
val brokerOpt = parser.accepts("broker", "Url to connect to.") |
||||||
|
.withRequiredArg |
||||||
|
.describedAs("url") |
||||||
|
.ofType(classOf[String]) |
||||||
|
val topicsOpt = parser.accepts("topics", "The number of topics to test.") |
||||||
|
.withRequiredArg |
||||||
|
.describedAs("count") |
||||||
|
.ofType(classOf[java.lang.Integer]) |
||||||
|
.defaultsTo(1) |
||||||
|
val zkConnectOpt = parser.accepts("zk", "Zk url.") |
||||||
|
.withRequiredArg |
||||||
|
.describedAs("url") |
||||||
|
.ofType(classOf[String]) |
||||||
|
val sleepSecsOpt = parser.accepts("sleep", "Time to sleep between production and consumption.") |
||||||
|
.withRequiredArg |
||||||
|
.describedAs("ms") |
||||||
|
.ofType(classOf[java.lang.Integer]) |
||||||
|
.defaultsTo(0) |
||||||
|
val cleanupOpt = parser.accepts("cleanup", "Delete temp files when done.") |
||||||
|
|
||||||
|
val options = parser.parse(args:_*) |
||||||
|
|
||||||
|
if(!options.has(brokerOpt) || !options.has(zkConnectOpt) || !options.has(numMessagesOpt)) { |
||||||
|
parser.printHelpOn(System.err) |
||||||
|
System.exit(1) |
||||||
|
} |
||||||
|
|
||||||
|
// parse options |
||||||
|
val messages = options.valueOf(numMessagesOpt).longValue |
||||||
|
val dups = options.valueOf(numDupsOpt).intValue |
||||||
|
val brokerUrl = options.valueOf(brokerOpt) |
||||||
|
val topicCount = options.valueOf(topicsOpt).intValue |
||||||
|
val zkUrl = options.valueOf(zkConnectOpt) |
||||||
|
val sleepSecs = options.valueOf(sleepSecsOpt).intValue |
||||||
|
val cleanup = options.has(cleanupOpt) |
||||||
|
|
||||||
|
val testId = new Random().nextInt(Int.MaxValue) |
||||||
|
val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray |
||||||
|
|
||||||
|
println("Producing %d messages...".format(messages)) |
||||||
|
val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, cleanup) |
||||||
|
println("Sleeping for %d seconds...".format(sleepSecs)) |
||||||
|
Thread.sleep(sleepSecs * 1000) |
||||||
|
println("Consuming messages...") |
    val consumedDataFile = consumeMessages(zkUrl, topics, cleanup)

    val producedLines = lineCount(producedDataFile)
    val consumedLines = lineCount(consumedDataFile)
    val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble
    println("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).".format(producedLines, consumedLines, 100 * reduction))

    println("Validating output files...")
    validateOutput(externalSort(producedDataFile), externalSort(consumedDataFile))
    println("All done.")
  }

  def lineCount(file: File): Int = io.Source.fromFile(file).getLines.size

  def validateOutput(produced: BufferedReader, consumed: BufferedReader) {
    while(true) {
      val prod = readFinalValue(produced)
      val cons = readFinalValue(consumed)
      if(prod == null && cons == null) {
        return
      } else if(prod != cons) {
        System.err.println("Validation failed prod = %s, cons = %s!".format(prod, cons))
        System.exit(1)
      }
    }
  }

  def readFinalValue(reader: BufferedReader): (String, Int, Int) = {
    def readTuple() = {
      val line = reader.readLine
      if(line == null)
        null
      else
        line.split("\t")
    }
    var prev = readTuple()
    if(prev == null)
      return null
    while(true) {
      reader.mark(1024)
      val curr = readTuple()
      if(curr == null || curr(0) != prev(0) || curr(1) != prev(1)) {
        reader.reset()
        return (prev(0), prev(1).toInt, prev(2).toInt)
      } else {
        prev = curr
      }
    }
    return null
  }

  def externalSort(file: File): BufferedReader = {
    val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", file.getAbsolutePath)
    val process = builder.start()
    new BufferedReader(new InputStreamReader(process.getInputStream()))
  }

  def produceMessages(brokerUrl: String,
                      topics: Array[String],
                      messages: Long,
                      dups: Int,
                      cleanup: Boolean): File = {
    val producerProps = new Properties
    producerProps.setProperty("producer.type", "async")
    producerProps.setProperty("broker.list", brokerUrl)
    producerProps.setProperty("serializer.class", classOf[StringEncoder].getName)
    producerProps.setProperty("key.serializer.class", classOf[StringEncoder].getName)
    producerProps.setProperty("queue.enqueue.timeout.ms", "-1")
    producerProps.setProperty("batch.size", 1000.toString)
    val producer = new Producer[String, String](new ProducerConfig(producerProps))
    val rand = new Random(1)
    val keyCount = (messages / dups).toInt
    val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt")
    if(cleanup)
      producedFile.deleteOnExit()
    val producedWriter = new BufferedWriter(new FileWriter(producedFile), 1024*1024)
    for(i <- 0L until (messages * topics.length)) {
      val topic = topics((i % topics.length).toInt)
      val key = rand.nextInt(keyCount)
      producer.send(KeyedMessage(topic = topic, key = key.toString, message = i.toString))
      producedWriter.write("%s\t%s\t%s\n".format(topic, key, i))
    }
    producedWriter.close()
    producer.close()
    producedFile
  }

  def consumeMessages(zkUrl: String, topics: Array[String], cleanup: Boolean): File = {
    val consumerProps = new Properties
    consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
    consumerProps.setProperty("zk.connect", zkUrl)
    consumerProps.setProperty("consumer.timeout.ms", (5*1000).toString)
    val connector = new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
    val streams = connector.createMessageStreams(topics.map(topic => (topic, 1)).toMap, new StringDecoder, new StringDecoder)
    val consumedFile = File.createTempFile("kafka-log-cleaner-consumed-", ".txt")
    if(cleanup)
      consumedFile.deleteOnExit()
    val consumedWriter = new BufferedWriter(new FileWriter(consumedFile))
    for(topic <- topics) {
      val stream = streams(topic).head
      try {
        for(item <- stream)
          consumedWriter.write("%s\t%s\t%s\n".format(topic, item.key, item.message))
      } catch {
        case e: ConsumerTimeoutException =>
      }
    }
    consumedWriter.close()
    connector.shutdown()
    consumedFile
  }

}
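For readability: the comparison done by validateOutput/readFinalValue above amounts to reducing each tab-separated dump to the last value written per (topic, key) and checking that the two reductions agree. A minimal in-memory sketch of that idea, assuming the "topic\tkey\tvalue" format written by the tool (illustration only, not part of this patch):

// Illustration only: collapse "topic\tkey\tvalue" lines to the final value per (topic, key).
// Compaction must preserve exactly this reduction, so the produced and consumed dumps
// should yield the same map.
def finalValues(lines: Iterator[String]): Map[(String, String), String] =
  lines.map(_.split("\t"))
       .collect { case Array(topic, key, value) => (topic, key) -> value }
       .toMap // later entries overwrite earlier ones, keeping the last value per key

The external sort plus streaming comparison in the tool checks the same property without loading both dumps into memory, which matters when millions of rows are produced.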
@ -0,0 +1,227 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.log

import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import java.nio._
import java.io.File
import scala.collection._
import kafka.common._
import kafka.utils._
import kafka.message._

/**
 * Unit tests for the log cleaning logic
 */
class CleanerTest extends JUnitSuite {

  val dir = TestUtils.tempDir()
  val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, dedupe=true)
  val time = new MockTime()
  val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)

  @After
  def teardown() {
    Utils.rm(dir)
  }

  /**
   * Test simple log cleaning
   */
  @Test
  def testCleanSegments() {
    val cleaner = makeCleaner(Int.MaxValue)
    val log = makeLog(config = logConfig.copy(segmentSize = 1024))

    // append messages to the log until we have four segments
    while(log.numberOfSegments < 4)
      log.append(messages(log.logEndOffset.toInt, log.logEndOffset.toInt))
    val keysFound = keysInLog(log)
    assertEquals((0L until log.logEndOffset), keysFound)

    // pretend we have the following keys
    val keys = immutable.ListSet(1, 3, 5, 7, 9)
    val map = new FakeOffsetMap(Int.MaxValue)
    keys.foreach(k => map.put(key(k), Long.MaxValue))

    // clean the log
    cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0)
    val shouldRemain = keysInLog(log).filter(!keys.contains(_))
    assertEquals(shouldRemain, keysInLog(log))
  }

  /* extract all the keys from a log */
  def keysInLog(log: Log): Iterable[Int] =
    log.logSegments.flatMap(s => s.log.map(m => Utils.readString(m.message.key).toInt))

  /**
   * Test that a truncation during cleaning throws an OptimisticLockFailureException
   */
  @Test
  def testCleanSegmentsWithTruncation() {
    val cleaner = makeCleaner(Int.MaxValue)
    val log = makeLog(config = logConfig.copy(segmentSize = 1024))

    // append messages to the log until we have two segments
    while(log.numberOfSegments < 2)
      log.append(messages(log.logEndOffset.toInt, log.logEndOffset.toInt))

    log.truncateTo(log.logEndOffset-2)
    val keys = keysInLog(log)
    val map = new FakeOffsetMap(Int.MaxValue)
    keys.foreach(k => map.put(key(k), Long.MaxValue))
    intercept[OptimisticLockFailureException] {
      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0)
    }
  }

  /**
   * Validate the logic for grouping log segments together for cleaning
   */
  @Test
  def testSegmentGrouping() {
    val cleaner = makeCleaner(Int.MaxValue)
    val log = makeLog(config = logConfig.copy(segmentSize = 300, indexInterval = 1))

    // append some messages to the log
    var i = 0
    while(log.numberOfSegments < 10) {
      log.append(TestUtils.singleMessageSet("hello".getBytes))
      i += 1
    }

    // grouping by very large values should result in a single group with all the segments in it
    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
    assertEquals(1, groups.size)
    assertEquals(log.numberOfSegments, groups(0).size)
    checkSegmentOrder(groups)

    // grouping by very small values should result in all groups having one entry
    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue)
    assertEquals(log.numberOfSegments, groups.size)
    assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
    checkSegmentOrder(groups)
    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1)
    assertEquals(log.numberOfSegments, groups.size)
    assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
    checkSegmentOrder(groups)

    val groupSize = 3

    // check grouping by log size
    val logSize = log.logSegments.take(groupSize).map(_.size).sum.toInt + 1
    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue)
    checkSegmentOrder(groups)
    assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))

    // check grouping by index size
    val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes()).sum + 1
    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize)
    checkSegmentOrder(groups)
    assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
  }

  private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]) {
    val offsets = groups.flatMap(_.map(_.baseOffset))
    assertEquals("Offsets should be in increasing order.", offsets.sorted, offsets)
  }

  /**
   * Test building an offset map off the log
   */
  @Test
  def testBuildOffsetMap() {
    val map = new FakeOffsetMap(1000)
    val log = makeLog()
    val cleaner = makeCleaner(Int.MaxValue)
    val start = 0
    val end = 500
    val offsets = writeToLog(log, (start until end) zip (start until end))
    def checkRange(map: FakeOffsetMap, start: Int, end: Int) {
      val endOffset = cleaner.buildOffsetMap(log, start, end, map) + 1
      assertEquals("Last offset should be the end offset.", end, endOffset)
      assertEquals("Should have the expected number of messages in the map.", end-start, map.size)
      for(i <- start until end)
        assertEquals("Should find all the keys", i.toLong, map.get(key(i)))
      assertEquals("Should not find a value too small", -1L, map.get(key(start-1)))
      assertEquals("Should not find a value too large", -1L, map.get(key(end)))
    }
    val segments = log.logSegments.toSeq
    checkRange(map, 0, segments(1).baseOffset.toInt)
    checkRange(map, segments(1).baseOffset.toInt, segments(3).baseOffset.toInt)
    checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt)
  }

  /**
   * Test that we don't exceed the maximum capacity of the offset map, that is that an offset map
   * with a max size of 1000 will only clean 1000 new entries even if more than that are available.
   */
  @Test
  def testBuildOffsetMapOverCapacity() {
    val map = new FakeOffsetMap(1000)
    val log = makeLog()
    val cleaner = makeCleaner(Int.MaxValue)
    val vals = 0 until 1001
    val offsets = writeToLog(log, vals zip vals)
    val lastOffset = cleaner.buildOffsetMap(log, vals.start, vals.end, map)
    assertEquals("Shouldn't go beyond the capacity of the offset map.", 1000, lastOffset)
  }

  def makeLog(dir: File = dir, config: LogConfig = logConfig) =
    new Log(dir = dir, config = config, needsRecovery = false, scheduler = time.scheduler, time = time)

  def makeCleaner(capacity: Int) =
    new Cleaner(id = 0, new FakeOffsetMap(capacity), ioBufferSize = 64*1024, maxIoBufferSize = 64*1024, throttler = throttler, time = time)

  def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
    for((key, value) <- seq)
      yield log.append(messages(key, value)).firstOffset
  }

  def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)

  def messages(key: Int, value: Int) =
    new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))

}

class FakeOffsetMap(val capacity: Int) extends OffsetMap {
  val map = new java.util.HashMap[String, Long]()

  private def keyFor(key: ByteBuffer) =
    new String(Utils.readBytes(key.duplicate), "UTF-8")

  def put(key: ByteBuffer, offset: Long): Unit =
    map.put(keyFor(key), offset)

  def get(key: ByteBuffer): Long = {
    val k = keyFor(key)
    if(map.containsKey(k))
      map.get(k)
    else
      -1L
  }

  def clear() = map.clear()

  def size: Int = map.size

}
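The assertions in testCleanSegments and testCleanSegmentsWithTruncation rely on a simple retention rule: a record survives cleaning only if the offset map holds no newer offset for its key. A standalone sketch of that rule using the FakeOffsetMap above (shouldRetain is a hypothetical helper for illustration; the real Cleaner applies this check while copying segments):

// Illustration only: the dedup rule the tests above assume. Because testCleanSegments
// records Long.MaxValue as the latest offset for keys 1, 3, 5, 7 and 9, every earlier
// record carrying one of those keys fails this check and is discarded.
def shouldRetain(map: FakeOffsetMap, recordKey: java.nio.ByteBuffer, recordOffset: Long): Boolean = {
  val latestOffset = map.get(recordKey)
  latestOffset == -1L || recordOffset >= latestOffset
}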
@ -0,0 +1,117 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.log

import java.io.File
import scala.collection._
import org.junit._
import kafka.common.TopicAndPartition
import kafka.utils._
import kafka.message._
import org.scalatest.junit.JUnitSuite
import junit.framework.Assert._

/**
 * This is an integration test that tests the fully integrated log cleaner
 */
class LogCleanerIntegrationTest extends JUnitSuite {

  val time = new MockTime()
  val segmentSize = 100
  val deleteDelay = 1000
  val logName = "log"
  val logDir = TestUtils.tempDir()
  var counter = 0
  val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2))

  @Test
  def cleanerTest() {
    val cleaner = makeCleaner(parts = 3)
    val log = cleaner.logs.get(topics(0))

    val appends = writeDups(numKeys = 100, numDups = 3, log)
    val startSize = log.size
    cleaner.startup()

    val lastCleaned = log.activeSegment.baseOffset
    // wait until we clean up to base_offset of active segment - minDirtyMessages
    cleaner.awaitCleaned("log", 0, lastCleaned)

    val read = readFromLog(log)
    assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap)
    assertTrue(startSize > log.size)

    // write some more stuff and validate again
    val appends2 = appends ++ writeDups(numKeys = 100, numDups = 3, log)
    val lastCleaned2 = log.activeSegment.baseOffset
    cleaner.awaitCleaned("log", 0, lastCleaned2)
    val read2 = readFromLog(log)
    assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)

    cleaner.shutdown()
  }

  def readFromLog(log: Log): Iterable[(Int, Int)] = {
    for(segment <- log.logSegments; message <- segment.log) yield {
      val key = Utils.readString(message.message.key).toInt
      val value = Utils.readString(message.message.payload).toInt
      key -> value
    }
  }

  def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
    for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
      val count = counter
      val appendInfo = log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
      counter += 1
      (key, count)
    }
  }

  @After
  def teardown() {
    Utils.rm(logDir)
  }

  /* create a cleaner instance and logs with the given parameters */
  def makeCleaner(parts: Int,
                  minDirtyMessages: Int = 0,
                  numThreads: Int = 1,
                  defaultPolicy: String = "dedupe",
                  policyOverrides: Map[String, String] = Map()): LogCleaner = {

    // create partitions and add them to the pool
    val logs = new Pool[TopicAndPartition, Log]()
    for(i <- 0 until parts) {
      val dir = new File(logDir, "log-" + i)
      dir.mkdirs()
      val log = new Log(dir = dir,
                        LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, dedupe = true),
                        needsRecovery = false,
                        scheduler = time.scheduler,
                        time = time)
      logs.put(TopicAndPartition("log", i), log)
    }

    new LogCleaner(CleanerConfig(numThreads = numThreads),
                   logDirs = Array(logDir),
                   logs = logs,
                   time = time)
  }

}
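A note on the cleanerTest assertions: Seq.toMap keeps the last pair for each duplicated key, so comparing appends.toMap against read.toMap checks exactly the "latest value per key wins" contract that compaction must preserve. A tiny standalone check of that property (illustration only, not part of this patch):

// Illustration only: toMap retains the final value written for a duplicated key,
// which mirrors what the cleaner must leave behind in the log.
val writes = Seq(0 -> 10, 1 -> 11, 0 -> 12) // key 0 is written twice
assert(writes.toMap == Map(0 -> 12, 1 -> 11))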
@ -0,0 +1,87 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.log

import java.nio._
import org.junit._
import org.scalatest.junit.JUnitSuite
import junit.framework.Assert._

class OffsetMapTest extends JUnitSuite {

  @Test
  def testBasicValidation() {
    validateMap(10)
    validateMap(100)
    validateMap(1000)
  }

  @Test
  def testClear() {
    val map = new SkimpyOffsetMap(4000, 0.75)
    for(i <- 0 until 10)
      map.put(key(i), i)
    for(i <- 0 until 10)
      assertEquals(i.toLong, map.get(key(i)))
    map.clear()
    for(i <- 0 until 10)
      assertEquals(map.get(key(i)), -1L)
  }

  @Test
  def testCapacity() {
    val map = new SkimpyOffsetMap(1024, 0.75)
    var i = 0
    while(map.size < map.capacity) {
      map.put(key(i), i)
      i += 1
    }
    // now the map is full, it should throw an exception
    intercept[IllegalStateException] {
      map.put(key(i), i)
    }
  }

  def key(key: Int) = ByteBuffer.wrap(key.toString.getBytes)

  def validateMap(items: Int) {
    val map = new SkimpyOffsetMap(items * 2 * 24, 0.75)
    for(i <- 0 until items)
      map.put(key(i), i)
    var misses = 0
    for(i <- 0 until items) {
      map.get(key(i)) match {
        case -1L => misses += 1
        case offset => assertEquals(i.toLong, offset)
      }
    }
    println("Miss rate: " + (misses.toDouble / items))
  }

}

object OffsetMapTest {
  def main(args: Array[String]) {
    if(args.length != 1) {
      System.err.println("USAGE: java OffsetMapTest size")
      System.exit(1)
    }
    val test = new OffsetMapTest()
    test.validateMap(args(0).toInt)
  }
}
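One observation on sizing: validateMap above budgets items * 2 * 24 bytes, i.e. roughly 24 bytes per entry with 2x headroom so the map stays about half full and misses stay rare. A small helper following the same rule of thumb (the 24-bytes-per-entry figure is inferred from the test's arithmetic, not a documented constant):

// Sketch under the sizing assumption used in validateMap: ~24 bytes per entry, 2x headroom.
def offsetMapBytesFor(distinctKeys: Int, bytesPerEntry: Int = 24, headroomFactor: Int = 2): Int =
  distinctKeys * headroomFactor * bytesPerEntry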