
MINOR: Ensure in-memory metadata is removed before physical deletion of segment (#7106)

Minor refactoring of the places where we delete log segments, to ensure we always remove the in-memory metadata of the segment before performing physical deletion.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Nikhil Bhatia <rite2nikhil@gmail.com>, Jun Rao <junrao@gmail.com>
Dhruvil Shah authored 5 years ago, committed by Jun Rao
commit e9c61f690e
 core/src/main/scala/kafka/log/Log.scala          | 72
 core/src/test/scala/unit/kafka/log/LogTest.scala | 21
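Before the diff itself, the shape of the change in one place: every deletion path now funnels through a single helper that drops a segment's entry from the in-memory offset map before renaming or deleting anything on disk. Below is a minimal, self-contained sketch of that ordering; only the names removeAndDeleteSegments and deleteSegmentFiles mirror the patch, while the Segment stand-in, lock, and scheduler wiring are simplified for illustration and are not Kafka's actual types.

    import java.io.File
    import java.util.concurrent.{ConcurrentSkipListMap, Executors, TimeUnit}

    // Stand-in for kafka.log.LogSegment; only what the sketch needs.
    case class Segment(baseOffset: Long, file: File)

    class LogSketch(fileDeleteDelayMs: Long) {
      private val scheduler = Executors.newSingleThreadScheduledExecutor()
      // baseOffset -> segment, mirroring Log.scala's `segments` map
      private val segments = new ConcurrentSkipListMap[Long, Segment]()
      private val lock = new Object

      def removeAndDeleteSegments(candidates: Iterable[Segment], asyncDelete: Boolean): Unit =
        lock.synchronized {
          // Snapshot first: callers may pass a view backed by `segments`,
          // which the loop below mutates.
          val toDelete = candidates.toList
          // 1) Remove the in-memory metadata, so reads stop resolving
          //    offsets to segments that are about to disappear.
          toDelete.foreach(segment => segments.remove(segment.baseOffset))
          // 2) Only then perform (or schedule) the physical deletion.
          deleteSegmentFiles(toDelete, asyncDelete)
        }

      private def deleteSegmentFiles(toDelete: List[Segment], asyncDelete: Boolean): Unit = {
        def doDelete(): Unit = toDelete.foreach(_.file.delete())
        if (asyncDelete)
          scheduler.schedule(new Runnable { def run(): Unit = doDelete() },
            fileDeleteDelayMs, TimeUnit.MILLISECONDS)
        else
          doDelete()
      }
    }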

--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala

@@ -746,8 +746,10 @@ class Log(@volatile var dir: File,
     // if we have the clean shutdown marker, skip recovery
     if (!hasCleanShutdownFile) {
       // okay we need to actually recover this log
-      val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
-      while (unflushed.hasNext) {
+      val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).toIterator
+      var truncated = false
+      while (unflushed.hasNext && !truncated) {
         val segment = unflushed.next
         info(s"Recovering unflushed segment ${segment.baseOffset}")
         val truncatedBytes =
@@ -763,7 +765,8 @@ class Log(@volatile var dir: File,
         if (truncatedBytes > 0) {
           // we had an invalid message, delete all remaining log
           warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
-          unflushed.foreach(deleteSegment)
+          removeAndDeleteSegments(unflushed.toList, asyncDelete = true)
+          truncated = true
         }
       }
     }
@@ -773,7 +776,7 @@ class Log(@volatile var dir: File,
     if (logEndOffset < logStartOffset) {
       warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
         "This could happen if segment files were deleted from the file system.")
-      logSegments.foreach(deleteSegment)
+      removeAndDeleteSegments(logSegments, asyncDelete = true)
     }
   }
@@ -1653,7 +1656,7 @@ class Log(@volatile var dir: File,
       lock synchronized {
         checkIfMemoryMappedBufferClosed()
         // remove the segments for lookups
-        deletable.foreach(deleteSegment)
+        removeAndDeleteSegments(deletable, asyncDelete = true)
         maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
       }
     }
@@ -1829,7 +1832,7 @@ class Log(@volatile var dir: File,
             s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " +
             s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
             s" size of offset index: ${activeSegment.offsetIndex.entries}.")
-          deleteSegment(activeSegment)
+          removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true)
         } else {
           throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" +
             s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
@@ -1959,8 +1962,7 @@ class Log(@volatile var dir: File,
       lock synchronized {
         checkIfMemoryMappedBufferClosed()
         removeLogMetrics()
-        logSegments.foreach(_.deleteIfExists())
-        segments.clear()
+        removeAndDeleteSegments(logSegments, asyncDelete = false)
         leaderEpochCache.foreach(_.clear())
         Utils.delete(dir)
         // File handlers will be closed if this log is deleted
@@ -2011,7 +2013,7 @@ class Log(@volatile var dir: File,
           truncateFullyAndStartAt(targetOffset)
         } else {
           val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
-          deletable.foreach(deleteSegment)
+          removeAndDeleteSegments(deletable, asyncDelete = true)
           activeSegment.truncateTo(targetOffset)
           updateLogEndOffset(targetOffset)
           this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
@@ -2035,8 +2037,7 @@ class Log(@volatile var dir: File,
     debug(s"Truncate and start at offset $newOffset")
     lock synchronized {
       checkIfMemoryMappedBufferClosed()
-      val segmentsToDelete = logSegments.toList
-      segmentsToDelete.foreach(deleteSegment)
+      removeAndDeleteSegments(logSegments, asyncDelete = true)
       addSegment(LogSegment.open(dir,
         baseOffset = newOffset,
         config = config,
@@ -2099,30 +2100,36 @@ class Log(@volatile var dir: File,
   }

   /**
-   * This method performs an asynchronous log segment delete by doing the following:
+   * This method deletes the given log segments by doing the following for each of them:
    * <ol>
    *   <li>It removes the segment from the segment map so that it will no longer be used for reads.
    *   <li>It renames the index and log files by appending .deleted to the respective file name
-   *   <li>It schedules an asynchronous delete operation to occur in the future
+   *   <li>It can either schedule an asynchronous delete operation to occur in the future or perform the deletion synchronously
    * </ol>
-   * This allows reads to happen concurrently without synchronization and without the possibility of physically
-   * deleting a file while it is being read from.
+   * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
+   * physically deleting a file while it is being read.
    *
    * This method does not need to convert IOException to KafkaStorageException because it is either called before all logs are loaded
    * or the immediate caller will catch and handle IOException
    *
-   * @param segment The log segment to schedule for deletion
+   * @param segments The log segments to schedule for deletion
+   * @param asyncDelete Whether the segment files should be deleted asynchronously
    */
-  private def deleteSegment(segment: LogSegment) {
-    info(s"Scheduling log segment [baseOffset ${segment.baseOffset}, size ${segment.size}] for deletion.")
+  private def removeAndDeleteSegments(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = {
     lock synchronized {
-      segments.remove(segment.baseOffset)
-      asyncDeleteSegment(segment)
+      // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
+      // removing the deleted segment, we should force materialization of the iterator here, so that results of the
+      // iteration remain valid and deterministic.
+      val toDelete = segments.toList
+      toDelete.foreach { segment =>
+        this.segments.remove(segment.baseOffset)
+      }
+      deleteSegmentFiles(toDelete, asyncDelete)
     }
   }

   /**
-   * Perform an asynchronous delete on the given file.
+   * Perform physical deletion for the given file. Allows the file to be deleted asynchronously or synchronously.
    *
    * This method assumes that the file exists and the method is not thread-safe.
    *
@@ -2131,15 +2138,22 @@ class Log(@volatile var dir: File,
    *
    * @throws IOException if the file can't be renamed and still exists
    */
-  private def asyncDeleteSegment(segment: LogSegment) {
-    segment.changeFileSuffixes("", Log.DeletedFileSuffix)
-    def deleteSeg() {
-      info(s"Deleting segment ${segment.baseOffset}")
+  private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean) {
+    segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
+
+    def deleteSegments() {
+      info(s"Deleting segments $segments")
       maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
-        segment.deleteIfExists()
+        segments.foreach(_.deleteIfExists())
       }
     }
-    scheduler.schedule("delete-file", deleteSeg _, delay = config.fileDeleteDelayMs)
+
+    if (asyncDelete) {
+      info(s"Scheduling segments for deletion $segments")
+      scheduler.schedule("delete-file", () => deleteSegments, delay = config.fileDeleteDelayMs)
+    } else {
+      deleteSegments()
+    }
   }

   /**
@@ -2194,8 +2208,8 @@ class Log(@volatile var dir: File,
         // remove the index entry
         if (seg.baseOffset != sortedNewSegments.head.baseOffset)
           segments.remove(seg.baseOffset)
-        // delete segment
-        asyncDeleteSegment(seg)
+        // delete segment files
+        deleteSegmentFiles(List(seg), asyncDelete = true)
       }
       // okay we are safe now, remove the swap suffix
       sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
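The toList inside removeAndDeleteSegments is load-bearing: recovery, for example, passes an iterator derived from the same segments map the helper mutates, so the helper snapshots the input before removing entries. A standalone sketch of that pattern using only the standard library follows; the names and the string-keyed map here are illustrative, not Kafka's.

    import java.util.concurrent.ConcurrentSkipListMap
    import scala.jdk.CollectionConverters._

    object SnapshotBeforeMutate extends App {
      val segments = new ConcurrentSkipListMap[Long, String]()
      (0L until 5L).foreach(offset => segments.put(offset, s"segment-$offset"))

      // A live iterator backed by the map: what it yields depends on the
      // map's contents at the moment each element is pulled.
      val liveIterator = segments.values.asScala.iterator

      // Snapshot first, then mutate: the result is fixed and deterministic,
      // even though the loop removes entries from the underlying map.
      val toDelete = liveIterator.toList
      toDelete.foreach(name => segments.remove(name.stripPrefix("segment-").toLong))

      assert(toDelete.size == 5)
      assert(segments.isEmpty)
    }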

--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala

@@ -378,6 +378,27 @@ class LogTest {
     assertEquals(startOffset, log.logEndOffset)
   }

+  @Test
+  def testLogDelete(): Unit = {
+    val logConfig = LogTest.createLogConfig()
+    val log = createLog(logDir, logConfig)
+
+    for (i <- 0 to 100) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+
+    assertTrue(log.logSegments.size > 0)
+    assertFalse(logDir.listFiles.isEmpty)
+
+    // delete the log
+    log.delete()
+
+    assertEquals(0, log.logSegments.size)
+    assertFalse(logDir.exists)
+  }
+
   private def testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion: String): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10, messageFormatVersion = messageFormatVersion)
     var log = createLog(logDir, logConfig)
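One detail worth noting from the diff above: delete() is the only call site that passes asyncDelete = false, because Utils.delete(dir) removes the whole log directory immediately afterwards, so the segment files must already be gone rather than merely scheduled for deletion. The new test's final assertFalse(logDir.exists) only holds for that reason. In miniature, assuming nothing beyond the JDK:

    import java.nio.file.Files

    object SyncDeleteSketch extends App {
      val dir = Files.createTempDirectory("log-delete-sketch")
      val segmentFile = Files.createFile(dir.resolve("00000000000000000000.log"))

      // Synchronous deletion: remove the segment file now, so the (now empty)
      // directory can itself be deleted immediately with nothing racing it.
      Files.delete(segmentFile)
      Files.delete(dir)
      assert(!Files.exists(dir))
    }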
