Browse Source

KAFKA-6949; alterReplicaLogDirs() should grab partition lock when accessing log of the future replica

A NoSuchElementException is thrown if ReplicaAlterDirThread replaces the current replica with the future replica right before the request handler thread executes `futureReplica.log.get.dir.getParent` in ReplicaManager.alterReplicaLogDirs(). The solution is to grab the partition lock when the request handler thread checks the destination log directory of the future replica.

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #5081 from lindong28/KAFKA-6949
pull/5295/head
Dong Lin 6 years ago
parent
commit
9ea81baf34
  1. 68
      core/src/main/scala/kafka/cluster/Partition.scala
  2. 17
      core/src/main/scala/kafka/server/ReplicaManager.scala
  3. 54
      core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
  4. 2
      core/src/test/scala/unit/kafka/utils/TestUtils.scala

68
core/src/main/scala/kafka/cluster/Partition.scala

@ -149,10 +149,10 @@ class Partition(val topic: String, @@ -149,10 +149,10 @@ class Partition(val topic: String,
* @return true iff the future replica is created
*/
def maybeCreateFutureReplica(logDir: String): Boolean = {
// The readLock is needed to make sure that while the caller checks the log directory of the
// The writeLock is needed to make sure that while the caller checks the log directory of the
// current replica and the existence of the future replica, no other thread can update the log directory of the
// current replica or remove the future replica.
inReadLock(leaderIsrUpdateLock) {
inWriteLock(leaderIsrUpdateLock) {
val currentReplica = getReplica().get
if (currentReplica.log.get.dir.getParent == logDir)
false
@ -207,29 +207,52 @@ class Partition(val topic: String, @@ -207,29 +207,52 @@ class Partition(val topic: String,
allReplicasMap.remove(replicaId)
}
def removeFutureLocalReplica() {
/**
 * Check whether an in-progress replica movement targets a directory other than the requested one.
 *
 * The lookup and the directory comparison both run under the read lock of leaderIsrUpdateLock, so
 * the future replica cannot be removed or swapped in (both take the write lock) between the
 * existence check and the access to its log.
 *
 * @param newDestinationDir the requested destination log directory
 * @return true iff a future replica exists and the parent directory of its log differs from
 *         newDestinationDir; false if there is no future replica
 */
def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
  inReadLock(leaderIsrUpdateLock) {
    // `exists` yields false for None, matching the absent-future-replica case.
    getReplica(Request.FutureLocalReplicaId).exists(_.log.get.dir.getParent != newDestinationDir)
  }
}
/**
 * Remove the future replica of this partition while holding the write lock of leaderIsrUpdateLock.
 *
 * @param deleteFromLogDir when true (the default), additionally ask the log manager to
 *                         asynchronously delete the future log of this partition; when false,
 *                         only the replica entry is dropped
 */
def removeFutureLocalReplica(deleteFromLogDir: Boolean = true): Unit = {
  inWriteLock(leaderIsrUpdateLock) {
    allReplicasMap.remove(Request.FutureLocalReplicaId)
    if (deleteFromLogDir) {
      logManager.asyncDelete(topicPartition, isFuture = true)
    }
  }
}
// Return true iff the future log has caught up with the current log for this partition
// Return true iff the future replica exists and it has caught up with the current replica for this partition
// Only ReplicaAlterDirThread will call this method and ReplicaAlterDirThread should remove the partition
// from its partitionStates if this method returns true
def maybeReplaceCurrentWithFutureReplica(): Boolean = {
val replica = getReplica().get
val futureReplica = getReplica(Request.FutureLocalReplicaId).get
if (replica.logEndOffset == futureReplica.logEndOffset) {
val futureReplicaLEO = getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset)
if (futureReplicaLEO.contains(replica.logEndOffset)) {
// The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
// current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
inWriteLock(leaderIsrUpdateLock) {
if (replica.logEndOffset == futureReplica.logEndOffset) {
logManager.replaceCurrentWithFutureLog(topicPartition)
replica.log = futureReplica.log
futureReplica.log = None
allReplicasMap.remove(Request.FutureLocalReplicaId)
true
} else false
getReplica(Request.FutureLocalReplicaId) match {
case Some(futureReplica) =>
if (replica.logEndOffset == futureReplica.logEndOffset) {
logManager.replaceCurrentWithFutureLog(topicPartition)
replica.log = futureReplica.log
futureReplica.log = None
allReplicasMap.remove(Request.FutureLocalReplicaId)
true
} else false
case None =>
// Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
// In this case the partition should have been removed from state of the ReplicaAlterLogDirsThread
// Return false so that ReplicaAlterLogDirsThread does not have to remove this partition from the state again to avoid race condition
false
}
}
} else false
}
@ -550,15 +573,22 @@ class Partition(val topic: String, @@ -550,15 +573,22 @@ class Partition(val topic: String,
}
private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = {
if (isFuture)
getReplicaOrException(Request.FutureLocalReplicaId).log.get.appendAsFollower(records)
else {
// The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
// is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
inReadLock(leaderIsrUpdateLock) {
if (isFuture) {
// The read lock is needed to handle race condition if request handler thread tries to
// remove future replica after receiving AlterReplicaLogDirsRequest.
inReadLock(leaderIsrUpdateLock) {
getReplicaOrException().log.get.appendAsFollower(records)
getReplica(Request.FutureLocalReplicaId) match {
case Some(replica) => replica.log.get.appendAsFollower(records)
case None => // Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
}
}
} else {
// The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
// is executing maybeReplaceCurrentWithFutureReplica() to replace follower replica with the future replica.
getReplicaOrException().log.get.appendAsFollower(records)
}
}
}
def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean) {

17
core/src/main/scala/kafka/server/ReplicaManager.scala

@ -577,14 +577,17 @@ class ReplicaManager(val config: KafkaConfig, @@ -577,14 +577,17 @@ class ReplicaManager(val config: KafkaConfig,
if (!logManager.isLogDirOnline(destinationDir))
throw new KafkaStorageException(s"Log directory $destinationDir is offline")
// Stop current replica movement if the destinationDir is different from the existing destination log directory
getReplica(topicPartition, Request.FutureLocalReplicaId) match {
case Some(futureReplica) =>
if (futureReplica.log.get.dir.getParent != destinationDir) {
getPartition(topicPartition) match {
case Some(partition) =>
if (partition eq ReplicaManager.OfflinePartition)
throw new KafkaStorageException(s"Partition $topicPartition is offline")
// Stop current replica movement if the destinationDir is different from the existing destination log directory
if (partition.futureReplicaDirChanged(destinationDir)) {
replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
getPartition(topicPartition).get.removeFutureLocalReplica()
logManager.asyncDelete(topicPartition, isFuture = true)
partition.removeFutureLocalReplica()
}
case None =>
}
@ -1418,7 +1421,7 @@ class ReplicaManager(val config: KafkaConfig, @@ -1418,7 +1421,7 @@ class ReplicaManager(val config: KafkaConfig,
replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)
replicaAlterLogDirsManager.removeFetcherForPartitions(newOfflinePartitions ++ partitionsWithOfflineFutureReplica.map(_.topicPartition))
partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica())
partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica(deleteFromLogDir = false))
newOfflinePartitions.foreach { topicPartition =>
val partition = allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
partition.removePartitionMetrics()

54
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala

@ -19,8 +19,10 @@ package kafka.cluster @@ -19,8 +19,10 @@ package kafka.cluster
import java.io.File
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.Request
import kafka.common.UnexpectedAppendOffsetException
import kafka.log.{LogConfig, LogManager, CleanerConfig}
import kafka.server._
@ -44,7 +46,8 @@ class PartitionTest { @@ -44,7 +46,8 @@ class PartitionTest {
val metrics = new Metrics
var tmpDir: File = _
var logDir: File = _
var logDir1: File = _
var logDir2: File = _
var replicaManager: ReplicaManager = _
var logManager: LogManager = _
var logConfig: LogConfig = _
@ -58,13 +61,14 @@ class PartitionTest { @@ -58,13 +61,14 @@ class PartitionTest {
logConfig = LogConfig(logProps)
tmpDir = TestUtils.tempDir()
logDir = TestUtils.randomPartitionLogDir(tmpDir)
logDir1 = TestUtils.randomPartitionLogDir(tmpDir)
logDir2 = TestUtils.randomPartitionLogDir(tmpDir)
logManager = TestUtils.createLogManager(
logDirs = Seq(logDir), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
logDirs = Seq(logDir1, logDir2), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
logManager.startup()
val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
brokerProps.put("log.dir", logDir.getAbsolutePath)
brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
val brokerConfig = KafkaConfig.fromProps(brokerProps)
replicaManager = new ReplicaManager(
config = brokerConfig, metrics, time, zkClient = null, new MockScheduler(time),
@ -83,6 +87,48 @@ class PartitionTest { @@ -83,6 +87,48 @@ class PartitionTest {
replicaManager.shutdown(checkpointHW = false)
}
@Test
// Verify that partition.removeFutureLocalReplica() and partition.maybeReplaceCurrentWithFutureReplica() can run concurrently
// and that the future replica is no longer registered in the partition once both calls have finished.
// NOTE(review): an exception thrown inside either worker thread is not propagated to the test thread,
// so a failure in one of the two calls would not by itself fail this test - confirm whether that is intended.
def testMaybeReplaceCurrentWithFutureReplica(): Unit = {
// Latch used to hold both worker threads until they have both been started.
val latch = new CountDownLatch(1)
// Set the preferred log dir to logDir1 before creating the current log ...
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
val log1 = logManager.getOrCreateLog(topicPartition, logConfig)
// ... and to logDir2 before creating the future log of the same partition.
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true)
val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1))
val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2))
val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
partition.addReplicaIfNotExists(futureReplica)
partition.addReplicaIfNotExists(currentReplica)
// Sanity check: both replicas are registered before the race starts.
assertEquals(Some(currentReplica), partition.getReplica(brokerId))
assertEquals(Some(futureReplica), partition.getReplica(Request.FutureLocalReplicaId))
// Worker 1: removes the future replica.
val thread1 = new Thread {
override def run(): Unit = {
latch.await()
partition.removeFutureLocalReplica()
}
}
// Worker 2: attempts to swap the future replica in as the current replica.
val thread2 = new Thread {
override def run(): Unit = {
latch.await()
partition.maybeReplaceCurrentWithFutureReplica()
}
}
thread1.start()
thread2.start()
// Release both workers together so the two calls race with each other.
latch.countDown()
thread1.join()
thread2.join()
// removeFutureLocalReplica() always runs, so the future replica must be gone regardless of which call ran first.
assertEquals(None, partition.getReplica(Request.FutureLocalReplicaId))
}
@Test
def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, logConfig)

2
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -929,7 +929,7 @@ object TestUtils extends Logging { @@ -929,7 +929,7 @@ object TestUtils extends Logging {
defaultConfig: LogConfig = LogConfig(),
cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false),
time: MockTime = new MockTime()): LogManager = {
new LogManager(logDirs = logDirs,
new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),
initialOfflineDirs = Array.empty[File],
topicConfigs = Map(),
initialDefaultConfig = defaultConfig,

Loading…
Cancel
Save