From 9ea81baf34113ccb05244ea9fb63199e7f071bb0 Mon Sep 17 00:00:00 2001
From: Dong Lin
Date: Mon, 25 Jun 2018 23:39:02 -0700
Subject: [PATCH] KAFKA-6949; alterReplicaLogDirs() should grab partition lock
 when accessing log of the future replica

NoSuchElementException will be thrown if ReplicaAlterDirThread replaces the
current replica with the future replica right before the request handler
thread executes `futureReplica.log.get.dir.getParent` in
ReplicaManager.alterReplicaLogDirs(). The solution is to grab the partition
lock when the request handler thread attempts to check the destination log
directory of the future replica.

Author: Dong Lin

Reviewers: Jun Rao

Closes #5081 from lindong28/KAFKA-6949
---
 .../main/scala/kafka/cluster/Partition.scala  | 68 +++++++++++++------
 .../scala/kafka/server/ReplicaManager.scala   | 17 +++--
 .../unit/kafka/cluster/PartitionTest.scala    | 54 +++++++++++++--
 .../scala/unit/kafka/utils/TestUtils.scala    |  2 +-
 4 files changed, 110 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9ab1ec47af8..b80c34475d3 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -149,10 +149,10 @@ class Partition(val topic: String,
    * @return true iff the future replica is created
    */
   def maybeCreateFutureReplica(logDir: String): Boolean = {
-    // The readLock is needed to make sure that while the caller checks the log directory of the
+    // The writeLock is needed to make sure that while the caller checks the log directory of the
     // current replica and the existence of the future replica, no other thread can update the log directory of the
     // current replica or remove the future replica.
-    inReadLock(leaderIsrUpdateLock) {
+    inWriteLock(leaderIsrUpdateLock) {
       val currentReplica = getReplica().get
       if (currentReplica.log.get.dir.getParent == logDir)
         false
@@ -207,29 +207,52 @@ class Partition(val topic: String,
     allReplicasMap.remove(replicaId)
   }
 
-  def removeFutureLocalReplica() {
+  def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
+    inReadLock(leaderIsrUpdateLock) {
+      getReplica(Request.FutureLocalReplicaId) match {
+        case Some(futureReplica) =>
+          if (futureReplica.log.get.dir.getParent != newDestinationDir)
+            true
+          else
+            false
+        case None => false
+      }
+    }
+  }
+
+  def removeFutureLocalReplica(deleteFromLogDir: Boolean = true) {
     inWriteLock(leaderIsrUpdateLock) {
       allReplicasMap.remove(Request.FutureLocalReplicaId)
+      if (deleteFromLogDir)
+        logManager.asyncDelete(topicPartition, isFuture = true)
     }
   }
 
-  // Return true iff the future log has caught up with the current log for this partition
+  // Return true iff the future replica exists and it has caught up with the current replica for this partition
   // Only ReplicaAlterDirThread will call this method and ReplicaAlterDirThread should remove the partition
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
     val replica = getReplica().get
-    val futureReplica = getReplica(Request.FutureLocalReplicaId).get
-    if (replica.logEndOffset == futureReplica.logEndOffset) {
+    val futureReplicaLEO = getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset)
+    if (futureReplicaLEO.contains(replica.logEndOffset)) {
       // The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
       // current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
       inWriteLock(leaderIsrUpdateLock) {
-        if (replica.logEndOffset == futureReplica.logEndOffset) {
-          logManager.replaceCurrentWithFutureLog(topicPartition)
-          replica.log = futureReplica.log
-          futureReplica.log = None
-          allReplicasMap.remove(Request.FutureLocalReplicaId)
-          true
-        } else false
+        getReplica(Request.FutureLocalReplicaId) match {
+          case Some(futureReplica) =>
+            if (replica.logEndOffset == futureReplica.logEndOffset) {
+              logManager.replaceCurrentWithFutureLog(topicPartition)
+              replica.log = futureReplica.log
+              futureReplica.log = None
+              allReplicasMap.remove(Request.FutureLocalReplicaId)
+              true
+            } else false
+          case None =>
+            // Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
+            // In this case the partition should have been removed from state of the ReplicaAlterLogDirsThread
+            // Return false so that ReplicaAlterLogDirsThread does not have to remove this partition from the state again to avoid race condition
+            false
+        }
       }
     } else false
   }
@@ -550,15 +573,22 @@ class Partition(val topic: String,
   }
 
   private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = {
-    if (isFuture)
-      getReplicaOrException(Request.FutureLocalReplicaId).log.get.appendAsFollower(records)
-    else {
-      // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
-      // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
+    if (isFuture) {
+      // The read lock is needed to handle race condition if request handler thread tries to
+      // remove future replica after receiving AlterReplicaLogDirsRequest.
       inReadLock(leaderIsrUpdateLock) {
-        getReplicaOrException().log.get.appendAsFollower(records)
+        getReplica(Request.FutureLocalReplicaId) match {
+          case Some(replica) => replica.log.get.appendAsFollower(records)
+          case None => // Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
+        }
       }
+    } else {
+      // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
+      // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
+      inReadLock(leaderIsrUpdateLock) {
+        getReplicaOrException().log.get.appendAsFollower(records)
+      }
     }
   }
 
   def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean) {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 965595b2c2e..ed9559f856a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -577,14 +577,17 @@ class ReplicaManager(val config: KafkaConfig,
         if (!logManager.isLogDirOnline(destinationDir))
           throw new KafkaStorageException(s"Log directory $destinationDir is offline")
 
-        // Stop current replica movement if the destinationDir is different from the existing destination log directory
-        getReplica(topicPartition, Request.FutureLocalReplicaId) match {
-          case Some(futureReplica) =>
-            if (futureReplica.log.get.dir.getParent != destinationDir) {
+        getPartition(topicPartition) match {
+          case Some(partition) =>
+            if (partition eq ReplicaManager.OfflinePartition)
+              throw new KafkaStorageException(s"Partition $topicPartition is offline")
+
+            // Stop current replica movement if the destinationDir is different from the existing destination log directory
+            if (partition.futureReplicaDirChanged(destinationDir)) {
               replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
-              getPartition(topicPartition).get.removeFutureLocalReplica()
-              logManager.asyncDelete(topicPartition, isFuture = true)
+              partition.removeFutureLocalReplica()
             }
+
           case None =>
         }
 
@@ -1418,7 +1421,7 @@ class ReplicaManager(val config: KafkaConfig,
         replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)
         replicaAlterLogDirsManager.removeFetcherForPartitions(newOfflinePartitions ++ partitionsWithOfflineFutureReplica.map(_.topicPartition))
 
-        partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica())
+        partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica(deleteFromLogDir = false))
         newOfflinePartitions.foreach { topicPartition =>
           val partition = allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
           partition.removePartitionMetrics()
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index fe9038ab255..41bdefddb76 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -19,8 +19,10 @@ package kafka.cluster
 import java.io.File
 import java.nio.ByteBuffer
 import java.util.Properties
+import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean
 
+import kafka.api.Request
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.log.{LogConfig, LogManager, CleanerConfig}
 import kafka.server._
@@ -44,7 +46,8 @@ class PartitionTest {
 
   val metrics = new Metrics
   var tmpDir: File = _
-  var logDir: File = _
+  var logDir1: File = _
+  var logDir2: File = _
   var replicaManager: ReplicaManager = _
   var logManager: LogManager = _
   var logConfig: LogConfig = _
@@ -58,13 +61,14 @@ class PartitionTest {
     logConfig = LogConfig(logProps)
 
     tmpDir = TestUtils.tempDir()
-    logDir = TestUtils.randomPartitionLogDir(tmpDir)
+    logDir1 = TestUtils.randomPartitionLogDir(tmpDir)
+    logDir2 = TestUtils.randomPartitionLogDir(tmpDir)
     logManager = TestUtils.createLogManager(
-      logDirs = Seq(logDir), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
+      logDirs = Seq(logDir1, logDir2), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
     logManager.startup()
 
     val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
-    brokerProps.put("log.dir", logDir.getAbsolutePath)
+    brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
     val brokerConfig = KafkaConfig.fromProps(brokerProps)
     replicaManager = new ReplicaManager(
       config = brokerConfig, metrics, time, zkClient = null, new MockScheduler(time),
@@ -83,6 +87,48 @@ class PartitionTest {
     replicaManager.shutdown(checkpointHW = false)
   }
 
+  @Test
+  // Verify that partition.removeFutureLocalReplica() and partition.maybeReplaceCurrentWithFutureReplica() can run concurrently
+  def testMaybeReplaceCurrentWithFutureReplica(): Unit = {
+    val latch = new CountDownLatch(1)
+
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
+    val log1 = logManager.getOrCreateLog(topicPartition, logConfig)
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
+    val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true)
+    val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1))
+    val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2))
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+
+    partition.addReplicaIfNotExists(futureReplica)
+    partition.addReplicaIfNotExists(currentReplica)
+    assertEquals(Some(currentReplica), partition.getReplica(brokerId))
+    assertEquals(Some(futureReplica), partition.getReplica(Request.FutureLocalReplicaId))
+
+    val thread1 = new Thread {
+      override def run(): Unit = {
+        latch.await()
+        partition.removeFutureLocalReplica()
+      }
+    }
+
+    val thread2 = new Thread {
+      override def run(): Unit = {
+        latch.await()
+        partition.maybeReplaceCurrentWithFutureReplica()
+      }
+    }
+
+    thread1.start()
+    thread2.start()
+
+    latch.countDown()
+    thread1.join()
+    thread2.join()
+    assertEquals(None, partition.getReplica(Request.FutureLocalReplicaId))
+  }
+
+
   @Test
   def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index da87c309dbd..d2aae2c54d7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -929,7 +929,7 @@ object TestUtils extends Logging {
                        defaultConfig: LogConfig = LogConfig(),
                        cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false),
                        time: MockTime = new MockTime()): LogManager = {
-    new LogManager(logDirs = logDirs,
+    new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),
                    initialOfflineDirs = Array.empty[File],
                    topicConfigs = Map(),
                    initialDefaultConfig = defaultConfig,
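
A minimal, self-contained sketch (not Kafka code, and not part of the patch) of the locking pattern the fix relies on: every read of the future-replica slot, including the existence check and the log-directory comparison, happens under the same read/write lock that guards the slot's removal, so a concurrent swap or removal can no longer surface as a NoSuchElementException from an unguarded Option.get. The names FakeReplica, FakePartition and removeFutureReplica are illustrative assumptions that only loosely mirror Partition.futureReplicaDirChanged() and removeFutureLocalReplica().

import java.util.concurrent.locks.ReentrantReadWriteLock

// Stand-in for a replica whose log lives in some parent directory.
final case class FakeReplica(parentDir: String)

final class FakePartition {
  private val lock = new ReentrantReadWriteLock()
  private var futureReplica: Option[FakeReplica] = Some(FakeReplica("/disk1"))

  private def inReadLock[T](body: => T): T = {
    lock.readLock().lock()
    try body finally lock.readLock().unlock()
  }

  private def inWriteLock[T](body: => T): T = {
    lock.writeLock().lock()
    try body finally lock.writeLock().unlock()
  }

  // Loose analogue of Partition.futureReplicaDirChanged(): the existence check and
  // the directory comparison happen under one read lock, never via Option.get outside it.
  def futureReplicaDirChanged(newDestinationDir: String): Boolean = inReadLock {
    futureReplica.exists(_.parentDir != newDestinationDir)
  }

  // Loose analogue of removeFutureLocalReplica(): removal takes the write lock,
  // so it cannot interleave with the check above.
  def removeFutureReplica(): Unit = inWriteLock {
    futureReplica = None
  }
}

object LockSketch extends App {
  val partition = new FakePartition
  val remover = new Thread(() => partition.removeFutureReplica())
  remover.start()
  // Safe whether or not the removal has already happened; never throws.
  println(partition.futureReplicaDirChanged("/disk2"))
  remover.join()
}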