Browse Source

KAFKA-8362: fix the old checkpoint won't be removed after alter log dir (#9178)

In KIP-113, we support replicas movement between log directories. But while the directory change, we forgot to remove the topicPartition offset data in old directory, which will cause there are more than 1 checkpoint copy stayed in the logs for the altered topicPartition. And it'll let the LogCleaner get stuck due to it's possible to always get the old topicPartition offset data from the old checkpoint file.

I added one more parameter topicPartitionToBeRemoved in updateCheckpoints() method. So, if the update parameter is None (as before), we'll do the remove action to remove the topicPartitionToBeRemoved data in dir, otherwise, update the data as before.

Reviewers: Jun Rao <junrao@gmail.com>
pull/9288/head
Luke Chen 4 years ago committed by GitHub
parent
commit
77a0bba140
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      core/src/main/scala/kafka/log/LogCleaner.scala
  2. 51
      core/src/main/scala/kafka/log/LogCleanerManager.scala
  3. 11
      core/src/main/scala/kafka/log/LogManager.scala
  4. 8
      core/src/main/scala/kafka/server/ReplicaManager.scala
  5. 12
      core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
  6. 12
      core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
  7. 100
      core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
  8. 2
      core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
  9. 2
      core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala

14
core/src/main/scala/kafka/log/LogCleaner.scala

