
KAFKA-14550: Move SnapshotFile and CorruptSnapshotException to storage module (#13039)

For broader context on this change, see:

* KAFKA-14470: Move log layer to storage module

Reviewers: Ismael Juma <ismael@juma.me.uk>
Satish Duggana authored 2 years ago, committed via GitHub
commit 026105d05f

16 changed files:

  1. clients/src/main/java/org/apache/kafka/common/utils/Utils.java (8 changed lines)
  2. clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java (9 changed lines)
  3. core/src/main/scala/kafka/log/LocalLog.scala (5 changed lines)
  4. core/src/main/scala/kafka/log/LogLoader.scala (8 changed lines)
  5. core/src/main/scala/kafka/log/LogSegment.scala (10 changed lines)
  6. core/src/main/scala/kafka/log/ProducerStateManager.scala (60 changed lines)
  7. core/src/main/scala/kafka/log/UnifiedLog.scala (2 changed lines)
  8. core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala (4 changed lines)
  9. core/src/main/scala/kafka/tools/DumpLogSegments.scala (2 changed lines)
  10. core/src/main/scala/kafka/utils/CoreUtils.scala (9 changed lines)
  11. core/src/test/scala/unit/kafka/log/LogCleanerTest.scala (12 changed lines)
  12. core/src/test/scala/unit/kafka/log/LogLoaderTest.scala (12 changed lines)
  13. core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala (8 changed lines)
  14. storage/src/main/java/org/apache/kafka/server/log/internals/CorruptSnapshotException.java (26 changed lines)
  15. storage/src/main/java/org/apache/kafka/server/log/internals/LogFileUtils.java (35 changed lines)
  16. storage/src/main/java/org/apache/kafka/server/log/internals/SnapshotFile.java (71 changed lines)

clients/src/main/java/org/apache/kafka/common/utils/Utils.java (8 changed lines)

@@ -1488,4 +1488,12 @@ public final class Utils {
         return Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault()).format(dateTimeFormatter);
     }
 
+    /**
+     * Replace the given string suffix with the new suffix. If the string doesn't end with the given suffix throw an exception.
+     */
+    public static String replaceSuffix(String str, String oldSuffix, String newSuffix) {
+        if (!str.endsWith(oldSuffix))
+            throw new IllegalArgumentException("Expected string to end with " + oldSuffix + " but string is " + str);
+        return str.substring(0, str.length() - oldSuffix.length()) + newSuffix;
+    }
 }
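
The new helper mirrors the Scala CoreUtils.replaceSuffix it replaces and backs the log layer's suffix swaps (".cleaned", ".swap", ".deleted"). A minimal usage sketch; the file names are illustrative and not part of this commit:

    import org.apache.kafka.common.utils.Utils;

    public class ReplaceSuffixSketch {
        public static void main(String[] args) {
            // Drop a ".swap" suffix, as recovery-style renames do.
            System.out.println(Utils.replaceSuffix("00000000000000000000.log.swap", ".swap", ""));
            // An empty old suffix always matches, so this appends ".deleted" for async deletion.
            System.out.println(Utils.replaceSuffix("00000000000000000000.log", "", ".deleted"));
            // A non-matching old suffix throws IllegalArgumentException.
        }
    }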

clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java (9 changed lines)

@@ -926,4 +926,13 @@ public class UtilsTest {
         assertEquals(String.format("2020-11-09 12:34:05,123 %s", requiredOffsetFormat), Utils.toLogDateTimeFormat(timestampWithMilliSeconds.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()));
         assertEquals(String.format("2020-11-09 12:34:05,000 %s", requiredOffsetFormat), Utils.toLogDateTimeFormat(timestampWithSeconds.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()));
     }
 
+    @Test
+    public void testReplaceSuffix() {
+        assertEquals("blah.foo.text", Utils.replaceSuffix("blah.foo.txt", ".txt", ".text"));
+        assertEquals("blah.foo", Utils.replaceSuffix("blah.foo.txt", ".txt", ""));
+        assertEquals("txt.txt", Utils.replaceSuffix("txt.txt.txt", ".txt", ""));
+        assertEquals("foo.txt", Utils.replaceSuffix("foo", "", ".txt"));
+    }
 }

core/src/main/scala/kafka/log/LocalLog.scala (5 changed lines)

@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeEx
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.server.log.internals.LogFileUtils.offsetFromFileName
 import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, LogOffsetMetadata, OffsetPosition}
 
 import scala.jdk.CollectionConverters._
@@ -714,10 +715,6 @@ object LocalLog extends Logging {
   private[log] def transactionIndexFile(dir: File, offset: Long, suffix: String = ""): File =
     new File(dir, filenamePrefixFromOffset(offset) + TxnIndexFileSuffix + suffix)
 
-  private[log] def offsetFromFileName(filename: String): Long = {
-    filename.substring(0, filename.indexOf('.')).toLong
-  }
-
   private[log] def offsetFromFile(file: File): Long = {
     offsetFromFileName(file.getName)
   }

core/src/main/scala/kafka/log/LogLoader.scala (8 changed lines)

@@ -22,10 +22,10 @@ import java.nio.file.{Files, NoSuchFileException}
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.log.UnifiedLog.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile}
 import kafka.server.epoch.LeaderEpochFileCache
-import kafka.utils.{CoreUtils, Logging, Scheduler}
+import kafka.utils.{Logging, Scheduler}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidOffsetException
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.snapshot.Snapshots
 import org.apache.kafka.server.log.internals.{CorruptIndexException, LogDirFailureChannel, LogOffsetMetadata}
@@ -109,7 +109,7 @@ class LogLoader(
     // We store segments that require renaming in this code block, and do the actual renaming later.
     var minSwapFileOffset = Long.MaxValue
     var maxSwapFileOffset = Long.MinValue
-    swapFiles.filter(f => UnifiedLog.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
+    swapFiles.filter(f => UnifiedLog.isLogFile(new File(Utils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
       val baseOffset = offsetFromFile(f)
       val segment = LogSegment.open(f.getParentFile,
         baseOffset = baseOffset,
@@ -144,7 +144,7 @@ class LogLoader(
     for (file <- dir.listFiles if file.isFile) {
       if (file.getName.endsWith(SwapFileSuffix)) {
         info(s"Recovering file ${file.getName} by renaming from ${UnifiedLog.SwapFileSuffix} files.")
-        file.renameTo(new File(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.SwapFileSuffix, "")))
+        file.renameTo(new File(Utils.replaceSuffix(file.getPath, UnifiedLog.SwapFileSuffix, "")))
       }
     }

core/src/main/scala/kafka/log/LogSegment.scala (10 changed lines)

@@ -31,7 +31,7 @@ import org.apache.kafka.common.InvalidRecordException
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
 import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
 
 import java.util.Optional
@@ -492,10 +492,10 @@ class LogSegment private[log] (val log: FileRecords,
    * IOException from this method should be handled by the caller
    */
   def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
-    log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
-    lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)))
-    lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)))
-    txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
+    log.renameTo(new File(Utils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
+    lazyOffsetIndex.renameTo(new File(Utils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)))
+    lazyTimeIndex.renameTo(new File(Utils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)))
+    txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
   }
 
   def hasSuffix(suffix: String): Boolean = {

core/src/main/scala/kafka/log/ProducerStateManager.scala (60 changed lines)

@@ -16,27 +16,24 @@
  */
 package kafka.log
 
-import java.io.File
-import java.nio.ByteBuffer
-import java.nio.channels.FileChannel
-import java.nio.file.{Files, NoSuchFileException, StandardOpenOption}
-import java.util.concurrent.ConcurrentSkipListMap
-
 import kafka.log.UnifiedLog.offsetFromFile
 import kafka.server.{BrokerReconfigurable, KafkaConfig}
-import kafka.utils.{CoreUtils, Logging, nonthreadsafe, threadsafe}
+import kafka.utils.{Logging, nonthreadsafe, threadsafe}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
-import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time, Utils}
-import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata}
+import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time}
+import org.apache.kafka.server.log.internals._
 
-import scala.jdk.CollectionConverters._
+import java.io.File
+import java.nio.ByteBuffer
+import java.nio.channels.FileChannel
+import java.nio.file.{Files, NoSuchFileException, StandardOpenOption}
+import java.util.concurrent.ConcurrentSkipListMap
 import scala.collection.mutable.ListBuffer
 import scala.collection.{immutable, mutable}
-
-class CorruptSnapshotException(msg: String) extends KafkaException(msg)
+import scala.jdk.CollectionConverters._
 
 /**
  * The last written record for a given producer. The last data offset may be undefined
@@ -462,7 +459,7 @@ object ProducerStateManager {
   private[log] def listSnapshotFiles(dir: File): Seq[SnapshotFile] = {
     if (dir.exists && dir.isDirectory) {
       Option(dir.listFiles).map { files =>
-        files.filter(f => f.isFile && isSnapshotFile(f)).map(SnapshotFile(_)).toSeq
+        files.filter(f => f.isFile && isSnapshotFile(f)).map(new SnapshotFile(_)).toSeq
       }.getOrElse(Seq.empty)
     } else Seq.empty
   }
@@ -757,7 +754,7 @@ class ProducerStateManager(
   def takeSnapshot(): Unit = {
     // If not a new offset, then it is not worth taking another snapshot
    if (lastMapOffset > lastSnapOffset) {
-      val snapshotFile = SnapshotFile(UnifiedLog.producerSnapshotFile(_logDir, lastMapOffset))
+      val snapshotFile = new SnapshotFile(UnifiedLog.producerSnapshotFile(_logDir, lastMapOffset))
       val start = time.hiResClockMs()
       writeSnapshot(snapshotFile.file, producers)
       info(s"Wrote producer snapshot at offset $lastMapOffset with ${producers.size} producer ids in ${time.hiResClockMs() - start} ms.")
@@ -911,42 +908,9 @@ class ProducerStateManager(
   }
 }
 
-case class SnapshotFile private[log] (@volatile private var _file: File,
-                                      offset: Long) extends Logging {
-  def deleteIfExists(): Boolean = {
-    val deleted = Files.deleteIfExists(file.toPath)
-    if (deleted) {
-      info(s"Deleted producer state snapshot ${file.getAbsolutePath}")
-    } else {
-      info(s"Failed to delete producer state snapshot ${file.getAbsolutePath} because it does not exist.")
-    }
-    deleted
-  }
-
-  def updateParentDir(parentDir: File): Unit = {
-    _file = new File(parentDir, _file.getName)
-  }
-
-  def file: File = {
-    _file
-  }
-
-  def renameTo(newSuffix: String): Unit = {
-    val renamed = new File(CoreUtils.replaceSuffix(_file.getPath, "", newSuffix))
-    try {
-      Utils.atomicMoveWithFallback(_file.toPath, renamed.toPath)
-    } finally {
-      _file = renamed
-    }
-  }
-}
-
-object SnapshotFile {
-  def apply(file: File): SnapshotFile = {
-    val offset = offsetFromFile(file)
-    SnapshotFile(file, offset)
-  }
-}
-
 class ProducerStateManagerConfig(@volatile var producerIdExpirationMs: Int) extends Logging with BrokerReconfigurable {

core/src/main/scala/kafka/log/UnifiedLog.scala (2 changed lines)

@@ -1943,8 +1943,6 @@ object UnifiedLog extends Logging {
   def transactionIndexFile(dir: File, offset: Long, suffix: String = ""): File = LocalLog.transactionIndexFile(dir, offset, suffix)
 
-  def offsetFromFileName(filename: String): Long = LocalLog.offsetFromFileName(filename)
-
   def offsetFromFile(file: File): Long = LocalLog.offsetFromFile(file)
 
   def sizeInBytes(segments: Iterable[LogSegment]): Long = LogSegments.sizeInBytes(segments)

core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala (4 changed lines)

@@ -62,8 +62,8 @@ class Entry(val offsetIndex: LazyIndex[OffsetIndex], val timeIndex: LazyIndex[Ti
     if (!markedForCleanup) {
       markedForCleanup = true
       Array(offsetIndex, timeIndex).foreach(index =>
-        index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, "", UnifiedLog.DeletedFileSuffix))))
-      txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, "",
+        index.renameTo(new File(Utils.replaceSuffix(index.file.getPath, "", UnifiedLog.DeletedFileSuffix))))
+      txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file.getPath, "",
         UnifiedLog.DeletedFileSuffix)))
     }
   }

core/src/main/scala/kafka/tools/DumpLogSegments.scala (2 changed lines)

@@ -32,7 +32,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
-import org.apache.kafka.server.log.internals.{OffsetIndex, TimeIndex, TransactionIndex}
+import org.apache.kafka.server.log.internals.{CorruptSnapshotException, OffsetIndex, TimeIndex, TransactionIndex}
 import org.apache.kafka.snapshot.Snapshots
 
 import scala.jdk.CollectionConverters._

core/src/main/scala/kafka/utils/CoreUtils.scala (9 changed lines)

@@ -203,15 +203,6 @@ object CoreUtils {
   def circularIterator[T](coll: Iterable[T]) =
     for (_ <- Iterator.continually(1); t <- coll) yield t
 
-  /**
-   * Replace the given string suffix with the new suffix. If the string doesn't end with the given suffix throw an exception.
-   */
-  def replaceSuffix(s: String, oldSuffix: String, newSuffix: String): String = {
-    if(!s.endsWith(oldSuffix))
-      throw new IllegalArgumentException("Expected string to end with '%s' but string is '%s'".format(oldSuffix, s))
-    s.substring(0, s.length - oldSuffix.length) + newSuffix
-  }
-
   /**
    * Execute the given function inside the lock
    */

core/src/test/scala/unit/kafka/log/LogCleanerTest.scala (12 changed lines)

@@ -167,7 +167,7 @@ class LogCleanerTest {
     // Remember reference to the first log and determine its file name expected for async deletion
     val firstLogFile = log.logSegments.head.log
-    val expectedFileName = CoreUtils.replaceSuffix(firstLogFile.file.getPath, "", UnifiedLog.DeletedFileSuffix)
+    val expectedFileName = Utils.replaceSuffix(firstLogFile.file.getPath, "", UnifiedLog.DeletedFileSuffix)
 
     // Clean the log. This should trigger replaceSegments() and deleteOldSegments();
     val offsetMap = new FakeOffsetMap(Int.MaxValue)
@@ -1597,7 +1597,7 @@ class LogCleanerTest {
     // On recovery, clean operation is aborted. All messages should be present in the log
     log.logSegments.head.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
     for (file <- dir.listFiles if file.getName.endsWith(UnifiedLog.DeletedFileSuffix)) {
-      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")), false)
+      Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")), false)
     }
     log = recoverAndCheck(config, allKeys)
@@ -1612,9 +1612,9 @@ class LogCleanerTest {
     // 2) Simulate recovery just after .cleaned file is created, and a subset of them are renamed to .swap
     // On recovery, clean operation is aborted. All messages should be present in the log
     log.logSegments.head.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
-    log.logSegments.head.log.renameTo(new File(CoreUtils.replaceSuffix(log.logSegments.head.log.file.getPath, UnifiedLog.CleanedFileSuffix, UnifiedLog.SwapFileSuffix)))
+    log.logSegments.head.log.renameTo(new File(Utils.replaceSuffix(log.logSegments.head.log.file.getPath, UnifiedLog.CleanedFileSuffix, UnifiedLog.SwapFileSuffix)))
     for (file <- dir.listFiles if file.getName.endsWith(UnifiedLog.DeletedFileSuffix)) {
-      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")), false)
+      Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")), false)
     }
     log = recoverAndCheck(config, allKeys)
@@ -1630,7 +1630,7 @@ class LogCleanerTest {
     // renamed to .deleted. Clean operation is resumed during recovery.
     log.logSegments.head.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
     for (file <- dir.listFiles if file.getName.endsWith(UnifiedLog.DeletedFileSuffix)) {
-      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")), false)
+      Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")), false)
     }
     log = recoverAndCheck(config, cleanedKeys)
@@ -1667,7 +1667,7 @@ class LogCleanerTest {
     // 5) Simulate recovery after a subset of swap files are renamed to regular files and old segments files are renamed
     // to .deleted. Clean operation is resumed during recovery.
-    log.logSegments.head.timeIndex.file.renameTo(new File(CoreUtils.replaceSuffix(log.logSegments.head.timeIndex.file.getPath, "", UnifiedLog.SwapFileSuffix)))
+    log.logSegments.head.timeIndex.file.renameTo(new File(Utils.replaceSuffix(log.logSegments.head.timeIndex.file.getPath, "", UnifiedLog.SwapFileSuffix)))
     log = recoverAndCheck(config, cleanedKeys)
 
     // add some more messages and clean the log again

core/src/test/scala/unit/kafka/log/LogLoaderTest.scala (12 changed lines)

@@ -24,14 +24,14 @@ import java.util.Properties
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig}
 import kafka.server.metadata.MockConfigRepository
-import kafka.utils.{CoreUtils, MockTime, Scheduler, TestUtils}
+import kafka.utils.{MockTime, Scheduler, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.record.{CompressionType, ControlRecordType, DefaultRecordBatch, MemoryRecords, RecordBatch, RecordVersion, SimpleRecord, TimestampType}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
-import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, OffsetIndex}
+import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, OffsetIndex, SnapshotFile}
 import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
 import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -1219,7 +1219,7 @@ class LogLoaderTest {
       segment.truncateTo(0)
     })
     for (file <- logDir.listFiles if file.getName.endsWith(UnifiedLog.DeletedFileSuffix))
-      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")))
+      Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")))
 
     val recoveredLog = recoverAndCheck(logConfig, expectedKeys)
     assertEquals(expectedKeys, LogTestUtils.keysInLog(recoveredLog))
@@ -1247,7 +1247,7 @@ class LogLoaderTest {
       segment.truncateTo(0)
     }
     for (file <- logDir.listFiles if file.getName.endsWith(UnifiedLog.DeletedFileSuffix))
-      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")))
+      Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")))
 
     val recoveredLog = recoverAndCheck(logConfig, expectedKeys)
     assertEquals(expectedKeys, LogTestUtils.keysInLog(recoveredLog))
@@ -1271,7 +1271,7 @@ class LogLoaderTest {
       segment.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
     })
     for (file <- logDir.listFiles if file.getName.endsWith(UnifiedLog.DeletedFileSuffix))
-      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")))
+      Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")))
 
     // Truncate the old segment
     segmentWithOverflow.truncateTo(0)
@@ -1653,7 +1653,7 @@ class LogLoaderTest {
     assertEquals(4, log.logEndOffset)
 
     val offsetsWithSnapshotFiles = (1 until 5)
-      .map(offset => SnapshotFile(UnifiedLog.producerSnapshotFile(logDir, offset)))
+      .map(offset => new SnapshotFile(UnifiedLog.producerSnapshotFile(logDir, offset)))
       .filter(snapshotFile => snapshotFile.file.exists())
       .map(_.offset)
     val inMemorySnapshotFiles = (1 until 5)

core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala (8 changed lines)

@@ -131,14 +131,6 @@ class CoreUtilsTest extends Logging {
     assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE))
   }
 
-  @Test
-  def testReplaceSuffix(): Unit = {
-    assertEquals("blah.foo.text", CoreUtils.replaceSuffix("blah.foo.txt", ".txt", ".text"))
-    assertEquals("blah.foo", CoreUtils.replaceSuffix("blah.foo.txt", ".txt", ""))
-    assertEquals("txt.txt", CoreUtils.replaceSuffix("txt.txt.txt", ".txt", ""))
-    assertEquals("foo.txt", CoreUtils.replaceSuffix("foo", "", ".txt"))
-  }
-
   @Test
   def testCsvList(): Unit = {
     val emptyString:String = ""

storage/src/main/java/org/apache/kafka/server/log/internals/CorruptSnapshotException.java (26 changed lines)

@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.KafkaException;
+
+public class CorruptSnapshotException extends KafkaException {
+
+    public CorruptSnapshotException(String message) {
+        super(message);
+    }
+}

storage/src/main/java/org/apache/kafka/server/log/internals/LogFileUtils.java (35 changed lines)

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+public final class LogFileUtils {
+
+    private LogFileUtils() {
+    }
+
+    /**
+     * Returns the offset for the given file name. The file name is of the form: {number}.{suffix}. This method extracts
+     * the number from the given file name.
+     *
+     * @param fileName name of the file
+     * @return offset of the given file name
+     */
+    public static long offsetFromFileName(String fileName) {
+        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
+    }
+}
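
Segment and snapshot file names follow the {number}.{suffix} layout, so the parse stops at the first dot. A small sketch of the expected behavior; the zero-padded names below mirror Kafka's usual on-disk style and are illustrative only:

    import static org.apache.kafka.server.log.internals.LogFileUtils.offsetFromFileName;

    public class OffsetFromFileNameSketch {
        public static void main(String[] args) {
            // Long.parseLong tolerates the leading zeros in padded file names.
            System.out.println(offsetFromFileName("00000000000000000042.snapshot")); // 42
            System.out.println(offsetFromFileName("00000000000000000000.log"));      // 0
            // A name with no dot would make indexOf('.') return -1 and substring would throw.
        }
    }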

storage/src/main/java/org/apache/kafka/server/log/internals/SnapshotFile.java (71 changed lines)

@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.apache.kafka.server.log.internals.LogFileUtils.offsetFromFileName;
+
+public class SnapshotFile {
+    private static final Logger log = LoggerFactory.getLogger(SnapshotFile.class);
+
+    public final long offset;
+    private volatile File file;
+
+    public SnapshotFile(File file) {
+        this(file, offsetFromFileName(file.getName()));
+    }
+
+    public SnapshotFile(File file, long offset) {
+        this.file = file;
+        this.offset = offset;
+    }
+
+    public boolean deleteIfExists() throws IOException {
+        boolean deleted = Files.deleteIfExists(file.toPath());
+        if (deleted) {
+            log.info("Deleted producer state snapshot {}", file.getAbsolutePath());
+        } else {
+            log.info("Failed to delete producer state snapshot {} because it does not exist.", file.getAbsolutePath());
+        }
+        return deleted;
+    }
+
+    public void updateParentDir(File parentDir) {
+        String name = file.getName();
+        file = new File(parentDir, name);
+    }
+
+    public File file() {
+        return file;
+    }
+
+    public void renameTo(String newSuffix) throws IOException {
+        File renamed = new File(Utils.replaceSuffix(file.getPath(), "", newSuffix));
+        try {
+            Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath());
+        } finally {
+            file = renamed;
+        }
+    }
+}
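
A brief usage sketch of the ported class; the directory path is hypothetical, and renameTo assumes the snapshot file actually exists on disk:

    import org.apache.kafka.server.log.internals.SnapshotFile;

    import java.io.File;
    import java.io.IOException;

    public class SnapshotFileSketch {
        public static void main(String[] args) throws IOException {
            // The single-argument constructor derives the offset from the file name.
            SnapshotFile snapshot = new SnapshotFile(
                new File("/tmp/kafka-logs/demo-0/00000000000000000042.snapshot"));
            System.out.println(snapshot.offset);           // 42
            System.out.println(snapshot.file().getName());
            // Appends a suffix via an atomic move (with non-atomic fallback); the tracked
            // file is updated in a finally block, even if the move fails.
            snapshot.renameTo(".deleted");
            snapshot.deleteIfExists();                      // logs whether the renamed file was deleted
        }
    }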