
KAFKA-14240; Validate kraft snapshot state on startup (#12653)

We should prevent the metadata log from initializing in a known bad state. If the log start offset of the first segment is greater than 0, then there must be a snapshot at an offset greater than or equal to it in order to ensure that the initialized state is complete.

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
Jason Gustafson committed via GitHub (commit 8c8b5366a6)
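The invariant behind the change, sketched in Scala for orientation (validateSnapshotState and its arguments are names made up here, not part of the patch; the actual check lives in KafkaMetadataLog.recoverSnapshots, shown in the diff below):

// Minimal sketch of the startup invariant, assuming the log start offset and the
// offsets of the retained (complete, non-deleted) snapshots are already known.
def validateSnapshotState(logStartOffset: Long, snapshotOffsets: Seq[Long]): Unit = {
  if (logStartOffset > 0) {
    val latestSnapshotOffset = snapshotOffsets.maxOption
    if (!latestSnapshotOffset.exists(_ >= logStartOffset)) {
      throw new IllegalStateException("Inconsistent snapshot state: no snapshot covers " +
        s"the log start offset $logStartOffset (latest snapshot offset: $latestSnapshotOffset)")
    }
  }
}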
Changed files:

1. core/src/main/scala/kafka/log/LogLoader.scala (7 lines changed)
2. core/src/main/scala/kafka/raft/KafkaMetadataLog.scala (43 lines changed)
3. core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala (85 lines changed)
4. raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java (15 lines changed)
5. raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java (6 lines changed)

core/src/main/scala/kafka/log/LogLoader.scala (7 lines changed)

@@ -19,7 +19,6 @@ package kafka.log
import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
import kafka.common.LogSegmentOffsetOverflowException
import kafka.log.UnifiedLog.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile}
import kafka.server.{LogDirFailureChannel, LogOffsetMetadata}
@@ -28,6 +27,7 @@ import kafka.utils.{CoreUtils, Logging, Scheduler}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidOffsetException
import org.apache.kafka.common.utils.Time
+ import org.apache.kafka.snapshot.Snapshots
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import scala.collection.{Set, mutable}
@@ -229,7 +229,10 @@ class LogLoader(
if (!file.canRead)
throw new IOException(s"Could not read file $file")
val filename = file.getName
- if (filename.endsWith(DeletedFileSuffix)) {
+ // Delete stray files marked for deletion, but skip KRaft snapshots.
+ // These are handled in the recovery logic in `KafkaMetadataLog`.
+ if (filename.endsWith(DeletedFileSuffix) && !filename.endsWith(Snapshots.DELETE_SUFFIX)) {
debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
Files.deleteIfExists(file.toPath)
} else if (filename.endsWith(CleanedFileSuffix)) {
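Aside: the exemption works because `Snapshots.DELETE_SUFFIX` is the more specific of the two suffixes. `DeletedFileSuffix` is ".deleted", while `DELETE_SUFFIX` expands to ".checkpoint.deleted" (see Snapshots.java below), so a KRaft snapshot marked for deletion matches both. A small illustrative check in Scala (the file name is invented, following the zero-padded offset-epoch snapshot naming scheme):

val deletedFileSuffix = ".deleted"               // value of UnifiedLog.DeletedFileSuffix
val snapshotDeleteSuffix = ".checkpoint.deleted" // value of Snapshots.DELETE_SUFFIX
val name = "00000000000000000100-0000000001.checkpoint.deleted"
// Matches the generic stray-file suffix, so the old code would have deleted it here...
assert(name.endsWith(deletedFileSuffix))
// ...but it also matches the snapshot suffix, so the new clause skips it and
// leaves cleanup to the recovery logic in KafkaMetadataLog.
assert(name.endsWith(snapshotDeleteSuffix))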

core/src/main/scala/kafka/raft/KafkaMetadataLog.scala (43 lines changed)

@@ -16,7 +16,7 @@
*/
package kafka.raft
- import kafka.log.{AppendOrigin, Defaults, UnifiedLog, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
+ import kafka.log.{AppendOrigin, Defaults, LogConfig, LogOffsetSnapshot, SnapshotGenerated, UnifiedLog}
import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMinBytesProp}
import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
import kafka.utils.{CoreUtils, Logging, Scheduler}
@@ -26,7 +26,7 @@ import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Record
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
- import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, Snapshots}
+ import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
import java.io.File
import java.nio.file.{Files, NoSuchFileException, Path}
@@ -546,7 +546,7 @@ case class MetadataLogConfig(logSegmentBytes: Int,
fileDeleteDelayMs: Int,
nodeId: Int)
- object KafkaMetadataLog {
+ object KafkaMetadataLog extends Logging {
def apply(
topicPartition: TopicPartition,
topicId: Uuid,
@@ -623,7 +623,9 @@ object KafkaMetadataLog {
private def recoverSnapshots(
log: UnifiedLog
): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
- val snapshots = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+ val snapshotsToRetain = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+ val snapshotsToDelete = mutable.Buffer.empty[SnapshotPath]
// Scan the log directory, deleting partial snapshots and older snapshots, only remembering immutable snapshots starting
// from logStartOffset
val filesInDir = Files.newDirectoryStream(log.dir.toPath)
@@ -631,21 +633,40 @@ object KafkaMetadataLog {
try {
filesInDir.forEach { path =>
Snapshots.parse(path).ifPresent { snapshotPath =>
- if (snapshotPath.partial ||
-   snapshotPath.deleted ||
-   snapshotPath.snapshotId.offset < log.logStartOffset) {
-   // Delete partial snapshot, deleted snapshot and older snapshot
-   Files.deleteIfExists(snapshotPath.path)
+ // Collect partial snapshot, deleted snapshot and older snapshot for deletion
+ if (snapshotPath.partial
+   || snapshotPath.deleted
+   || snapshotPath.snapshotId.offset < log.logStartOffset) {
+   snapshotsToDelete.append(snapshotPath)
} else {
-   snapshots.put(snapshotPath.snapshotId, None)
+   snapshotsToRetain.put(snapshotPath.snapshotId, None)
}
}
}
+ // Before deleting any snapshots, we should ensure that the retained snapshots are
+ // consistent with the current state of the log. If the log start offset is not 0,
+ // then we must have a snapshot which covers the initial state up to the current
+ // log start offset.
+ if (log.logStartOffset > 0) {
+   val latestSnapshotId = snapshotsToRetain.lastOption.map(_._1)
+   if (!latestSnapshotId.exists(snapshotId => snapshotId.offset >= log.logStartOffset)) {
+     throw new IllegalStateException("Inconsistent snapshot state: there must be a snapshot " +
+       s"at an offset larger than the current log start offset ${log.logStartOffset}, but the " +
+       s"latest snapshot is $latestSnapshotId")
+   }
+ }
+ snapshotsToDelete.foreach { snapshotPath =>
+   Files.deleteIfExists(snapshotPath.path)
+   info(s"Deleted unneeded snapshot file with path $snapshotPath")
+ }
} finally {
filesInDir.close()
}
- snapshots
+ info(s"Initialized snapshots with IDs ${snapshotsToRetain.keys} from ${log.dir}")
+ snapshotsToRetain
}
private def deleteSnapshotFiles(
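For illustration, a hypothetical walk-through of the new recovery ordering (the offsets and flags below are invented, and partition stands in for the classification loop over Snapshots.parse results):

// Hypothetical example: logStartOffset = 100 and four snapshots found on disk,
// described as (offset, isPartial, isMarkedDeleted).
val logStartOffset = 100L
val found = Seq((50L, false, false), (80L, true, false), (100L, false, true), (120L, false, false))

// Classify as recoverSnapshots does: partial, deleted, or stale snapshots
// are queued for deletion; the rest are retained.
val (toDelete, toRetain) = found.partition { case (offset, partial, deleted) =>
  partial || deleted || offset < logStartOffset
}

// The validation runs before any file is removed: startup proceeds only
// because the latest retained offset (120) covers logStartOffset (100).
assert(toRetain.map(_._1).maxOption.exists(_ >= logStartOffset))
// toRetain offsets: List(120); toDelete offsets: List(50, 80, 100)

If the snapshot at offset 120 were absent, initialization would fail even though a ".deleted" snapshot exists at offset 100, which is exactly what testSnapshotDeletionWithInvalidSnapshotState below verifies.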

core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala (85 lines changed)

@@ -16,7 +16,7 @@
*/
package kafka.raft
- import kafka.log.{Defaults, UnifiedLog, SegmentDeletion}
+ import kafka.log.{Defaults, SegmentDeletion, UnifiedLog}
import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMillisProp, MetadataLogSegmentMinBytesProp, NodeIdProp, ProcessRolesProp, QuorumVotersProp}
import kafka.server.{KafkaConfig, KafkaRaftServer}
import kafka.utils.{MockTime, TestUtils}
@@ -28,7 +28,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.raft.internals.BatchBuilder
import org.apache.kafka.raft._
import org.apache.kafka.server.common.serialization.RecordSerde
- import org.apache.kafka.snapshot.{RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
+ import org.apache.kafka.snapshot.{FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
import org.apache.kafka.test.TestUtils.assertOptional
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -391,6 +391,87 @@ final class KafkaMetadataLogTest {
}
}
+ @Test
+ def testStartupWithInvalidSnapshotState(): Unit = {
+   // Initialize an empty log at offset 100.
+   var log = buildMetadataLog(tempDir, mockTime)
+   log.log.truncateFullyAndStartAt(newOffset = 100)
+   log.close()
+   val metadataDir = metadataLogDir(tempDir)
+   assertTrue(metadataDir.exists())
+   // Initialization should fail unless we have a snapshot at an offset
+   // greater than or equal to 100.
+   assertThrows(classOf[IllegalStateException], () => {
+     buildMetadataLog(tempDir, mockTime)
+   })
+   // Snapshots at offsets less than 100 are not sufficient.
+   writeEmptySnapshot(metadataDir, new OffsetAndEpoch(50, 1))
+   assertThrows(classOf[IllegalStateException], () => {
+     buildMetadataLog(tempDir, mockTime)
+   })
+   // Snapshot at offset 100 should be fine.
+   writeEmptySnapshot(metadataDir, new OffsetAndEpoch(100, 1))
+   log = buildMetadataLog(tempDir, mockTime)
+   log.log.truncateFullyAndStartAt(newOffset = 200)
+   log.close()
+   // Snapshots at higher offsets are also fine. In this case, the
+   // log start offset should advance to the first snapshot offset.
+   writeEmptySnapshot(metadataDir, new OffsetAndEpoch(500, 1))
+   log = buildMetadataLog(tempDir, mockTime)
+   assertEquals(500, log.log.logStartOffset)
+ }
+ @Test
+ def testSnapshotDeletionWithInvalidSnapshotState(): Unit = {
+   // Initialize an empty log at offset 100.
+   val log = buildMetadataLog(tempDir, mockTime)
+   log.log.truncateFullyAndStartAt(newOffset = 100)
+   log.close()
+   val metadataDir = metadataLogDir(tempDir)
+   assertTrue(metadataDir.exists())
+   // We have one deleted snapshot at an offset matching the start offset.
+   val snapshotId = new OffsetAndEpoch(100, 1)
+   writeEmptySnapshot(metadataDir, snapshotId)
+   val deletedPath = Snapshots.markForDelete(metadataDir.toPath, snapshotId)
+   assertTrue(deletedPath.toFile.exists())
+   // Initialization should still fail.
+   assertThrows(classOf[IllegalStateException], () => {
+     buildMetadataLog(tempDir, mockTime)
+   })
+   // The snapshot marked for deletion should still exist.
+   assertTrue(deletedPath.toFile.exists())
+ }
+ private def metadataLogDir(
+   logDir: File
+ ): File = {
+   new File(
+     logDir.getAbsolutePath,
+     UnifiedLog.logDirName(KafkaRaftServer.MetadataPartition)
+   )
+ }
+ private def writeEmptySnapshot(
+   metadataDir: File,
+   snapshotId: OffsetAndEpoch
+ ): Unit = {
+   val writer = FileRawSnapshotWriter.create(
+     metadataDir.toPath,
+     snapshotId,
+     Optional.empty()
+   )
+   TestUtils.resource(writer)(_.freeze())
+ }
@Test
def testDoesntTruncateFully(): Unit = {
val log = buildMetadataLog(tempDir, mockTime)

raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java (15 lines changed)

@@ -32,7 +32,7 @@ public final class Snapshots {
private static final Logger log = LoggerFactory.getLogger(Snapshots.class);
public static final String SUFFIX = ".checkpoint";
private static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
- private static final String DELETE_SUFFIX = String.format("%s.deleted", SUFFIX);
+ public static final String DELETE_SUFFIX = String.format("%s.deleted", SUFFIX);
private static final NumberFormat OFFSET_FORMATTER = NumberFormat.getInstance();
private static final NumberFormat EPOCH_FORMATTER = NumberFormat.getInstance();
@@ -60,7 +60,7 @@ public final class Snapshots {
return source.resolveSibling(filenameFromSnapshotId(snapshotId) + SUFFIX);
}
- static Path deleteRename(Path source, OffsetAndEpoch snapshotId) {
+ static Path deleteRenamePath(Path source, OffsetAndEpoch snapshotId) {
return source.resolveSibling(filenameFromSnapshotId(snapshotId) + DELETE_SUFFIX);
}
@@ -114,7 +114,7 @@
*/
public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) {
Path immutablePath = snapshotPath(logDir, snapshotId);
- Path deletedPath = deleteRename(immutablePath, snapshotId);
+ Path deletedPath = deleteRenamePath(immutablePath, snapshotId);
try {
boolean deleted = Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath);
if (deleted) {
@@ -130,13 +130,16 @@
}
/**
- * Mark a snapshot for deletion by renaming with the deleted suffix
+ * Mark a snapshot for deletion by renaming with the deleted suffix.
+ *
+ * @return the path of the snapshot marked for deletion (i.e. with .deleted suffix)
*/
- public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+ public static Path markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
Path immutablePath = snapshotPath(logDir, snapshotId);
- Path deletedPath = deleteRename(immutablePath, snapshotId);
+ Path deletedPath = deleteRenamePath(immutablePath, snapshotId);
try {
Utils.atomicMoveWithFallback(immutablePath, deletedPath, false);
+ return deletedPath;
} catch (IOException e) {
throw new UncheckedIOException(
String.format(

raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java (6 lines changed)

@@ -75,7 +75,7 @@ final public class SnapshotsTest {
TestUtils.RANDOM.nextInt(Integer.MAX_VALUE)
);
Path path = Snapshots.snapshotPath(TestUtils.tempDirectory().toPath(), snapshotId);
- Path deletedPath = Snapshots.deleteRename(path, snapshotId);
+ Path deletedPath = Snapshots.deleteRenamePath(path, snapshotId);
SnapshotPath snapshotPath = Snapshots.parse(deletedPath).get();
assertEquals(snapshotId, snapshotPath.snapshotId);
@@ -116,11 +116,11 @@ final public class SnapshotsTest {
if (renameBeforeDeleting)
// rename snapshot before deleting
- Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId), false);
+ Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRenamePath(snapshotPath, snapshotId), false);
assertTrue(Snapshots.deleteIfExists(logDirPath, snapshot.snapshotId()));
assertFalse(Files.exists(snapshotPath));
- assertFalse(Files.exists(Snapshots.deleteRename(snapshotPath, snapshotId)));
+ assertFalse(Files.exists(Snapshots.deleteRenamePath(snapshotPath, snapshotId)));
}
}
}