@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig, @@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
}
/**
* Update checkpoint file, removing topics and partitions that no longer exist
* Update checkpoint file to remove partitions if necessary.
*/
def updateCheckpoints(dataDir: File): Unit = {
cleanerManager.updateCheckpoints(dataDir, update=None)
def updateCheckpoints(dataDir: File, partitionToRemove: Option[TopicPartition] = None): Unit = {
cleanerManager.updateCheckpoints(dataDir, partitionToRemove = partitionToRemove)
}
/**
* alter the checkpoint directory for the topicPartition, to remove the data in sourceLogDir, and add the data in destLogDir
*/
def alterCheckpointDir(topicPartition: TopicPartition, sourceLogDir: File, destLogDir: File): Unit = {
cleanerManager.alterCheckpointDir(topicPartition, sourceLogDir, destLogDir)
}
/**
* Stop cleaning logs in the provided directory
*
* @param dir the absolute path of the log dir
*/
def handleLogDirFailure(dir: String): Unit = {
cleanerManager.handleLogDirFailure(dir)
}

51
core/src/main/scala/kafka/log/LogCleanerManager.scala

@ -182,7 +182,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], @@ -182,7 +182,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
// update checkpoint for logs with invalid checkpointed offsets
if (offsetsToClean.forceUpdateCheckpoint)
updateCheckpoints(log.parentDirFile, Option(topicPartition, offsetsToClean.firstDirtyOffset))
updateCheckpoints(log.parentDirFile, partitionToUpdateOrAdd = Option(topicPartition, offsetsToClean.firstDirtyOffset))
val compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now)
preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
@ -354,13 +354,32 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], @@ -354,13 +354,32 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
}
}
def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = {
/**
* Update checkpoint file, adding or removing partitions if necessary.
*
* @param dataDir The File object to be updated
* @param partitionToUpdateOrAdd The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
* @param topicPartitionToBeRemoved The TopicPartition to be removed
*/
def updateCheckpoints(dataDir: File, partitionToUpdateOrAdd: Option[(TopicPartition, Long)] = None,
partitionToRemove: Option[TopicPartition] = None): Unit = {
inLock(lock) {
val checkpoint = checkpoints(dataDir)
if (checkpoint != null) {
try {
val existing = checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) } ++ update
checkpoint.write(existing)
val currentCheckpoint = checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap
// remove the partition offset if any
var updatedCheckpoint = partitionToRemove match {
case Some(topicPartion) => currentCheckpoint - topicPartion
case None => currentCheckpoint
}
// update or add the partition offset if any
updatedCheckpoint = partitionToUpdateOrAdd match {
case Some(updatedOffset) => updatedCheckpoint + updatedOffset
case None => updatedCheckpoint
}
checkpoint.write(updatedCheckpoint)
} catch {
case e: KafkaStorageException =>
error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
@ -369,15 +388,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], @@ -369,15 +388,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
}
}
/**
* alter the checkpoint directory for the topicPartition, to remove the data in sourceLogDir, and add the data in destLogDir
*/
def alterCheckpointDir(topicPartition: TopicPartition, sourceLogDir: File, destLogDir: File): Unit = {
inLock(lock) {
try {
checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match {
case Some(offset) =>
// Remove this partition from the checkpoint file in the source log directory
updateCheckpoints(sourceLogDir, None)
// Add offset for this partition to the checkpoint file in the destination log directory
updateCheckpoints(destLogDir, Option(topicPartition, offset))
debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " +
s"from ${sourceLogDir.getAbsoluteFile} directory.")
updateCheckpoints(sourceLogDir, partitionToRemove = Option(topicPartition))
debug(s"Adding the partition offset data in checkpoint file for '${topicPartition}' " +
s"to ${destLogDir.getAbsoluteFile} directory.")
updateCheckpoints(destLogDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
case None =>
}
} catch {
@ -393,6 +418,11 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], @@ -393,6 +418,11 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
}
}
/**
* Stop cleaning logs in the provided directory
*
* @param dir the absolute path of the log dir
*/
def handleLogDirFailure(dir: String): Unit = {
warn(s"Stopping cleaning logs in dir $dir")
inLock(lock) {
@ -400,6 +430,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], @@ -400,6 +430,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
}
}
/**
* Truncate the checkpointed offset for the given partition if its checkpointed offset is larger than the given offset
*/
def maybeTruncateCheckpoint(dataDir: File, topicPartition: TopicPartition, offset: Long): Unit = {
inLock(lock) {
if (logs.get(topicPartition).config.compact) {
@ -420,7 +453,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], @@ -420,7 +453,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
inLock(lock) {
inProgress.get(topicPartition) match {
case Some(LogCleaningInProgress) =>
updateCheckpoints(dataDir, Option(topicPartition, endOffset))
updateCheckpoints(dataDir, partitionToUpdateOrAdd = Option(topicPartition, endOffset))
inProgress.remove(topicPartition)
case Some(LogCleaningAborted) =>
inProgress.put(topicPartition, LogCleaningPaused(1))

11
core/src/main/scala/kafka/log/LogManager.scala

@ -184,7 +184,11 @@ class LogManager(logDirs: Seq[File], @@ -184,7 +184,11 @@ class LogManager(logDirs: Seq[File],
numRecoveryThreadsPerDataDir = newSize
}
// dir should be an absolute path
/**
* The log directory failure handler. It will stop log cleaning in that directory.
*
* @param dir the absolute path of the log directory
*/
def handleLogDirFailure(dir: String): Unit = {
warn(s"Stopping serving logs in dir $dir")
logCreationOrDeletionLock synchronized {
@ -962,8 +966,9 @@ class LogManager(logDirs: Seq[File], @@ -962,8 +966,9 @@ class LogManager(logDirs: Seq[File],
// We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
if (cleaner != null && !isFuture) {
cleaner.abortCleaning(topicPartition)
if (checkpoint)
cleaner.updateCheckpoints(removedLog.parentDirFile)
if (checkpoint) {
cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition))
}
}
removedLog.renameDir(Log.logDeleteDirName(topicPartition))
if (checkpoint) {

8
core/src/main/scala/kafka/server/ReplicaManager.scala

@ -1729,8 +1729,12 @@ class ReplicaManager(val config: KafkaConfig, @@ -1729,8 +1729,12 @@ class ReplicaManager(val config: KafkaConfig,
Partition.removeMetrics(tp)
}
// logDir should be an absolute path
// sendZkNotification is needed for unit test
/**
* The log directory failure handler for the replica
*
* @param dir the absolute path of the log directory
* @param sendZkNotification check if we need to send notification to zookeeper node (needed for unit test)
*/
def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true): Unit = {
if (!logManager.isLogDirOnline(dir))
return

12
core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala

@ -52,8 +52,16 @@ object LeaderEpochCheckpointFile { @@ -52,8 +52,16 @@ object LeaderEpochCheckpointFile {
}
/**
* This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
*/
* This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
*
* The format in the LeaderEpoch checkpoint file is like this:
* -----checkpoint file begin------
* 0 <- LeaderEpochCheckpointFile.currentVersion
* 2 <- following entries size
* 0 1 <- the format is: leader_epoch(int32) start_offset(int64)
* 1 2
* -----checkpoint file end----------
*/
class LeaderEpochCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) extends LeaderEpochCheckpoint {
import LeaderEpochCheckpointFile._

12
core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala

@ -50,8 +50,16 @@ trait OffsetCheckpoint { @@ -50,8 +50,16 @@ trait OffsetCheckpoint {
}
/**
* This class persists a map of (Partition => Offsets) to a file (for a certain replica)
*/
* This class persists a map of (Partition => Offsets) to a file (for a certain replica)
*
* The format in the offset checkpoint file is like this:
* -----checkpoint file begin------
* 0 <- OffsetCheckpointFile.currentVersion
* 2 <- following entries size
* tp1 par1 1 <- the format is: TOPIC PARTITION OFFSET
* tp1 par2 2
* -----checkpoint file end----------
*/
class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) {
val checkpoint = new CheckpointFile[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion,
OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)

100
core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala

@ -37,14 +37,18 @@ import scala.collection.mutable @@ -37,14 +37,18 @@ import scala.collection.mutable
class LogCleanerManagerTest extends Logging {
val tmpDir = TestUtils.tempDir()
val tmpDir2 = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val logDir2 = TestUtils.randomPartitionLogDir(tmpDir)
val topicPartition = new TopicPartition("log", 0)
val topicPartition2 = new TopicPartition("log2", 0)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
val logConfig = LogConfig(logProps)
val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
val offset = 999
val cleanerCheckpoints: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()
@ -55,8 +59,11 @@ class LogCleanerManagerTest extends Logging { @@ -55,8 +59,11 @@ class LogCleanerManagerTest extends Logging {
cleanerCheckpoints.toMap
}
override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]): Unit = {
val (tp, offset) = update.getOrElse(throw new IllegalArgumentException("update=None argument not yet handled"))
override def updateCheckpoints(dataDir: File, partitionToUpdateOrAdd: Option[(TopicPartition,Long)] = None,
partitionToRemove: Option[TopicPartition] = None): Unit = {
assert(partitionToRemove.isEmpty, "partitionToRemove argument with value not yet handled")
val (tp, offset) = partitionToUpdateOrAdd.getOrElse(
throw new IllegalArgumentException("partitionToUpdateOrAdd==None argument not yet handled"))
cleanerCheckpoints.put(tp, offset)
}
}
@ -361,6 +368,93 @@ class LogCleanerManagerTest extends Logging { @@ -361,6 +368,93 @@ class LogCleanerManagerTest extends Logging {
assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
}
@Test
def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
val cleanerManager: LogCleanerManager = createCleanerManager(log)
// expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
// expect the checkpoint offset is now updated to the expected offset after doing updateCheckpoints
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
}
@Test
def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
val cleanerManager: LogCleanerManager = createCleanerManager(log)
// write some data into the cleaner-offset-checkpoint file
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
// updateCheckpoints should remove the topicPartition data in the logDir
cleanerManager.updateCheckpoints(logDir, partitionToRemove = Option(topicPartition))
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
}
@Test
def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
val cleanerManager: LogCleanerManager = createCleanerManager(log)
// write some data into the cleaner-offset-checkpoint file in logDir and logDir2
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
cleanerManager.updateCheckpoints(logDir2, partitionToUpdateOrAdd = Option(topicPartition2, offset))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition2))
cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
// verify the partition data in logDir is gone, and data in logDir2 is still there
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition2))
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
}
@Test
def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
val cleanerManager: LogCleanerManager = createCleanerManager(log)
val lowerOffset = 1L
val higherOffset = 1000L
// write some data into the cleaner-offset-checkpoint file in logDir
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
// we should not truncate the checkpoint data for checkpointed offset <= the given offset (higherOffset)
cleanerManager.maybeTruncateCheckpoint(logDir, topicPartition, higherOffset)
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
// we should truncate the checkpoint data for checkpointed offset > the given offset (lowerOffset)
cleanerManager.maybeTruncateCheckpoint(logDir, topicPartition, lowerOffset)
assertEquals(lowerOffset, cleanerManager.allCleanerCheckpoints(topicPartition))
}
@Test
def testAlterCheckpointDirShouldRemoveDataInSrcDirAndAddInNewDir(): Unit = {
val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
val cleanerManager: LogCleanerManager = createCleanerManager(log)
// write some data into the cleaner-offset-checkpoint file in logDir
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
cleanerManager.alterCheckpointDir(topicPartition, logDir, logDir2)
// verify we still can get the partition offset after alterCheckpointDir
// This data should locate in logDir2, not logDir
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
// force delete the logDir2 from checkpoints, so that the partition data should also be deleted
cleanerManager.handleLogDirFailure(logDir2.getAbsolutePath)
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
}
/**
* log under cleanup should still be eligible for log truncation
*/
@ -642,7 +736,7 @@ class LogCleanerManagerTest extends Logging { @@ -642,7 +736,7 @@ class LogCleanerManagerTest extends Logging {
private def createCleanerManager(log: Log): LogCleanerManager = {
val logs = new Pool[TopicPartition, Log]()
logs.put(topicPartition, log)
new LogCleanerManager(Seq(logDir), logs, null)
new LogCleanerManager(Seq(logDir, logDir2), logs, null)
}
private def createCleanerManagerMock(pool: Pool[TopicPartition, Log]): LogCleanerManagerMock = {

2
core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala

@ -82,7 +82,7 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A @@ -82,7 +82,7 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A
// force a checkpoint
// and make sure its gone from checkpoint file
cleaner.logs.remove(topicPartitions(0))
cleaner.updateCheckpoints(logDir)
cleaner.updateCheckpoints(logDir, partitionToRemove = Option(topicPartitions(0)))
val checkpoints = new OffsetCheckpointFile(new File(logDir, cleaner.cleanerManager.offsetCheckpointFile)).read()
// we expect partition 0 to be gone
assertFalse(checkpoints.contains(topicPartitions(0)))

2
core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala

@ -37,7 +37,7 @@ import scala.annotation.nowarn @@ -37,7 +37,7 @@ import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
/**
* Test whether clients can producer and consume when there is log directory failure
* Test whether clients can produce and consume when there is log directory failure
*/
class LogDirFailureTest extends IntegrationTestHarness {

Loading…
Cancel
Save