A few notes:
* Delete a few methods from `UnifiedLog` that were simply invoking the related method in `LogFileUtils`
* Fix `CoreUtils.swallow` to use the passed in `logging` (a simplified sketch of this pattern appears below, after the change stats)
* Fix `LogCleanerParameterizedIntegrationTest` to close `log` before reopening
* Minor tweaks in `LogSegment` for readability

For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module

Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>

pull/14101/merge
Ismael Juma authored 1 year ago, committed by GitHub
45 changed files with 1857 additions and 1494 deletions
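One of the notes above mentions fixing `CoreUtils.swallow` to log through the passed-in `logging` instance rather than a shared utility logger. The diff for that file is not part of this page, so the following is only a minimal, self-contained Scala sketch of the intended pattern; the `Logging` trait and the `swallow` signature here are simplified stand-ins, not the exact Kafka API.

import org.slf4j.{Logger, LoggerFactory}

// Simplified stand-in for kafka.utils.Logging: anything exposing a logger.
trait Logging {
  def logger: Logger
}

object SwallowSketch {
  // Run `action`; if it throws, log via the caller-supplied `logging` so the
  // warning is attributed to the caller's class rather than a utility class.
  def swallow(action: => Unit, logging: Logging): Unit = {
    try action
    catch {
      case e: Throwable => logging.logger.warn(e.getMessage, e)
    }
  }
}

class SegmentLike extends Logging {
  override val logger: Logger = LoggerFactory.getLogger(classOf[SegmentLike])

  // Best-effort close: a failure is logged against SegmentLike's logger and swallowed.
  def close(): Unit =
    SwallowSketch.swallow(throw new RuntimeException("simulated close failure"), this)
}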
@@ -1,693 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.log

import com.yammer.metrics.core.Timer
import kafka.common.LogSegmentOffsetOverflowException
import kafka.utils._
import org.apache.kafka.common.InvalidRecordException
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CompletedTxn, FetchDataInfo, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, ProducerStateManager, RollParams, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult}

import java.io.{File, IOException}
import java.nio.file.attribute.FileTime
import java.nio.file.{Files, NoSuchFileException}
import java.util.Optional
import java.util.concurrent.TimeUnit
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.math._

/**
 * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing
 * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
 * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
 * any previous segment.
 *
 * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
 *
 * @param log The file records containing log entries
 * @param lazyOffsetIndex The offset index
 * @param lazyTimeIndex The timestamp index
 * @param txnIndex The transaction index
 * @param baseOffset A lower bound on the offsets in this segment
 * @param indexIntervalBytes The approximate number of bytes between entries in the index
 * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
 * @param time The time instance
 */
@nonthreadsafe
class LogSegment private[log] (val log: FileRecords,
                               val lazyOffsetIndex: LazyIndex[OffsetIndex],
                               val lazyTimeIndex: LazyIndex[TimeIndex],
                               val txnIndex: TransactionIndex,
                               val baseOffset: Long,
                               val indexIntervalBytes: Int,
                               val rollJitterMs: Long,
                               val time: Time) extends Logging {

  def offsetIndex: OffsetIndex = lazyOffsetIndex.get

  def timeIndex: TimeIndex = lazyTimeIndex.get

  def shouldRoll(rollParams: RollParams): Boolean = {
    val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
    size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
      (size > 0 && reachedRollMs) ||
      offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
  }

  def resizeIndexes(size: Int): Unit = {
    offsetIndex.resize(size)
    timeIndex.resize(size)
  }

  def sanityCheck(timeIndexFileNewlyCreated: Boolean): Unit = {
    if (lazyOffsetIndex.file.exists) {
      // Resize the time index file to 0 if it is newly created.
      if (timeIndexFileNewlyCreated)
        timeIndex.resize(0)
      // Sanity checks for time index and offset index are skipped because
      // we will recover the segments above the recovery point in recoverLog()
      // in any case so sanity checking them here is redundant.
      txnIndex.sanityCheck()
    }
    else throw new NoSuchFileException(s"Offset index file ${lazyOffsetIndex.file.getAbsolutePath} does not exist")
  }

  private var created = time.milliseconds

  /* the number of bytes since we last added an entry in the offset index */
  private var bytesSinceLastIndexEntry = 0

  // The timestamp we used for time based log rolling and for ensuring max compaction delay
  // volatile for LogCleaner to see the update
  @volatile private var rollingBasedTimestamp: Option[Long] = None

  /* The maximum timestamp and offset we see so far */
  @volatile private var _maxTimestampAndOffsetSoFar: TimestampOffset = TimestampOffset.UNKNOWN
  def maxTimestampAndOffsetSoFar_= (timestampOffset: TimestampOffset): Unit = _maxTimestampAndOffsetSoFar = timestampOffset
  def maxTimestampAndOffsetSoFar: TimestampOffset = {
    if (_maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN)
      _maxTimestampAndOffsetSoFar = timeIndex.lastEntry
    _maxTimestampAndOffsetSoFar
  }

  /* The maximum timestamp we see so far */
  def maxTimestampSoFar: Long = {
    maxTimestampAndOffsetSoFar.timestamp
  }

  def offsetOfMaxTimestampSoFar: Long = {
    maxTimestampAndOffsetSoFar.offset
  }

  /* Return the size in bytes of this log segment */
  def size: Int = log.sizeInBytes()

  /**
   * checks that the argument offset can be represented as an integer offset relative to the baseOffset.
   */
  def canConvertToRelativeOffset(offset: Long): Boolean = {
    offsetIndex.canAppendOffset(offset)
  }

  /**
   * Append the given messages starting with the given offset. Add
   * an entry to the index if needed.
   *
   * It is assumed this method is being called from within a lock.
   *
   * @param largestOffset The last offset in the message set
   * @param largestTimestamp The largest timestamp in the message set.
   * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
   * @param records The log entries to append.
   * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
   */
  @nonthreadsafe
  def append(largestOffset: Long,
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = {
    if (records.sizeInBytes > 0) {
      trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
        s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
      val physicalPosition = log.sizeInBytes()
      if (physicalPosition == 0)
        rollingBasedTimestamp = Some(largestTimestamp)

      ensureOffsetInRange(largestOffset)

      // append the messages
      val appendedBytes = log.append(records)
      trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
      // Update the in memory max timestamp and corresponding offset.
      if (largestTimestamp > maxTimestampSoFar) {
        maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestamp, shallowOffsetOfMaxTimestamp)
      }
      // append an entry to the index (if needed)
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        offsetIndex.append(largestOffset, physicalPosition)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
        bytesSinceLastIndexEntry = 0
      }
      bytesSinceLastIndexEntry += records.sizeInBytes
    }
  }

  private def ensureOffsetInRange(offset: Long): Unit = {
    if (!canConvertToRelativeOffset(offset))
      throw new LogSegmentOffsetOverflowException(this, offset)
  }

  private def appendChunkFromFile(records: FileRecords, position: Int, bufferSupplier: BufferSupplier): Int = {
    var bytesToAppend = 0
    var maxTimestamp = Long.MinValue
    var offsetOfMaxTimestamp = Long.MinValue
    var maxOffset = Long.MinValue
    var readBuffer = bufferSupplier.get(1024 * 1024)

    def canAppend(batch: RecordBatch) =
      canConvertToRelativeOffset(batch.lastOffset) &&
        (bytesToAppend == 0 || bytesToAppend + batch.sizeInBytes < readBuffer.capacity)

    // find all batches that are valid to be appended to the current log segment and
    // determine the maximum offset and timestamp
    val nextBatches = records.batchesFrom(position).asScala.iterator
    for (batch <- nextBatches.takeWhile(canAppend)) {
      if (batch.maxTimestamp > maxTimestamp) {
        maxTimestamp = batch.maxTimestamp
        offsetOfMaxTimestamp = batch.lastOffset
      }
      maxOffset = batch.lastOffset
      bytesToAppend += batch.sizeInBytes
    }

    if (bytesToAppend > 0) {
      // Grow buffer if needed to ensure we copy at least one batch
      if (readBuffer.capacity < bytesToAppend)
        readBuffer = bufferSupplier.get(bytesToAppend)

      readBuffer.limit(bytesToAppend)
      records.readInto(readBuffer, position)

      append(maxOffset, maxTimestamp, offsetOfMaxTimestamp, MemoryRecords.readableRecords(readBuffer))
    }

    bufferSupplier.release(readBuffer)
    bytesToAppend
  }

  /**
   * Append records from a file beginning at the given position until either the end of the file
   * is reached or an offset is found which is too large to convert to a relative offset for the indexes.
   *
   * @return the number of bytes appended to the log (may be less than the size of the input if an
   *         offset is encountered which would overflow this segment)
   */
  def appendFromFile(records: FileRecords, start: Int): Int = {
    var position = start
    val bufferSupplier: BufferSupplier = new BufferSupplier.GrowableBufferSupplier
    while (position < start + records.sizeInBytes) {
      val bytesAppended = appendChunkFromFile(records, position, bufferSupplier)
      if (bytesAppended == 0)
        return position - start
      position += bytesAppended
    }
    position - start
  }

  @nonthreadsafe
  def updateTxnIndex(completedTxn: CompletedTxn, lastStableOffset: Long): Unit = {
    if (completedTxn.isAborted) {
      trace(s"Writing aborted transaction $completedTxn to transaction index, last stable offset is $lastStableOffset")
      txnIndex.append(new AbortedTxn(completedTxn, lastStableOffset))
    }
  }

  private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = {
    if (batch.hasProducerId) {
      val producerId = batch.producerId
      val appendInfo = producerStateManager.prepareUpdate(producerId, AppendOrigin.REPLICATION)
      val maybeCompletedTxn = appendInfo.append(batch, Optional.empty())
      producerStateManager.update(appendInfo)
      maybeCompletedTxn.ifPresent(completedTxn => {
        val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
        updateTxnIndex(completedTxn, lastStableOffset)
        producerStateManager.completeTxn(completedTxn)
      })
    }
    producerStateManager.updateMapEndOffset(batch.lastOffset + 1)
  }

  /**
   * Find the physical file position for the first message with offset >= the requested offset.
   *
   * The startingFilePosition argument is an optimization that can be used if we already know a valid starting position
   * in the file higher than the greatest-lower-bound from the index.
   *
   * @param offset The offset we want to translate
   * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
   *                             when omitted, the search will begin at the position in the offset index.
   * @return The position in the log storing the message with the least offset >= the requested offset and the size of the
   *         message or null if no message meets this criteria.
   */
  @threadsafe
  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
    val mapping = offsetIndex.lookup(offset)
    log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
  }

  /**
   * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
   * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
   *
   * @param startOffset A lower bound on the first offset to include in the message set we read
   * @param maxSize The maximum number of bytes to include in the message set we read
   * @param maxPosition The maximum position in the log segment that should be exposed for read
   * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists)
   *
   * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
   *         or null if the startOffset is larger than the largest offset in this log
   */
  @threadsafe
  def read(startOffset: Long,
           maxSize: Int,
           maxPosition: Long = size,
           minOneMessage: Boolean = false): FetchDataInfo = {
    if (maxSize < 0)
      throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")

    val startOffsetAndSize = translateOffset(startOffset)

    // if the start position is already off the end of the log, return null
    if (startOffsetAndSize == null)
      return null

    val startPosition = startOffsetAndSize.position
    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

    val adjustedMaxSize =
      if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
      else maxSize

    // return a log segment but with zero size in the case below
    if (adjustedMaxSize == 0)
      return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

    // calculate the length of the message set to read based on whether or not they gave us a maxOffset
    val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)

    new FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
      adjustedMaxSize < startOffsetAndSize.size, Optional.empty())
  }

  def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Optional[Long] =
    offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset)

  /**
   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes
   * from the end of the log and index.
   *
   * @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover
   *                             the transaction index.
   * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
   * @return The number of bytes truncated from the log
   * @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow
   */
  @nonthreadsafe
  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
    offsetIndex.reset()
    timeIndex.reset()
    txnIndex.reset()
    var validBytes = 0
    var lastIndexEntry = 0
    maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN
    try {
      for (batch <- log.batches.asScala) {
        batch.ensureValid()
        ensureOffsetInRange(batch.lastOffset)

        // The max timestamp is exposed at the batch level, so no need to iterate the records
        if (batch.maxTimestamp > maxTimestampSoFar) {
          maxTimestampAndOffsetSoFar = new TimestampOffset(batch.maxTimestamp, batch.lastOffset)
        }

        // Build offset index
        if (validBytes - lastIndexEntry > indexIntervalBytes) {
          offsetIndex.append(batch.lastOffset, validBytes)
          timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
          lastIndexEntry = validBytes
        }
        validBytes += batch.sizeInBytes()

        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
          leaderEpochCache.foreach { cache =>
            if (batch.partitionLeaderEpoch >= 0 && cache.latestEpoch.asScala.forall(batch.partitionLeaderEpoch > _))
              cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
          }
          updateProducerState(producerStateManager, batch)
        }
      }
    } catch {
      case e@ (_: CorruptRecordException | _: InvalidRecordException) =>
        warn("Found invalid messages in log segment %s at byte offset %d: %s. %s"
          .format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause))
    }
    val truncated = log.sizeInBytes - validBytes
    if (truncated > 0)
      debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")

    log.truncateTo(validBytes)
    offsetIndex.trimToValidSize()
    // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, true)
    timeIndex.trimToValidSize()
    truncated
  }

  private def loadLargestTimestamp(): Unit = {
    // Get the last time index entry. If the time index is empty, it will return (-1, baseOffset)
    val lastTimeIndexEntry = timeIndex.lastEntry
    maxTimestampAndOffsetSoFar = lastTimeIndexEntry

    val offsetPosition = offsetIndex.lookup(lastTimeIndexEntry.offset)
    // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry.
    val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position)
    if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) {
      maxTimestampAndOffsetSoFar = new TimestampOffset(maxTimestampOffsetAfterLastEntry.timestamp, maxTimestampOffsetAfterLastEntry.offset)
    }
  }

  /**
   * Check whether the last offset of the last batch in this segment overflows the indexes.
   */
  def hasOverflow: Boolean = {
    val nextOffset = readNextOffset
    nextOffset > baseOffset && !canConvertToRelativeOffset(nextOffset - 1)
  }

  def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult =
    txnIndex.collectAbortedTxns(fetchOffset, upperBoundOffset)

  override def toString: String = "LogSegment(baseOffset=" + baseOffset +
    ", size=" + size +
    ", lastModifiedTime=" + lastModified +
    ", largestRecordTimestamp=" + largestRecordTimestamp +
    ")"

  /**
   * Truncate off all index and log entries with offsets >= the given offset.
   * If the given offset is larger than the largest message in this segment, do nothing.
   *
   * @param offset The offset to truncate to
   * @return The number of log bytes truncated
   */
  @nonthreadsafe
  def truncateTo(offset: Long): Int = {
    // Do offset translation before truncating the index to avoid needless scanning
    // in case we truncate the full index
    val mapping = translateOffset(offset)
    offsetIndex.truncateTo(offset)
    timeIndex.truncateTo(offset)
    txnIndex.truncateTo(offset)

    // After truncation, reset and allocate more space for the (new currently active) index
    offsetIndex.resize(offsetIndex.maxIndexSize)
    timeIndex.resize(timeIndex.maxIndexSize)

    val bytesTruncated = if (mapping == null) 0 else log.truncateTo(mapping.position)
    if (log.sizeInBytes == 0) {
      created = time.milliseconds
      rollingBasedTimestamp = None
    }

    bytesSinceLastIndexEntry = 0
    if (maxTimestampSoFar >= 0)
      loadLargestTimestamp()
    bytesTruncated
  }

  /**
   * Calculate the offset that would be used for the next message to be append to this segment.
   * Note that this is expensive.
   */
  @threadsafe
  def readNextOffset: Long = {
    val fetchData = read(offsetIndex.lastOffset, log.sizeInBytes)
    if (fetchData == null)
      baseOffset
    else
      fetchData.records.batches.asScala.lastOption
        .map(_.nextOffset)
        .getOrElse(baseOffset)
  }

  /**
   * Flush this log segment to disk
   */
  @threadsafe
  def flush(): Unit = {
    LogFlushStats.logFlushTimer.time { () =>
      log.flush()
      offsetIndex.flush()
      timeIndex.flush()
      txnIndex.flush()
    }
  }

  /**
   * Update the directory reference for the log and indices in this segment. This would typically be called after a
   * directory is renamed.
   */
  def updateParentDir(dir: File): Unit = {
    log.updateParentDir(dir)
    lazyOffsetIndex.updateParentDir(dir)
    lazyTimeIndex.updateParentDir(dir)
    txnIndex.updateParentDir(dir)
  }

  /**
   * Change the suffix for the index and log files for this log segment
   * IOException from this method should be handled by the caller
   */
  def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
    log.renameTo(new File(Utils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
    lazyOffsetIndex.renameTo(new File(Utils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)))
    lazyTimeIndex.renameTo(new File(Utils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)))
    txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
  }

  def hasSuffix(suffix: String): Boolean = {
    log.file.getName.endsWith(suffix) &&
      lazyOffsetIndex.file.getName.endsWith(suffix) &&
      lazyTimeIndex.file.getName.endsWith(suffix) &&
      txnIndex.file.getName.endsWith(suffix)
  }

  /**
   * Append the largest time index entry to the time index and trim the log and indexes.
   *
   * The time index entry appended will be used to decide when to delete the segment.
   */
  def onBecomeInactiveSegment(): Unit = {
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, true)
    offsetIndex.trimToValidSize()
    timeIndex.trimToValidSize()
    log.trim()
  }

  /**
   * If not previously loaded,
   * load the timestamp of the first message into memory.
   */
  private def loadFirstBatchTimestamp(): Unit = {
    if (rollingBasedTimestamp.isEmpty) {
      val iter = log.batches.iterator()
      if (iter.hasNext)
        rollingBasedTimestamp = Some(iter.next().maxTimestamp)
    }
  }

  /**
   * The time this segment has waited to be rolled.
   * If the first message batch has a timestamp we use its timestamp to determine when to roll a segment. A segment
   * is rolled if the difference between the new batch's timestamp and the first batch's timestamp exceeds the
   * segment rolling time.
   * If the first batch does not have a timestamp, we use the wall clock time to determine when to roll a segment. A
   * segment is rolled if the difference between the current wall clock time and the segment create time exceeds the
   * segment rolling time.
   */
  def timeWaitedForRoll(now: Long, messageTimestamp: Long): Long = {
    // Load the timestamp of the first message into memory
    loadFirstBatchTimestamp()
    rollingBasedTimestamp match {
      case Some(t) if t >= 0 => messageTimestamp - t
      case _ => now - created
    }
  }

  /**
   * @return the first batch timestamp if the timestamp is available. Otherwise return Long.MaxValue
   */
  def getFirstBatchTimestamp(): Long = {
    loadFirstBatchTimestamp()
    rollingBasedTimestamp match {
      case Some(t) if t >= 0 => t
      case _ => Long.MaxValue
    }
  }

  /**
   * Search the message offset based on timestamp and offset.
   *
   * This method returns an option of TimestampOffset. The returned value is determined using the following ordered list of rules:
   *
   * - If all the messages in the segment have smaller offsets, return None
   * - If all the messages in the segment have smaller timestamps, return None
   * - If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
   *   the returned offset will be max(the base offset of the segment, startingOffset) and the timestamp will be Message.NoTimestamp.
   * - Otherwise, return an option of TimestampOffset. The offset is the offset of the first message whose timestamp
   *   is greater than or equal to the target timestamp and whose offset is greater than or equal to the startingOffset.
   *
   * This method only returns None when 1) all messages' offsets < startingOffset or 2) the log is not empty but we did not
   * see any message when scanning the log from the indexed position. The latter could happen if the log is truncated
   * after we get the indexed position but before we scan the log from there. In this case we simply return None and the
   * caller will need to check on the truncated log and maybe retry or even do the search on another log segment.
   *
   * @param timestamp The timestamp to search for.
   * @param startingOffset The starting offset to search.
   * @return the timestamp and offset of the first message that meets the requirements. None will be returned if there is no such message.
   */
  def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampAndOffset] = {
    // Get the index entry with a timestamp less than or equal to the target timestamp
    val timestampOffset = timeIndex.lookup(timestamp)
    val position = offsetIndex.lookup(math.max(timestampOffset.offset, startingOffset)).position

    // Search the timestamp
    Option(log.searchForTimestamp(timestamp, position, startingOffset))
  }

  /**
   * Close this log segment
   */
  def close(): Unit = {
    if (_maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN)
      CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, true), this)
    CoreUtils.swallow(lazyOffsetIndex.close(), this)
    CoreUtils.swallow(lazyTimeIndex.close(), this)
    CoreUtils.swallow(log.close(), this)
    CoreUtils.swallow(txnIndex.close(), this)
  }

  /**
   * Close file handlers used by the log segment but don't write to disk. This is used when the disk may have failed
   */
  def closeHandlers(): Unit = {
    CoreUtils.swallow(lazyOffsetIndex.closeHandler(), this)
    CoreUtils.swallow(lazyTimeIndex.closeHandler(), this)
    CoreUtils.swallow(log.closeHandlers(), this)
    CoreUtils.swallow(txnIndex.close(), this)
  }

  /**
   * Delete this log segment from the filesystem.
   */
  def deleteIfExists(): Unit = {
    def delete(delete: () => Boolean, fileType: String, file: File, logIfMissing: Boolean): Unit = {
      try {
        if (delete())
          info(s"Deleted $fileType ${file.getAbsolutePath}.")
        else if (logIfMissing)
          info(s"Failed to delete $fileType ${file.getAbsolutePath} because it does not exist.")
      }
      catch {
        case e: IOException => throw new IOException(s"Delete of $fileType ${file.getAbsolutePath} failed.", e)
      }
    }

    CoreUtils.tryAll(Seq(
      () => delete(log.deleteIfExists _, "log", log.file, logIfMissing = true),
      () => delete(lazyOffsetIndex.deleteIfExists _, "offset index", lazyOffsetIndex.file, logIfMissing = true),
      () => delete(lazyTimeIndex.deleteIfExists _, "time index", lazyTimeIndex.file, logIfMissing = true),
      () => delete(txnIndex.deleteIfExists _, "transaction index", txnIndex.file, logIfMissing = false)
    ))
  }

  def deleted(): Boolean = {
    !log.file.exists() && !lazyOffsetIndex.file.exists() && !lazyTimeIndex.file.exists() && !txnIndex.file.exists()
  }

  /**
   * The last modified time of this log segment as a unix time stamp
   */
  def lastModified = log.file.lastModified

  /**
   * The largest timestamp this segment contains, if maxTimestampSoFar >= 0, otherwise None.
   */
  def largestRecordTimestamp: Option[Long] = if (maxTimestampSoFar >= 0) Some(maxTimestampSoFar) else None

  /**
   * The largest timestamp this segment contains.
   */
  def largestTimestamp = if (maxTimestampSoFar >= 0) maxTimestampSoFar else lastModified

  /**
   * Change the last modified time for this log segment
   */
  def lastModified_=(ms: Long) = {
    val fileTime = FileTime.fromMillis(ms)
    Files.setLastModifiedTime(log.file.toPath, fileTime)
    Files.setLastModifiedTime(lazyOffsetIndex.file.toPath, fileTime)
    Files.setLastModifiedTime(lazyTimeIndex.file.toPath, fileTime)
  }

}

object LogSegment {

  def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
    val maxIndexSize = config.maxIndexSize
    new LogSegment(
      FileRecords.open(UnifiedLog.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
      LazyIndex.forOffset(UnifiedLog.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize),
      LazyIndex.forTime(UnifiedLog.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize),
      new TransactionIndex(baseOffset, UnifiedLog.transactionIndexFile(dir, baseOffset, fileSuffix)),
      baseOffset,
      indexIntervalBytes = config.indexInterval,
      rollJitterMs = config.randomSegmentJitter,
      time)
  }

  def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = {
    UnifiedLog.deleteFileIfExists(UnifiedLog.offsetIndexFile(dir, baseOffset, fileSuffix))
    UnifiedLog.deleteFileIfExists(UnifiedLog.timeIndexFile(dir, baseOffset, fileSuffix))
    UnifiedLog.deleteFileIfExists(UnifiedLog.transactionIndexFile(dir, baseOffset, fileSuffix))
    UnifiedLog.deleteFileIfExists(UnifiedLog.logFile(dir, baseOffset, fileSuffix))
  }
}

object LogFlushStats {
  private val metricsGroup = new KafkaMetricsGroup(LogFlushStats.getClass)
  val logFlushTimer: Timer = metricsGroup.newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
}
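A detail worth calling out from the class above: the offset index stores offsets relative to the segment's base offset in a fixed-width field, which is why the append and recovery paths guard with `canConvertToRelativeOffset` and throw `LogSegmentOffsetOverflowException` otherwise. The snippet below is only a rough standalone Scala sketch of that idea, assuming the relative offset must fit in a non-negative 32-bit value; the real logic lives in `OffsetIndex.canAppendOffset`.

// Illustrative only: mirrors the idea behind OffsetIndex.canAppendOffset.
object RelativeOffsetSketch {
  def canConvertToRelativeOffset(baseOffset: Long, offset: Long): Boolean = {
    val relative = offset - baseOffset
    relative >= 0 && relative <= Int.MaxValue
  }

  def main(args: Array[String]): Unit = {
    println(canConvertToRelativeOffset(baseOffset = 0L, offset = Int.MaxValue.toLong))     // true
    println(canConvertToRelativeOffset(baseOffset = 0L, offset = Int.MaxValue.toLong + 1)) // false: would overflow the index
  }
}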
@@ -1,268 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.log

import java.io.File
import java.util.Map
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}

import kafka.utils.threadsafe
import org.apache.kafka.common.TopicPartition

import scala.jdk.CollectionConverters._

/**
 * This class encapsulates a thread-safe navigable map of LogSegment instances and provides the
 * required read and write behavior on the map.
 *
 * @param topicPartition the TopicPartition associated with the segments
 *                       (useful for logging purposes)
 */
class LogSegments(topicPartition: TopicPartition) {

  /* the segments of the log with key being LogSegment base offset and value being a LogSegment */
  private val segments: ConcurrentNavigableMap[Long, LogSegment] = new ConcurrentSkipListMap[Long, LogSegment]

  /**
   * @return true if the segments are empty, false otherwise.
   */
  @threadsafe
  def isEmpty: Boolean = segments.isEmpty

  /**
   * @return true if the segments are non-empty, false otherwise.
   */
  @threadsafe
  def nonEmpty: Boolean = !isEmpty

  /**
   * Add the given segment, or replace an existing entry.
   *
   * @param segment the segment to add
   */
  @threadsafe
  def add(segment: LogSegment): LogSegment = this.segments.put(segment.baseOffset, segment)

  /**
   * Remove the segment at the provided offset.
   *
   * @param offset the offset to be removed
   */
  @threadsafe
  def remove(offset: Long): Unit = segments.remove(offset)

  /**
   * Clears all entries.
   */
  @threadsafe
  def clear(): Unit = segments.clear()

  /**
   * Close all segments.
   */
  def close(): Unit = values.foreach(_.close())

  /**
   * Close the handlers for all segments.
   */
  def closeHandlers(): Unit = values.foreach(_.closeHandlers())

  /**
   * Update the directory reference for the log and indices of all segments.
   *
   * @param dir the renamed directory
   */
  def updateParentDir(dir: File): Unit = values.foreach(_.updateParentDir(dir))

  /**
   * Take care! this is an O(n) operation, where n is the number of segments.
   *
   * @return The number of segments.
   *
   */
  @threadsafe
  def numberOfSegments: Int = segments.size

  /**
   * @return the base offsets of all segments
   */
  def baseOffsets: Iterable[Long] = segments.values().asScala.map(_.baseOffset)

  /**
   * @param offset the segment to be checked
   * @return true if a segment exists at the provided offset, false otherwise.
   */
  @threadsafe
  def contains(offset: Long): Boolean = segments.containsKey(offset)

  /**
   * Retrieves a segment at the specified offset.
   *
   * @param offset the segment to be retrieved
   *
   * @return the segment if it exists, otherwise None.
   */
  @threadsafe
  def get(offset: Long): Option[LogSegment] = Option(segments.get(offset))

  /**
   * @return an iterator to the log segments ordered from oldest to newest.
   */
  def values: Iterable[LogSegment] = segments.values.asScala

  /**
   * @return An iterator to all segments beginning with the segment that includes "from" and ending
   *         with the segment that includes up to "to-1" or the end of the log (if to > end of log).
   */
  def values(from: Long, to: Long): Iterable[LogSegment] = {
    if (from == to) {
      // Handle non-segment-aligned empty sets
      List.empty[LogSegment]
    } else if (to < from) {
      throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " +
        s"from offset $from which is greater than limit offset $to")
    } else {
      val view = Option(segments.floorKey(from)).map { floor =>
        segments.subMap(floor, to)
      }.getOrElse(segments.headMap(to))
      view.values.asScala
    }
  }

  def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] = {
    val activeSegment = lastSegment.get
    if (from > activeSegment.baseOffset)
      Seq.empty
    else
      values(from, activeSegment.baseOffset)
  }

  /**
   * @return the entry associated with the greatest offset less than or equal to the given offset,
   *         if it exists.
   */
  @threadsafe
  private def floorEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.floorEntry(offset))

  /**
   * @return the log segment with the greatest offset less than or equal to the given offset,
   *         if it exists.
   */
  @threadsafe
  def floorSegment(offset: Long): Option[LogSegment] = floorEntry(offset).map(_.getValue)

  /**
   * @return the entry associated with the greatest offset strictly less than the given offset,
   *         if it exists.
   */
  @threadsafe
  private def lowerEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.lowerEntry(offset))

  /**
   * @return the log segment with the greatest offset strictly less than the given offset,
   *         if it exists.
   */
  @threadsafe
  def lowerSegment(offset: Long): Option[LogSegment] = lowerEntry(offset).map(_.getValue)

  /**
   * @return the entry associated with the smallest offset strictly greater than the given offset,
   *         if it exists.
   */
  @threadsafe
  def higherEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.higherEntry(offset))

  /**
   * @return the log segment with the smallest offset strictly greater than the given offset,
   *         if it exists.
   */
  @threadsafe
  def higherSegment(offset: Long): Option[LogSegment] = higherEntry(offset).map(_.getValue)

  /**
   * @return the entry associated with the smallest offset, if it exists.
   */
  @threadsafe
  def firstEntry: Option[Map.Entry[Long, LogSegment]] = Option(segments.firstEntry)

  /**
   * @return the log segment associated with the smallest offset, if it exists.
   */
  @threadsafe
  def firstSegment: Option[LogSegment] = firstEntry.map(_.getValue)

  /**
   * @return the base offset of the log segment associated with the smallest offset, if it exists
   */
  private[log] def firstSegmentBaseOffset: Option[Long] = firstSegment.map(_.baseOffset)

  /**
   * @return the entry associated with the greatest offset, if it exists.
   */
  @threadsafe
  def lastEntry: Option[Map.Entry[Long, LogSegment]] = Option(segments.lastEntry)

  /**
   * @return the log segment with the greatest offset, if it exists.
   */
  @threadsafe
  def lastSegment: Option[LogSegment] = lastEntry.map(_.getValue)

  /**
   * @return an iterable with log segments ordered from lowest base offset to highest,
   *         each segment returned has a base offset strictly greater than the provided baseOffset.
   */
  def higherSegments(baseOffset: Long): Iterable[LogSegment] = {
    val view =
      Option(segments.higherKey(baseOffset)).map {
        higherOffset => segments.tailMap(higherOffset, true)
      }.getOrElse(collection.immutable.Map[Long, LogSegment]().asJava)
    view.values.asScala
  }

  /**
   * The active segment that is currently taking appends
   */
  def activeSegment = lastSegment.get

  def sizeInBytes: Long = LogSegments.sizeInBytes(values)

  /**
   * Returns an Iterable containing segments matching the provided predicate.
   *
   * @param predicate the predicate to be used for filtering segments.
   */
  def filter(predicate: LogSegment => Boolean): Iterable[LogSegment] = values.filter(predicate)
}

object LogSegments {
  /**
   * Calculate a log's size (in bytes) from the provided log segments.
   *
   * @param segments The log segments to calculate the size of
   * @return Sum of the log segments' sizes (in bytes)
   */
  def sizeInBytes(segments: Iterable[LogSegment]): Long =
    segments.map(_.size.toLong).sum

  def getFirstBatchTimestampForSegments(segments: Iterable[LogSegment]): Iterable[Long] = {
    segments.map {
      segment =>
        segment.getFirstBatchTimestamp()
    }
  }
}
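LogSegments above leans on a ConcurrentSkipListMap keyed by base offset, so "which segment contains offset X" becomes a floorEntry lookup. The following is a small standalone Scala sketch of that access pattern, using plain strings in place of real LogSegment instances:

import java.util.concurrent.ConcurrentSkipListMap

object FloorLookupSketch {
  def main(args: Array[String]): Unit = {
    // Keys are segment base offsets; values stand in for LogSegment instances.
    val segments = new ConcurrentSkipListMap[Long, String]()
    segments.put(0L, "segment-0")
    segments.put(100L, "segment-100")
    segments.put(250L, "segment-250")

    // floorEntry finds the segment whose base offset is <= the requested offset.
    def floorSegment(offset: Long): Option[String] =
      Option(segments.floorEntry(offset)).map(_.getValue)

    println(floorSegment(175L)) // Some(segment-100): offset 175 lives in the segment starting at 100
    println(floorSegment(250L)) // Some(segment-250)
  }
}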
@@ -0,0 +1,889 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.storage.internals.log; |
||||
|
||||
import java.io.Closeable; |
||||
import java.io.File; |
||||
import java.io.IOException; |
||||
import java.nio.ByteBuffer; |
||||
import java.nio.file.Files; |
||||
import java.nio.file.NoSuchFileException; |
||||
import java.nio.file.attribute.FileTime; |
||||
import java.util.Iterator; |
||||
import java.util.Map; |
||||
import java.util.Optional; |
||||
import java.util.OptionalLong; |
||||
import java.util.concurrent.Callable; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
import com.yammer.metrics.core.MetricName; |
||||
import com.yammer.metrics.core.Timer; |
||||
import org.apache.kafka.common.InvalidRecordException; |
||||
import org.apache.kafka.common.errors.CorruptRecordException; |
||||
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; |
||||
import org.apache.kafka.common.record.FileRecords; |
||||
import org.apache.kafka.common.record.FileRecords.LogOffsetPosition; |
||||
import org.apache.kafka.common.record.MemoryRecords; |
||||
import org.apache.kafka.common.record.RecordBatch; |
||||
import org.apache.kafka.common.utils.BufferSupplier; |
||||
import org.apache.kafka.common.utils.Time; |
||||
import org.apache.kafka.common.utils.Utils; |
||||
import org.apache.kafka.server.metrics.KafkaMetricsGroup; |
||||
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.slf4j.event.Level; |
||||
|
||||
import static java.util.Arrays.asList; |
||||
|
||||
/** |
||||
* A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing |
||||
* the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each |
||||
* segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in |
||||
* any previous segment. |
||||
* |
||||
* A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. |
||||
* |
||||
* This class is not thread-safe. |
||||
*/ |
||||
public class LogSegment implements Closeable { |
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LogSegment.class); |
||||
private static final Timer LOG_FLUSH_TIMER; |
||||
|
||||
static { |
||||
KafkaMetricsGroup logFlushStatsMetricsGroup = new KafkaMetricsGroup(LogSegment.class) { |
||||
@Override |
||||
public MetricName metricName(String name, Map<String, String> tags) { |
||||
// Override the group and type names for compatibility - this metrics group was previously defined within
|
||||
// a Scala object named `kafka.log.LogFlushStats`
|
||||
return KafkaMetricsGroup.explicitMetricName("kafka.log", "LogFlushStats", name, tags); |
||||
} |
||||
}; |
||||
LOG_FLUSH_TIMER = logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); |
||||
} |
||||
|
||||
private final FileRecords log; |
||||
private final LazyIndex<OffsetIndex> lazyOffsetIndex; |
||||
private final LazyIndex<TimeIndex> lazyTimeIndex; |
||||
private final TransactionIndex txnIndex; |
||||
private final long baseOffset; |
||||
private final int indexIntervalBytes; |
||||
private final long rollJitterMs; |
||||
private final Time time; |
||||
|
||||
// The timestamp we used for time based log rolling and for ensuring max compaction delay
|
||||
// volatile for LogCleaner to see the update
|
||||
private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty(); |
||||
|
||||
/* The maximum timestamp and offset we see so far */ |
||||
private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN; |
||||
|
||||
private long created; |
||||
|
||||
/* the number of bytes since we last added an entry in the offset index */ |
||||
private int bytesSinceLastIndexEntry = 0; |
||||
|
||||
/** |
||||
* Create a LogSegment with the provided parameters. |
||||
* |
||||
* @param log The file records containing log entries |
||||
* @param lazyOffsetIndex The offset index |
||||
* @param lazyTimeIndex The timestamp index |
||||
* @param txnIndex The transaction index |
||||
* @param baseOffset A lower bound on the offsets in this segment |
||||
* @param indexIntervalBytes The approximate number of bytes between entries in the index |
||||
* @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time |
||||
* @param time The time instance |
||||
*/ |
||||
public LogSegment(FileRecords log, |
||||
LazyIndex<OffsetIndex> lazyOffsetIndex, |
||||
LazyIndex<TimeIndex> lazyTimeIndex, |
||||
TransactionIndex txnIndex, |
||||
long baseOffset, |
||||
int indexIntervalBytes, |
||||
long rollJitterMs, |
||||
Time time) { |
||||
this.log = log; |
||||
this.lazyOffsetIndex = lazyOffsetIndex; |
||||
this.lazyTimeIndex = lazyTimeIndex; |
||||
this.txnIndex = txnIndex; |
||||
this.baseOffset = baseOffset; |
||||
this.indexIntervalBytes = indexIntervalBytes; |
||||
this.rollJitterMs = rollJitterMs; |
||||
this.time = time; |
||||
this.created = time.milliseconds(); |
||||
} |
||||
|
||||
// Visible for testing
|
||||
public LogSegment(LogSegment segment) { |
||||
this(segment.log, segment.lazyOffsetIndex, segment.lazyTimeIndex, segment.txnIndex, segment.baseOffset, |
||||
segment.indexIntervalBytes, segment.rollJitterMs, segment.time); |
||||
} |
||||
|
||||
public OffsetIndex offsetIndex() throws IOException { |
||||
return lazyOffsetIndex.get(); |
||||
} |
||||
|
||||
public File offsetIndexFile() { |
||||
return lazyOffsetIndex.file(); |
||||
} |
||||
|
||||
public TimeIndex timeIndex() throws IOException { |
||||
return lazyTimeIndex.get(); |
||||
} |
||||
|
||||
public File timeIndexFile() { |
||||
return lazyTimeIndex.file(); |
||||
} |
||||
|
||||
public long baseOffset() { |
||||
return baseOffset; |
||||
} |
||||
|
||||
public FileRecords log() { |
||||
return log; |
||||
} |
||||
|
||||
public long rollJitterMs() { |
||||
return rollJitterMs; |
||||
} |
||||
|
||||
public TransactionIndex txnIndex() { |
||||
return txnIndex; |
||||
} |
||||
|
||||
public boolean shouldRoll(RollParams rollParams) throws IOException { |
||||
boolean reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs; |
||||
int size = size(); |
||||
return size > rollParams.maxSegmentBytes - rollParams.messagesSize || |
||||
(size > 0 && reachedRollMs) || |
||||
offsetIndex().isFull() || timeIndex().isFull() || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages); |
||||
} |
||||
|
||||
public void resizeIndexes(int size) throws IOException { |
||||
offsetIndex().resize(size); |
||||
timeIndex().resize(size); |
||||
} |
||||
|
||||
public void sanityCheck(boolean timeIndexFileNewlyCreated) throws IOException { |
||||
if (offsetIndexFile().exists()) { |
||||
// Resize the time index file to 0 if it is newly created.
|
||||
if (timeIndexFileNewlyCreated) |
||||
timeIndex().resize(0); |
||||
// Sanity checks for time index and offset index are skipped because
|
||||
// we will recover the segments above the recovery point in recoverLog()
|
||||
// in any case so sanity checking them here is redundant.
|
||||
txnIndex.sanityCheck(); |
||||
} else |
||||
throw new NoSuchFileException("Offset index file " + offsetIndexFile().getAbsolutePath() + " does not exist"); |
||||
} |
||||
|
||||
/** |
||||
* The first time this is invoked, it will result in a time index lookup (including potential materialization of |
||||
* the time index). |
||||
*/ |
||||
public TimestampOffset readMaxTimestampAndOffsetSoFar() throws IOException { |
||||
if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) |
||||
maxTimestampAndOffsetSoFar = timeIndex().lastEntry(); |
||||
return maxTimestampAndOffsetSoFar; |
||||
} |
||||
|
||||
/** |
||||
* The maximum timestamp we see so far. |
||||
* |
||||
* Note that this may result in time index materialization. |
||||
*/ |
||||
public long maxTimestampSoFar() throws IOException { |
||||
return readMaxTimestampAndOffsetSoFar().timestamp; |
||||
} |
||||
|
||||
/** |
||||
* Note that this may result in time index materialization. |
||||
*/ |
||||
private long offsetOfMaxTimestampSoFar() throws IOException { |
||||
return readMaxTimestampAndOffsetSoFar().offset; |
||||
} |
||||
|
||||
/* Return the size in bytes of this log segment */ |
||||
public int size() { |
||||
return log.sizeInBytes(); |
||||
} |
||||
|
||||
/** |
||||
* checks that the argument offset can be represented as an integer offset relative to the baseOffset. |
||||
*/ |
||||
private boolean canConvertToRelativeOffset(long offset) throws IOException { |
||||
return offsetIndex().canAppendOffset(offset); |
||||
} |
||||
|
||||
/** |
||||
* Append the given messages starting with the given offset. Add |
||||
* an entry to the index if needed. |
||||
* |
||||
* It is assumed this method is being called from within a lock, it is not thread-safe otherwise. |
||||
* |
||||
* @param largestOffset The last offset in the message set |
||||
* @param largestTimestampMs The largest timestamp in the message set. |
||||
* @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append. |
||||
* @param records The log entries to append. |
||||
* @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow |
||||
*/ |
||||
public void append(long largestOffset, |
||||
long largestTimestampMs, |
||||
long shallowOffsetOfMaxTimestamp, |
||||
MemoryRecords records) throws IOException { |
||||
if (records.sizeInBytes() > 0) { |
||||
LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at shallow offset {}", |
||||
records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp); |
||||
int physicalPosition = log.sizeInBytes(); |
||||
if (physicalPosition == 0) |
||||
rollingBasedTimestamp = OptionalLong.of(largestTimestampMs); |
||||
|
||||
ensureOffsetInRange(largestOffset); |
||||
|
||||
// append the messages
|
||||
long appendedBytes = log.append(records); |
||||
LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset); |
||||
// Update the in memory max timestamp and corresponding offset.
|
||||
if (largestTimestampMs > maxTimestampSoFar()) { |
||||
maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp); |
||||
} |
||||
// append an entry to the index (if needed)
|
||||
if (bytesSinceLastIndexEntry > indexIntervalBytes) { |
||||
offsetIndex().append(largestOffset, physicalPosition); |
||||
timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar()); |
||||
bytesSinceLastIndexEntry = 0; |
||||
} |
||||
bytesSinceLastIndexEntry += records.sizeInBytes(); |
||||
} |
||||
} |
||||
|
||||
private void ensureOffsetInRange(long offset) throws IOException { |
||||
if (!canConvertToRelativeOffset(offset)) |
||||
throw new LogSegmentOffsetOverflowException(this, offset); |
||||
} |
||||
|
||||
private int appendChunkFromFile(FileRecords records, int position, BufferSupplier bufferSupplier) throws IOException { |
||||
int bytesToAppend = 0; |
||||
long maxTimestamp = Long.MIN_VALUE; |
||||
long offsetOfMaxTimestamp = Long.MIN_VALUE; |
||||
long maxOffset = Long.MIN_VALUE; |
||||
ByteBuffer readBuffer = bufferSupplier.get(1024 * 1024); |
||||
|
||||
// find all batches that are valid to be appended to the current log segment and
|
||||
// determine the maximum offset and timestamp
|
||||
Iterator<FileChannelRecordBatch> nextBatches = records.batchesFrom(position).iterator(); |
||||
FileChannelRecordBatch batch; |
||||
while ((batch = nextAppendableBatch(nextBatches, readBuffer, bytesToAppend)) != null) { |
||||
if (batch.maxTimestamp() > maxTimestamp) { |
||||
maxTimestamp = batch.maxTimestamp(); |
||||
offsetOfMaxTimestamp = batch.lastOffset(); |
||||
} |
||||
maxOffset = batch.lastOffset(); |
||||
bytesToAppend += batch.sizeInBytes(); |
||||
} |
||||
|
||||
if (bytesToAppend > 0) { |
||||
// Grow buffer if needed to ensure we copy at least one batch
|
||||
if (readBuffer.capacity() < bytesToAppend) |
||||
readBuffer = bufferSupplier.get(bytesToAppend); |
||||
|
||||
readBuffer.limit(bytesToAppend); |
||||
records.readInto(readBuffer, position); |
||||
|
||||
append(maxOffset, maxTimestamp, offsetOfMaxTimestamp, MemoryRecords.readableRecords(readBuffer)); |
||||
} |
||||
|
||||
bufferSupplier.release(readBuffer); |
||||
return bytesToAppend; |
||||
} |
||||
|
||||
private FileChannelRecordBatch nextAppendableBatch(Iterator<FileChannelRecordBatch> recordBatches,
ByteBuffer readBuffer,
int bytesToAppend) throws IOException {
if (recordBatches.hasNext()) {
FileChannelRecordBatch batch = recordBatches.next();
if (canConvertToRelativeOffset(batch.lastOffset()) &&
(bytesToAppend == 0 || bytesToAppend + batch.sizeInBytes() < readBuffer.capacity()))
return batch;
}
return null;
}

/**
* Append records from a file beginning at the given position until either the end of the file
* is reached or an offset is found which is too large to convert to a relative offset for the indexes.
*
* @return the number of bytes appended to the log (may be less than the size of the input if an
* offset is encountered which would overflow this segment)
*/
public int appendFromFile(FileRecords records, int start) throws IOException {
int position = start;
BufferSupplier bufferSupplier = new BufferSupplier.GrowableBufferSupplier();
while (position < start + records.sizeInBytes()) {
int bytesAppended = appendChunkFromFile(records, position, bufferSupplier);
if (bytesAppended == 0)
return position - start;
position += bytesAppended;
}
return position - start;
}

/* not thread safe */
public void updateTxnIndex(CompletedTxn completedTxn, long lastStableOffset) throws IOException {
if (completedTxn.isAborted) {
LOGGER.trace("Writing aborted transaction {} to transaction index, last stable offset is {}", completedTxn, lastStableOffset);
txnIndex.append(new AbortedTxn(completedTxn, lastStableOffset));
}
}

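// Update the producer state for the given batch during recovery and, if the batch completes a transaction,
// record it in the transaction index and mark it completed in the producer state manager.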
private void updateProducerState(ProducerStateManager producerStateManager, RecordBatch batch) throws IOException {
if (batch.hasProducerId()) {
long producerId = batch.producerId();
ProducerAppendInfo appendInfo = producerStateManager.prepareUpdate(producerId, AppendOrigin.REPLICATION);
Optional<CompletedTxn> maybeCompletedTxn = appendInfo.append(batch, Optional.empty());
producerStateManager.update(appendInfo);
if (maybeCompletedTxn.isPresent()) {
CompletedTxn completedTxn = maybeCompletedTxn.get();
long lastStableOffset = producerStateManager.lastStableOffset(completedTxn);
updateTxnIndex(completedTxn, lastStableOffset);
producerStateManager.completeTxn(completedTxn);
}
}
producerStateManager.updateMapEndOffset(batch.lastOffset() + 1);
}

/**
* Equivalent to {@code translateOffset(offset, 0)}.
*
* See {@link #translateOffset(long, int)} for details.
*/
public LogOffsetPosition translateOffset(long offset) throws IOException {
return translateOffset(offset, 0);
}

/**
* Find the physical file position for the first message with offset >= the requested offset.
*
* The startingFilePosition argument is an optimization that can be used if we already know a valid starting position
* in the file higher than the greatest-lower-bound from the index.
*
* This method is thread-safe.
*
* @param offset The offset we want to translate
* @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
* when omitted, the search will begin at the position in the offset index.
* @return The position in the log storing the message with the least offset >= the requested offset and the size of the
* message, or null if no message meets the criteria.
*/
LogOffsetPosition translateOffset(long offset, int startingFilePosition) throws IOException {
OffsetPosition mapping = offsetIndex().lookup(offset);
return log.searchForOffsetWithSize(offset, Math.max(mapping.position, startingFilePosition));
}

/**
* Equivalent to {@code read(startOffset, maxSize, size())}.
*
* See {@link #read(long, int, long, boolean)} for details.
*/
public FetchDataInfo read(long startOffset, int maxSize) throws IOException {
return read(startOffset, maxSize, size());
}

/**
* Equivalent to {@code read(startOffset, maxSize, maxPosition, false)}.
*
* See {@link #read(long, int, long, boolean)} for details.
*/
public FetchDataInfo read(long startOffset, int maxSize, long maxPosition) throws IOException {
return read(startOffset, maxSize, maxPosition, false);
}

/**
* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will not extend past maxPosition in the log file.
*
* This method is thread-safe.
*
* @param startOffset A lower bound on the first offset to include in the message set we read
* @param maxSize The maximum number of bytes to include in the message set we read
* @param maxPosition The maximum position in the log segment that should be exposed for read
* @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists)
*
* @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
* or null if the startOffset is larger than the largest offset in this log
*/
public FetchDataInfo read(long startOffset, int maxSize, long maxPosition, boolean minOneMessage) throws IOException {
if (maxSize < 0)
throw new IllegalArgumentException("Invalid max size " + maxSize + " for log read from segment " + log);

LogOffsetPosition startOffsetAndSize = translateOffset(startOffset);

// if the start position is already off the end of the log, return null
if (startOffsetAndSize == null)
return null;

int startPosition = startOffsetAndSize.position;
LogOffsetMetadata offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition);

int adjustedMaxSize = maxSize;
if (minOneMessage)
adjustedMaxSize = Math.max(maxSize, startOffsetAndSize.size);

// return a log segment but with zero size in the case below
if (adjustedMaxSize == 0)
return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY);

// calculate the length of the message set to read based on maxPosition and the adjusted max size
int fetchSize = Math.min((int) (maxPosition - startPosition), adjustedMaxSize);

return new FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
adjustedMaxSize < startOffsetAndSize.size, Optional.empty());
}

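/**
* Use the offset index to find an upper bound offset for a fetch of fetchSize bytes starting at the given
* offset position, if such an index entry exists.
*/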
public OptionalLong fetchUpperBoundOffset(OffsetPosition startOffsetPosition, int fetchSize) throws IOException {
Optional<OffsetPosition> position = offsetIndex().fetchUpperBoundOffset(startOffsetPosition, fetchSize);
if (position.isPresent())
return OptionalLong.of(position.get().offset);
return OptionalLong.empty();
}

/**
* Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes
* from the end of the log and index.
*
* This method is not thread-safe.
*
* @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover
* the transaction index.
* @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
* @return The number of bytes truncated from the log
* @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow
*/
public int recover(ProducerStateManager producerStateManager, Optional<LeaderEpochFileCache> leaderEpochCache) throws IOException {
offsetIndex().reset();
timeIndex().reset();
txnIndex.reset();
int validBytes = 0;
int lastIndexEntry = 0;
maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN;
try {
for (RecordBatch batch : log.batches()) {
batch.ensureValid();
ensureOffsetInRange(batch.lastOffset());

// The max timestamp is exposed at the batch level, so no need to iterate the records
if (batch.maxTimestamp() > maxTimestampSoFar()) {
maxTimestampAndOffsetSoFar = new TimestampOffset(batch.maxTimestamp(), batch.lastOffset());
}

// Build offset index
if (validBytes - lastIndexEntry > indexIntervalBytes) {
offsetIndex().append(batch.lastOffset(), validBytes);
timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar());
lastIndexEntry = validBytes;
}
validBytes += batch.sizeInBytes();

if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
leaderEpochCache.ifPresent(cache -> {
if (batch.partitionLeaderEpoch() >= 0 &&
(!cache.latestEpoch().isPresent() || batch.partitionLeaderEpoch() > cache.latestEpoch().getAsInt()))
cache.assign(batch.partitionLeaderEpoch(), batch.baseOffset());
});
updateProducerState(producerStateManager, batch);
}
}
} catch (CorruptRecordException | InvalidRecordException e) {
LOGGER.warn("Found invalid messages in log segment {} at byte offset {}: {}. {}", log.file().getAbsolutePath(),
validBytes, e.getMessage(), e.getCause());
}
int truncated = log.sizeInBytes() - validBytes;
if (truncated > 0)
LOGGER.debug("Truncated {} invalid bytes at the end of segment {} during recovery", truncated, log.file().getAbsolutePath());

log.truncateTo(validBytes);
offsetIndex().trimToValidSize();
// A normally closed segment always appends the biggest timestamp ever seen into the time index; we do this here as well.
timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar(), true);
timeIndex().trimToValidSize();
return truncated;
}

/**
* Check whether the last offset of the last batch in this segment overflows the indexes.
*/
public boolean hasOverflow() throws IOException {
long nextOffset = readNextOffset();
return nextOffset > baseOffset && !canConvertToRelativeOffset(nextOffset - 1);
}

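/**
* Collect the aborted transactions recorded in this segment's transaction index that overlap with the given
* fetch offset range.
*/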
public TxnIndexSearchResult collectAbortedTxns(long fetchOffset, long upperBoundOffset) {
return txnIndex.collectAbortedTxns(fetchOffset, upperBoundOffset);
}

@Override
public String toString() {
// We don't call `largestRecordTimestamp` below to avoid materializing the time index when `toString` is invoked
return "LogSegment(baseOffset=" + baseOffset +
", size=" + size() +
", lastModifiedTime=" + lastModified() +
", largestRecordTimestamp=" + maxTimestampAndOffsetSoFar.timestamp +
")";
}

/**
* Truncate off all index and log entries with offsets >= the given offset.
* If the given offset is larger than the largest message in this segment, do nothing.
*
* This method is not thread-safe.
*
* @param offset The offset to truncate to
* @return The number of log bytes truncated
*/
public int truncateTo(long offset) throws IOException {
// Do offset translation before truncating the index to avoid needless scanning
// in case we truncate the full index
LogOffsetPosition mapping = translateOffset(offset);
OffsetIndex offsetIndex = offsetIndex();
TimeIndex timeIndex = timeIndex();

offsetIndex.truncateTo(offset);
timeIndex.truncateTo(offset);
txnIndex.truncateTo(offset);

// After truncation, reset and allocate more space for the (new currently active) index
offsetIndex.resize(offsetIndex.maxIndexSize());
timeIndex.resize(timeIndex.maxIndexSize());

int bytesTruncated;
if (mapping == null)
bytesTruncated = 0;
else
bytesTruncated = log.truncateTo(mapping.position);

if (log.sizeInBytes() == 0) {
created = time.milliseconds();
rollingBasedTimestamp = OptionalLong.empty();
}

bytesSinceLastIndexEntry = 0;
if (maxTimestampSoFar() >= 0)
maxTimestampAndOffsetSoFar = readLargestTimestamp();

return bytesTruncated;
}

private TimestampOffset readLargestTimestamp() throws IOException {
// Get the last time index entry. If the time index is empty, it will return (-1, baseOffset)
TimestampOffset lastTimeIndexEntry = timeIndex().lastEntry();
OffsetPosition offsetPosition = offsetIndex().lookup(lastTimeIndexEntry.offset);

// Scan the rest of the messages to see if there is a larger timestamp after the last time index entry.
FileRecords.TimestampAndOffset maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position);
if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp)
return new TimestampOffset(maxTimestampOffsetAfterLastEntry.timestamp, maxTimestampOffsetAfterLastEntry.offset);

return lastTimeIndexEntry;
}

/**
* Calculate the offset that would be used for the next message to be appended to this segment.
* Note that this is expensive.
*
* This method is thread-safe.
*/
public long readNextOffset() throws IOException {
FetchDataInfo fetchData = read(offsetIndex().lastOffset(), log.sizeInBytes());
if (fetchData == null)
return baseOffset;
else
return fetchData.records.lastBatch()
.map(batch -> batch.nextOffset())
.orElse(baseOffset);
}

/**
* Flush this log segment to disk.
*
* This method is thread-safe.
*/
public void flush() throws IOException {
try {
LOG_FLUSH_TIMER.time(new Callable<Void>() {
// lambdas cannot declare a more specific exception type, so we use an anonymous inner class
@Override
public Void call() throws IOException {
log.flush();
offsetIndex().flush();
timeIndex().flush();
txnIndex.flush();
return null;
}
});
} catch (Exception e) {
if (e instanceof IOException)
throw (IOException) e;
else if (e instanceof RuntimeException)
throw (RuntimeException) e;
else
throw new IllegalStateException("Unexpected exception thrown: " + e, e);
}
}

/**
* Update the directory reference for the log and indices in this segment. This would typically be called after a
* directory is renamed.
*/
void updateParentDir(File dir) {
log.updateParentDir(dir);
lazyOffsetIndex.updateParentDir(dir);
lazyTimeIndex.updateParentDir(dir);
txnIndex.updateParentDir(dir);
}

/**
* Change the suffix for the index and log files for this log segment.
* IOException from this method should be handled by the caller.
*/
public void changeFileSuffixes(String oldSuffix, String newSuffix) throws IOException {
log.renameTo(new File(Utils.replaceSuffix(log.file().getPath(), oldSuffix, newSuffix)));
lazyOffsetIndex.renameTo(new File(Utils.replaceSuffix(offsetIndexFile().getPath(), oldSuffix, newSuffix)));
lazyTimeIndex.renameTo(new File(Utils.replaceSuffix(timeIndexFile().getPath(), oldSuffix, newSuffix)));
txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file().getPath(), oldSuffix, newSuffix)));
}

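/**
* Return true if the log file and all of this segment's index files have the given file name suffix.
*/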
public boolean hasSuffix(String suffix) {
return log.file().getName().endsWith(suffix) &&
offsetIndexFile().getName().endsWith(suffix) &&
timeIndexFile().getName().endsWith(suffix) &&
txnIndex.file().getName().endsWith(suffix);
}

/**
* Append the largest time index entry to the time index and trim the log and indexes.
*
* The time index entry appended will be used to decide when to delete the segment.
*/
public void onBecomeInactiveSegment() throws IOException {
timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar(), true);
offsetIndex().trimToValidSize();
timeIndex().trimToValidSize();
log.trim();
}

/**
* If not previously loaded, load the timestamp of the first message into memory.
*/
private void loadFirstBatchTimestamp() {
if (!rollingBasedTimestamp.isPresent()) {
Iterator<FileChannelRecordBatch> iter = log.batches().iterator();
if (iter.hasNext())
rollingBasedTimestamp = OptionalLong.of(iter.next().maxTimestamp());
}
}

/**
* The time this segment has waited to be rolled.
* If the first message batch has a timestamp we use its timestamp to determine when to roll a segment. A segment
* is rolled if the difference between the new batch's timestamp and the first batch's timestamp exceeds the
* segment rolling time.
* If the first batch does not have a timestamp, we use the wall clock time to determine when to roll a segment. A
* segment is rolled if the difference between the current wall clock time and the segment create time exceeds the
* segment rolling time.
*/
public long timeWaitedForRoll(long now, long messageTimestamp) {
// Load the timestamp of the first message into memory
loadFirstBatchTimestamp();
long ts = rollingBasedTimestamp.orElse(-1L);
if (ts >= 0)
return messageTimestamp - ts;
return now - created;
}

/**
* @return the first batch timestamp if the timestamp is available. Otherwise return Long.MAX_VALUE
*/
long getFirstBatchTimestamp() {
loadFirstBatchTimestamp();
OptionalLong timestamp = rollingBasedTimestamp;
if (timestamp.isPresent() && timestamp.getAsLong() >= 0)
return timestamp.getAsLong();
return Long.MAX_VALUE;
}

/**
* Search the message offset based on timestamp and offset.
*
* This method returns an Optional of TimestampAndOffset. The returned value is determined using the following ordered list of rules:
*
* - If all the messages in the segment have smaller offsets, return an empty Optional
* - If all the messages in the segment have smaller timestamps, return an empty Optional
* - If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
* the returned offset will be max(the base offset of the segment, startingOffset) and the timestamp will be Message.NoTimestamp.
* - Otherwise, return an Optional of TimestampAndOffset. The offset is the offset of the first message whose timestamp
* is greater than or equal to the target timestamp and whose offset is greater than or equal to the startingOffset.
*
* This method only returns an empty Optional when 1) all messages' offset < startingOffset or 2) the log is not empty but we did not
* see any message when scanning the log from the indexed position. The latter could happen if the log is truncated
* after we get the indexed position but before we scan the log from there. In this case we simply return an empty Optional and the
* caller will need to check on the truncated log and maybe retry or even do the search on another log segment.
*
* @param timestampMs The timestamp to search for.
* @param startingOffset The starting offset to search.
* @return the timestamp and offset of the first message that meets the requirements. An empty Optional will be returned if there is no such message.
*/
public Optional<FileRecords.TimestampAndOffset> findOffsetByTimestamp(long timestampMs, long startingOffset) throws IOException {
// Get the index entry with a timestamp less than or equal to the target timestamp
TimestampOffset timestampOffset = timeIndex().lookup(timestampMs);
int position = offsetIndex().lookup(Math.max(timestampOffset.offset, startingOffset)).position;

// Search the timestamp
return Optional.ofNullable(log.searchForTimestamp(timestampMs, position, startingOffset));
}

/**
* Close this log segment.
*/
@Override
public void close() throws IOException {
if (maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN)
Utils.swallow(LOGGER, Level.WARN, "maybeAppend", () -> timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar(), true));
Utils.closeQuietly(lazyOffsetIndex, "offsetIndex", LOGGER);
Utils.closeQuietly(lazyTimeIndex, "timeIndex", LOGGER);
Utils.closeQuietly(log, "log", LOGGER);
Utils.closeQuietly(txnIndex, "txnIndex", LOGGER);
}

/**
* Close file handlers used by the log segment but don't write to disk. This is used when the disk may have failed.
*/
void closeHandlers() {
Utils.swallow(LOGGER, Level.WARN, "offsetIndex", () -> lazyOffsetIndex.closeHandler());
Utils.swallow(LOGGER, Level.WARN, "timeIndex", () -> lazyTimeIndex.closeHandler());
Utils.swallow(LOGGER, Level.WARN, "log", () -> log.closeHandlers());
Utils.closeQuietly(txnIndex, "txnIndex", LOGGER);
}

/**
* Delete this log segment from the filesystem.
*/
public void deleteIfExists() throws IOException {
try {
Utils.tryAll(asList(
() -> deleteTypeIfExists(() -> log.deleteIfExists(), "log", log.file(), true),
() -> deleteTypeIfExists(() -> lazyOffsetIndex.deleteIfExists(), "offset index", offsetIndexFile(), true),
() -> deleteTypeIfExists(() -> lazyTimeIndex.deleteIfExists(), "time index", timeIndexFile(), true),
() -> deleteTypeIfExists(() -> txnIndex.deleteIfExists(), "transaction index", txnIndex.file(), false)));
} catch (Throwable t) {
if (t instanceof IOException)
throw (IOException) t;
if (t instanceof Error)
throw (Error) t;
if (t instanceof RuntimeException)
throw (RuntimeException) t;
throw new IllegalStateException("Unexpected exception: " + t.getMessage(), t);
}
}

// Helper method for `deleteIfExists()`
private Void deleteTypeIfExists(StorageAction<Boolean, IOException> delete, String fileType, File file, boolean logIfMissing) throws IOException {
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
else if (logIfMissing)
LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
return null;
} catch (IOException e) {
throw new IOException("Delete of " + fileType + " " + file.getAbsolutePath() + " failed.", e);
}
}

// Visible for testing
public boolean deleted() {
return !log.file().exists() && !offsetIndexFile().exists() && !timeIndexFile().exists() && !txnIndex.file().exists();
}


/**
* The last modified time of this log segment as a Unix timestamp
*/
public long lastModified() {
return log.file().lastModified();
}

/**
* The largest timestamp this segment contains, if maxTimestampSoFar >= 0, otherwise empty.
*/
public OptionalLong largestRecordTimestamp() throws IOException {
long maxTimestampSoFar = maxTimestampSoFar();
if (maxTimestampSoFar >= 0)
return OptionalLong.of(maxTimestampSoFar);
return OptionalLong.empty();
}

/**
* The largest timestamp this segment contains.
*/
public long largestTimestamp() throws IOException {
long maxTimestampSoFar = maxTimestampSoFar();
if (maxTimestampSoFar >= 0)
return maxTimestampSoFar;
return lastModified();
}

/**
* Change the last modified time for this log segment
*/
public void setLastModified(long ms) throws IOException {
FileTime fileTime = FileTime.fromMillis(ms);
Files.setLastModifiedTime(log.file().toPath(), fileTime);
Files.setLastModifiedTime(offsetIndexFile().toPath(), fileTime);
Files.setLastModifiedTime(timeIndexFile().toPath(), fileTime);
}

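/**
* Open (or create) the log file and lazily-loaded indexes for a segment with the given base offset in the given
* directory. This overload delegates to the overload below with fileAlreadyExists = false and an empty file suffix.
*/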
public static LogSegment open(File dir, long baseOffset, LogConfig config, Time time, int initFileSize, boolean preallocate) throws IOException {
return open(dir, baseOffset, config, time, false, initFileSize, preallocate, "");
}

public static LogSegment open(File dir, long baseOffset, LogConfig config, Time time, boolean fileAlreadyExists,
int initFileSize, boolean preallocate, String fileSuffix) throws IOException {
int maxIndexSize = config.maxIndexSize;
return new LogSegment(
FileRecords.open(LogFileUtils.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize),
LazyIndex.forTime(LogFileUtils.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize),
new TransactionIndex(baseOffset, LogFileUtils.transactionIndexFile(dir, baseOffset, fileSuffix)),
baseOffset,
config.indexInterval,
config.randomSegmentJitter(),
time);
}

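/**
* Delete the log file and all index files for the segment with the given base offset and file suffix, if they exist.
*/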
public static void deleteIfExists(File dir, long baseOffset, String fileSuffix) throws IOException {
deleteFileIfExists(LogFileUtils.offsetIndexFile(dir, baseOffset, fileSuffix));
deleteFileIfExists(LogFileUtils.timeIndexFile(dir, baseOffset, fileSuffix));
deleteFileIfExists(LogFileUtils.transactionIndexFile(dir, baseOffset, fileSuffix));
deleteFileIfExists(LogFileUtils.logFile(dir, baseOffset, fileSuffix));
}

private static boolean deleteFileIfExists(File file) throws IOException {
return Files.deleteIfExists(file.toPath());
}

}
@@ -0,0 +1,355 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;

import org.apache.kafka.common.TopicPartition;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* This class encapsulates a thread-safe navigable map of LogSegment instances and provides the
* required read and write behavior on the map.
*/
public class LogSegments {

private final TopicPartition topicPartition;
/* the segments of the log with key being LogSegment base offset and value being a LogSegment */
private final ConcurrentNavigableMap<Long, LogSegment> segments = new ConcurrentSkipListMap<>();

/**
* Create new instance.
*
* @param topicPartition the TopicPartition associated with the segments
* (useful for logging purposes)
*/
public LogSegments(TopicPartition topicPartition) {
this.topicPartition = topicPartition;
}

/**
* Return true if the segments are empty, false otherwise.
*
* This method is thread-safe.
*/
public boolean isEmpty() {
return segments.isEmpty();
}

/**
* Return true if the segments are non-empty, false otherwise.
*
* This method is thread-safe.
*/
public boolean nonEmpty() {
return !isEmpty();
}

/**
* Add the given segment, or replace an existing entry.
*
* This method is thread-safe.
*
* @param segment the segment to add
*/
public LogSegment add(LogSegment segment) {
return this.segments.put(segment.baseOffset(), segment);
}

/**
* Remove the segment at the provided offset.
*
* This method is thread-safe.
*
* @param offset the offset to be removed
*/
public void remove(long offset) {
segments.remove(offset);
}

/**
* Clears all entries.
*
* This method is thread-safe.
*/
public void clear() {
segments.clear();
}

/**
* Close all segments.
*/
public void close() throws IOException {
for (LogSegment s : values())
s.close();
}

/**
* Close the handlers for all segments.
*/
public void closeHandlers() {
for (LogSegment s : values())
s.closeHandlers();
}

/**
* Update the directory reference for the log and indices of all segments.
*
* @param dir the renamed directory
*/
public void updateParentDir(File dir) {
for (LogSegment s : values())
s.updateParentDir(dir);
}

/**
* Take care: this is an O(n) operation, where n is the number of segments.
*
* This method is thread-safe.
*
* @return The number of segments.
*/
public int numberOfSegments() {
return segments.size();
}

/**
* @return the base offsets of all segments
*/
public Collection<Long> baseOffsets() {
return values().stream().map(s -> s.baseOffset()).collect(Collectors.toList());
}

/**
* Return true if a segment exists at the provided offset, false otherwise.
*
* This method is thread-safe.
*
* @param offset the segment to be checked
*/
public boolean contains(long offset) {
return segments.containsKey(offset);
}

/**
* Retrieves a segment at the specified offset.
*
* This method is thread-safe.
*
* @param offset the segment to be retrieved
*
* @return the segment if it exists, otherwise empty.
*/
public Optional<LogSegment> get(long offset) {
return Optional.ofNullable(segments.get(offset));
}

/**
* @return the log segments ordered from oldest to newest.
*/
public Collection<LogSegment> values() {
return segments.values();
}

/**
* @return all segments beginning with the segment that includes "from" and ending
* with the segment that includes up to "to-1" or the end of the log (if to > end of log).
*/
public Collection<LogSegment> values(long from, long to) {
if (from == to) {
// Handle non-segment-aligned empty sets
return Collections.emptyList();
} else if (to < from) {
throw new IllegalArgumentException("Invalid log segment range: requested segments in " + topicPartition +
" from offset " + from + " which is greater than limit offset " + to);
} else {
Long floor = segments.floorKey(from);
if (floor != null)
return segments.subMap(floor, to).values();
return segments.headMap(to).values();
}
}

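/**
* Return all segments prior to the active segment, in ascending base offset order, beginning with the segment
* that includes "from" (empty if "from" is at or beyond the active segment's base offset).
*/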
public Collection<LogSegment> nonActiveLogSegmentsFrom(long from) {
LogSegment activeSegment = lastSegment().get();
if (from > activeSegment.baseOffset())
return Collections.emptyList();
else
return values(from, activeSegment.baseOffset());
}

/**
* Return the entry associated with the greatest offset less than or equal to the given offset,
* if it exists.
*
* This method is thread-safe.
*/
private Optional<Map.Entry<Long, LogSegment>> floorEntry(long offset) {
return Optional.ofNullable(segments.floorEntry(offset));
}

/**
* Return the log segment with the greatest offset less than or equal to the given offset,
* if it exists.
*
* This method is thread-safe.
*/
public Optional<LogSegment> floorSegment(long offset) {
return floorEntry(offset).map(e -> e.getValue());
}

/**
* Return the entry associated with the greatest offset strictly less than the given offset,
* if it exists.
*
* This method is thread-safe.
*/
private Optional<Map.Entry<Long, LogSegment>> lowerEntry(long offset) {
return Optional.ofNullable(segments.lowerEntry(offset));
}

/**
* Return the log segment with the greatest offset strictly less than the given offset,
* if it exists.
*
* This method is thread-safe.
*/
public Optional<LogSegment> lowerSegment(long offset) {
return lowerEntry(offset).map(e -> e.getValue());
}

/**
* Return the entry associated with the smallest offset strictly greater than the given offset,
* if it exists.
*
* This method is thread-safe.
*/
public Optional<Map.Entry<Long, LogSegment>> higherEntry(long offset) {
return Optional.ofNullable(segments.higherEntry(offset));
}

/**
* Return the log segment with the smallest offset strictly greater than the given offset,
* if it exists.
*
* This method is thread-safe.
*/
public Optional<LogSegment> higherSegment(long offset) {
return higherEntry(offset).map(e -> e.getValue());
}

/**
* Return the entry associated with the smallest offset, if it exists.
*
* This method is thread-safe.
*/
public Optional<Map.Entry<Long, LogSegment>> firstEntry() {
return Optional.ofNullable(segments.firstEntry());
}

/**
* Return the log segment associated with the smallest offset, if it exists.
*
* This method is thread-safe.
*/
public Optional<LogSegment> firstSegment() {
return firstEntry().map(s -> s.getValue());
}

/**
* @return the base offset of the log segment associated with the smallest offset, if it exists
*/
public OptionalLong firstSegmentBaseOffset() {
Optional<LogSegment> first = firstSegment();
if (first.isPresent())
return OptionalLong.of(first.get().baseOffset());
return OptionalLong.empty();
}

/**
* Return the entry associated with the greatest offset, if it exists.
*
* This method is thread-safe.
*/
public Optional<Map.Entry<Long, LogSegment>> lastEntry() {
return Optional.ofNullable(segments.lastEntry());
}

/**
* Return the log segment with the greatest offset, if it exists.
*
* This method is thread-safe.
*/
public Optional<LogSegment> lastSegment() {
return lastEntry().map(e -> e.getValue());
}

/**
* @return an iterable with log segments ordered from lowest base offset to highest;
* each segment returned has a base offset strictly greater than the provided baseOffset.
*/
public Collection<LogSegment> higherSegments(long baseOffset) {
Long higherOffset = segments.higherKey(baseOffset);
if (higherOffset != null)
return segments.tailMap(higherOffset, true).values();
return Collections.emptyList();
}

/**
* The active segment that is currently taking appends
*/
public LogSegment activeSegment() {
return lastSegment().get();
}

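/**
* @return the sum of the sizes (in bytes) of all segments
*/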
public long sizeInBytes() {
return LogSegments.sizeInBytes(values());
}

/**
* Returns the segments matching the provided predicate.
*
* @param predicate the predicate to be used for filtering segments.
*/
public Collection<LogSegment> filter(Predicate<LogSegment> predicate) {
return values().stream().filter(predicate).collect(Collectors.toList());
}

/**
* Calculate a log's size (in bytes) from the provided log segments.
*
* @param segments The log segments to calculate the size of
* @return Sum of the log segments' sizes (in bytes)
*/
public static long sizeInBytes(Collection<LogSegment> segments) {
return segments.stream().mapToLong(s -> s.size()).sum();
}

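/**
* @return the first batch timestamp of each of the provided segments; Long.MAX_VALUE for a segment whose
* first batch has no timestamp
*/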
public static Collection<Long> getFirstBatchTimestampForSegments(Collection<LogSegment> segments) {
return segments.stream().map(s -> s.getFirstBatchTimestamp()).collect(Collectors.toList());
}
}