diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 9ab1ec47af8..b80c34475d3 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -149,10 +149,10 @@ class Partition(val topic: String, * @return true iff the future replica is created */ def maybeCreateFutureReplica(logDir: String): Boolean = { - // The readLock is needed to make sure that while the caller checks the log directory of the + // The writeLock is needed to make sure that while the caller checks the log directory of the // current replica and the existence of the future replica, no other thread can update the log directory of the // current replica or remove the future replica. - inReadLock(leaderIsrUpdateLock) { + inWriteLock(leaderIsrUpdateLock) { val currentReplica = getReplica().get if (currentReplica.log.get.dir.getParent == logDir) false @@ -207,29 +207,52 @@ class Partition(val topic: String, allReplicasMap.remove(replicaId) } - def removeFutureLocalReplica() { + def futureReplicaDirChanged(newDestinationDir: String): Boolean = { + inReadLock(leaderIsrUpdateLock) { + getReplica(Request.FutureLocalReplicaId) match { + case Some(futureReplica) => + if (futureReplica.log.get.dir.getParent != newDestinationDir) + true + else + false + case None => false + } + } + } + + def removeFutureLocalReplica(deleteFromLogDir: Boolean = true) { inWriteLock(leaderIsrUpdateLock) { allReplicasMap.remove(Request.FutureLocalReplicaId) + if (deleteFromLogDir) + logManager.asyncDelete(topicPartition, isFuture = true) } } - // Return true iff the future log has caught up with the current log for this partition + // Return true iff the future replica exists and it has caught up with the current replica for this partition // Only ReplicaAlterDirThread will call this method and ReplicaAlterDirThread should remove the partition // from its partitionStates if this method returns true def maybeReplaceCurrentWithFutureReplica(): Boolean = { val replica = getReplica().get - val futureReplica = getReplica(Request.FutureLocalReplicaId).get - if (replica.logEndOffset == futureReplica.logEndOffset) { + val futureReplicaLEO = getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset) + if (futureReplicaLEO.contains(replica.logEndOffset)) { // The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the // current replica, no other thread can update LEO of the current replica via log truncation or log append operation. inWriteLock(leaderIsrUpdateLock) { - if (replica.logEndOffset == futureReplica.logEndOffset) { - logManager.replaceCurrentWithFutureLog(topicPartition) - replica.log = futureReplica.log - futureReplica.log = None - allReplicasMap.remove(Request.FutureLocalReplicaId) - true - } else false + getReplica(Request.FutureLocalReplicaId) match { + case Some(futureReplica) => + if (replica.logEndOffset == futureReplica.logEndOffset) { + logManager.replaceCurrentWithFutureLog(topicPartition) + replica.log = futureReplica.log + futureReplica.log = None + allReplicasMap.remove(Request.FutureLocalReplicaId) + true + } else false + case None => + // Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called + // In this case the partition should have been removed from state of the ReplicaAlterLogDirsThread + // Return false so that ReplicaAlterLogDirsThread does not have to remove this partition from the state again to avoid race condition + false + } } } else false } @@ -550,15 +573,22 @@ class Partition(val topic: String, } private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = { - if (isFuture) - getReplicaOrException(Request.FutureLocalReplicaId).log.get.appendAsFollower(records) - else { - // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread - // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica. + inReadLock(leaderIsrUpdateLock) { + if (isFuture) { + // The read lock is needed to handle race condition if request handler thread tries to + // remove future replica after receiving AlterReplicaLogDirsRequest. inReadLock(leaderIsrUpdateLock) { - getReplicaOrException().log.get.appendAsFollower(records) + getReplica(Request.FutureLocalReplicaId) match { + case Some(replica) => replica.log.get.appendAsFollower(records) + case None => // Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called + } } + } else { + // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread + // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica. + getReplicaOrException().log.get.appendAsFollower(records) } + } } def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean) { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 965595b2c2e..ed9559f856a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -577,14 +577,17 @@ class ReplicaManager(val config: KafkaConfig, if (!logManager.isLogDirOnline(destinationDir)) throw new KafkaStorageException(s"Log directory $destinationDir is offline") - // Stop current replica movement if the destinationDir is different from the existing destination log directory - getReplica(topicPartition, Request.FutureLocalReplicaId) match { - case Some(futureReplica) => - if (futureReplica.log.get.dir.getParent != destinationDir) { + getPartition(topicPartition) match { + case Some(partition) => + if (partition eq ReplicaManager.OfflinePartition) + throw new KafkaStorageException(s"Partition $topicPartition is offline") + + // Stop current replica movement if the destinationDir is different from the existing destination log directory + if (partition.futureReplicaDirChanged(destinationDir)) { replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition)) - getPartition(topicPartition).get.removeFutureLocalReplica() - logManager.asyncDelete(topicPartition, isFuture = true) + partition.removeFutureLocalReplica() } + case None => } @@ -1418,7 +1421,7 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions) replicaAlterLogDirsManager.removeFetcherForPartitions(newOfflinePartitions ++ partitionsWithOfflineFutureReplica.map(_.topicPartition)) - partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica()) + partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica(deleteFromLogDir = false)) newOfflinePartitions.foreach { topicPartition => val partition = allPartitions.put(topicPartition, ReplicaManager.OfflinePartition) partition.removePartitionMetrics() diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index fe9038ab255..41bdefddb76 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -19,8 +19,10 @@ package kafka.cluster import java.io.File import java.nio.ByteBuffer import java.util.Properties +import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean +import kafka.api.Request import kafka.common.UnexpectedAppendOffsetException import kafka.log.{LogConfig, LogManager, CleanerConfig} import kafka.server._ @@ -44,7 +46,8 @@ class PartitionTest { val metrics = new Metrics var tmpDir: File = _ - var logDir: File = _ + var logDir1: File = _ + var logDir2: File = _ var replicaManager: ReplicaManager = _ var logManager: LogManager = _ var logConfig: LogConfig = _ @@ -58,13 +61,14 @@ class PartitionTest { logConfig = LogConfig(logProps) tmpDir = TestUtils.tempDir() - logDir = TestUtils.randomPartitionLogDir(tmpDir) + logDir1 = TestUtils.randomPartitionLogDir(tmpDir) + logDir2 = TestUtils.randomPartitionLogDir(tmpDir) logManager = TestUtils.createLogManager( - logDirs = Seq(logDir), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time) + logDirs = Seq(logDir1, logDir2), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time) logManager.startup() val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect) - brokerProps.put("log.dir", logDir.getAbsolutePath) + brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(",")) val brokerConfig = KafkaConfig.fromProps(brokerProps) replicaManager = new ReplicaManager( config = brokerConfig, metrics, time, zkClient = null, new MockScheduler(time), @@ -83,6 +87,48 @@ class PartitionTest { replicaManager.shutdown(checkpointHW = false) } + @Test + // Verify that partition.removeFutureLocalReplica() and partition.maybeReplaceCurrentWithFutureReplica() can run concurrently + def testMaybeReplaceCurrentWithFutureReplica(): Unit = { + val latch = new CountDownLatch(1) + + logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath) + val log1 = logManager.getOrCreateLog(topicPartition, logConfig) + logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath) + val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true) + val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1)) + val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2)) + val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager) + + partition.addReplicaIfNotExists(futureReplica) + partition.addReplicaIfNotExists(currentReplica) + assertEquals(Some(currentReplica), partition.getReplica(brokerId)) + assertEquals(Some(futureReplica), partition.getReplica(Request.FutureLocalReplicaId)) + + val thread1 = new Thread { + override def run(): Unit = { + latch.await() + partition.removeFutureLocalReplica() + } + } + + val thread2 = new Thread { + override def run(): Unit = { + latch.await() + partition.maybeReplaceCurrentWithFutureReplica() + } + } + + thread1.start() + thread2.start() + + latch.countDown() + thread1.join() + thread2.join() + assertEquals(None, partition.getReplica(Request.FutureLocalReplicaId)) + } + + @Test def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = { val log = logManager.getOrCreateLog(topicPartition, logConfig) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index da87c309dbd..d2aae2c54d7 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -929,7 +929,7 @@ object TestUtils extends Logging { defaultConfig: LogConfig = LogConfig(), cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false), time: MockTime = new MockTime()): LogManager = { - new LogManager(logDirs = logDirs, + new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], topicConfigs = Map(), initialDefaultConfig = defaultConfig,