diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b13823932d3..5e12e315bbf 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -746,8 +746,10 @@ class Log(@volatile var dir: File,
// if we have the clean shutdown marker, skip recovery
if (!hasCleanShutdownFile) {
// okay we need to actually recover this log
- val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
- while (unflushed.hasNext) {
+ val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).toIterator
+ var truncated = false
+
+ while (unflushed.hasNext && !truncated) {
val segment = unflushed.next
info(s"Recovering unflushed segment ${segment.baseOffset}")
val truncatedBytes =
@@ -763,7 +765,8 @@ class Log(@volatile var dir: File,
if (truncatedBytes > 0) {
// we had an invalid message, delete all remaining log
warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
- unflushed.foreach(deleteSegment)
+ removeAndDeleteSegments(unflushed.toList, asyncDelete = true)
+ truncated = true
}
}
}
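
The new `truncated` flag gives recovery a clean early exit: once a corrupt segment is found, the rest of the iterator is drained into one batched delete and the loop ends, instead of deleting the remaining segments one by one mid-iteration. A minimal sketch of that control-flow shape, with a hypothetical `Segment` case class standing in for `LogSegment` and a `corrupt` flag standing in for `recoverSegment` reporting truncated bytes:

object RecoveryLoopSketch extends App {
  // Stand-in for LogSegment; not the real API.
  case class Segment(baseOffset: Long, corrupt: Boolean)

  val unflushed = List(
    Segment(0, corrupt = false),
    Segment(10, corrupt = true),
    Segment(20, corrupt = false)).iterator
  var truncated = false

  while (unflushed.hasNext && !truncated) {
    val segment = unflushed.next()
    if (segment.corrupt) {
      // Draining the iterator collects every later segment for one batched
      // delete; the flag then terminates the loop.
      val remaining = unflushed.toList
      println(s"corruption at ${segment.baseOffset}, batch-deleting offsets ${remaining.map(_.baseOffset)}")
      truncated = true
    } else {
      println(s"recovered segment ${segment.baseOffset}")
    }
  }
}
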
@@ -773,7 +776,7 @@ class Log(@volatile var dir: File,
if (logEndOffset < logStartOffset) {
warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
"This could happen if segment files were deleted from the file system.")
- logSegments.foreach(deleteSegment)
+ removeAndDeleteSegments(logSegments, asyncDelete = true)
}
}
@@ -1653,7 +1656,7 @@ class Log(@volatile var dir: File,
lock synchronized {
checkIfMemoryMappedBufferClosed()
// remove the segments for lookups
- deletable.foreach(deleteSegment)
+ removeAndDeleteSegments(deletable, asyncDelete = true)
maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
}
}
@@ -1829,7 +1832,7 @@ class Log(@volatile var dir: File,
s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " +
s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
s" size of offset index: ${activeSegment.offsetIndex.entries}.")
- deleteSegment(activeSegment)
+ removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true)
} else {
throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" +
s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
@@ -1959,8 +1962,7 @@ class Log(@volatile var dir: File,
lock synchronized {
checkIfMemoryMappedBufferClosed()
removeLogMetrics()
- logSegments.foreach(_.deleteIfExists())
- segments.clear()
+ removeAndDeleteSegments(logSegments, asyncDelete = false)
leaderEpochCache.foreach(_.clear())
Utils.delete(dir)
// File handlers will be closed if this log is deleted
@@ -2011,7 +2013,7 @@ class Log(@volatile var dir: File,
truncateFullyAndStartAt(targetOffset)
} else {
val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
- deletable.foreach(deleteSegment)
+ removeAndDeleteSegments(deletable, asyncDelete = true)
activeSegment.truncateTo(targetOffset)
updateLogEndOffset(targetOffset)
this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
@@ -2035,8 +2037,7 @@ class Log(@volatile var dir: File,
debug(s"Truncate and start at offset $newOffset")
lock synchronized {
checkIfMemoryMappedBufferClosed()
- val segmentsToDelete = logSegments.toList
- segmentsToDelete.foreach(deleteSegment)
+ removeAndDeleteSegments(logSegments, asyncDelete = true)
addSegment(LogSegment.open(dir,
baseOffset = newOffset,
config = config,
@@ -2099,30 +2100,36 @@ class Log(@volatile var dir: File,
}
/**
- * This method performs an asynchronous log segment delete by doing the following:
+ * This method deletes the given log segments by doing the following for each of them:
*
* - It removes the segment from the segment map so that it will no longer be used for reads.
*
* - It renames the index and log files by appending .deleted to the respective file name
*
- * - It schedules an asynchronous delete operation to occur in the future
+ * - It can either schedule an asynchronous delete operation to occur in the future or perform the deletion synchronously
*
- * This allows reads to happen concurrently without synchronization and without the possibility of physically
- * deleting a file while it is being read from.
+ * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
+ * physically deleting a file while it is being read.
*
* This method does not need to convert IOException to KafkaStorageException because it is either called before all logs are loaded
* or the immediate caller will catch and handle IOException
*
- * @param segment The log segment to schedule for deletion
+ * @param segments The log segments to schedule for deletion
+ * @param asyncDelete Whether the segment files should be deleted asynchronously
*/
- private def deleteSegment(segment: LogSegment) {
- info(s"Scheduling log segment [baseOffset ${segment.baseOffset}, size ${segment.size}] for deletion.")
+ private def removeAndDeleteSegments(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = {
lock synchronized {
- segments.remove(segment.baseOffset)
- asyncDeleteSegment(segment)
+ // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegments` mutates it by
+ // removing the deleted segment, we should force materialization of the iterator here, so that results of the
+ // iteration remain valid and deterministic.
+ val toDelete = segments.toList
+ toDelete.foreach { segment =>
+ this.segments.remove(segment.baseOffset)
+ }
+ deleteSegmentFiles(toDelete, asyncDelete)
}
}
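
The comment above is the crux of the refactor: callers frequently pass live views over the underlying `ConcurrentSkipListMap` (e.g. `logSegments`), and this method mutates that map, so the iterable must be materialized before any removal. A small sketch of the pattern, assuming a plain `ConcurrentSkipListMap[java.lang.Long, String]` as a stand-in for the segment map:

import java.util.concurrent.ConcurrentSkipListMap
import scala.collection.JavaConverters._

object MaterializeSketch extends App {
  val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
  (0L to 4L).foreach(offset => segments.put(offset, s"segment-$offset"))

  // Callers typically hand over a live view like this one (cf. logSegments).
  val liveView: Iterable[String] = segments.values.asScala

  // Snapshot first, as removeAndDeleteSegments does, so the result of the
  // iteration cannot change while the map is mutated underneath it.
  val toDelete = liveView.toList
  toDelete.foreach { value =>
    segments.remove(value.stripPrefix("segment-").toLong)
  }

  println(s"deleted ${toDelete.size}, remaining ${segments.size}")
}
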
/**
- * Perform an asynchronous delete on the given file.
+ * Perform physical deletion for the given segment files. Allows the files to be deleted asynchronously or synchronously.
*
* This method assumes that the file exists and the method is not thread-safe.
*
@@ -2131,15 +2138,22 @@ class Log(@volatile var dir: File,
*
* @throws IOException if the file can't be renamed and still exists
*/
- private def asyncDeleteSegment(segment: LogSegment) {
- segment.changeFileSuffixes("", Log.DeletedFileSuffix)
- def deleteSeg() {
- info(s"Deleting segment ${segment.baseOffset}")
+ private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean) {
+ segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
+
+ def deleteSegments() {
+ info(s"Deleting segments $segments")
maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
- segment.deleteIfExists()
+ segments.foreach(_.deleteIfExists())
}
}
- scheduler.schedule("delete-file", deleteSeg _, delay = config.fileDeleteDelayMs)
+
+ if (asyncDelete) {
+ info(s"Scheduling segments for deletion $segments")
+ scheduler.schedule("delete-file", () => deleteSegments, delay = config.fileDeleteDelayMs)
+ } else {
+ deleteSegments()
+ }
}
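
The `asyncDelete` flag splits the old behaviour in two: the asynchronous path defers the physical delete by `file.delete.delay.ms` (the config behind `config.fileDeleteDelayMs`) so concurrent readers of the renamed `.deleted` files can finish, while the new synchronous path lets `delete()` remove the files before it removes the whole directory with `Utils.delete(dir)`. A minimal sketch of the split, substituting a JDK `ScheduledExecutorService` for Kafka's scheduler:

import java.util.concurrent.{Executors, TimeUnit}

object DeleteModeSketch extends App {
  val scheduler = Executors.newSingleThreadScheduledExecutor()

  def deleteSegments(): Unit = println("physically deleting .deleted files")

  def deleteSegmentFiles(asyncDelete: Boolean, fileDeleteDelayMs: Long): Unit = {
    if (asyncDelete) {
      // Deferred delete: in-flight reads of the renamed files can complete first.
      scheduler.schedule(new Runnable { def run(): Unit = deleteSegments() },
        fileDeleteDelayMs, TimeUnit.MILLISECONDS)
    } else {
      // Immediate delete: needed when the caller is about to remove the directory.
      deleteSegments()
    }
  }

  deleteSegmentFiles(asyncDelete = false, fileDeleteDelayMs = 50)
  deleteSegmentFiles(asyncDelete = true, fileDeleteDelayMs = 50)

  Thread.sleep(100)
  scheduler.shutdown()
}
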
/**
@@ -2194,8 +2208,8 @@ class Log(@volatile var dir: File,
// remove the index entry
if (seg.baseOffset != sortedNewSegments.head.baseOffset)
segments.remove(seg.baseOffset)
- // delete segment
- asyncDeleteSegment(seg)
+ // delete segment files
+ deleteSegmentFiles(List(seg), asyncDelete = true)
}
// okay we are safe now, remove the swap suffix
sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 6bb9301a898..46494e788d1 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -378,6 +378,27 @@ class LogTest {
assertEquals(startOffset, log.logEndOffset)
}
+ @Test
+ def testLogDelete(): Unit = {
+ val logConfig = LogTest.createLogConfig()
+ val log = createLog(logDir, logConfig)
+
+ for (i <- 0 to 100) {
+ val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+ log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+ log.roll()
+ }
+
+ assertTrue(log.logSegments.size > 0)
+ assertFalse(logDir.listFiles.isEmpty)
+
+ // delete the log
+ log.delete()
+
+ assertEquals(0, log.logSegments.size)
+ assertFalse(logDir.exists)
+ }
+
private def testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion: String): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10, messageFormatVersion = messageFormatVersion)
var log = createLog(logDir, logConfig)