
KAFKA-9750; Fix race condition with log dir reassign completion (#8412)

There is a race on receiving a LeaderAndIsr request for a replica with an active log dir reassignment. If the reassignment completes just before the LeaderAndIsr handler updates epoch information, it can lead to an illegal state error since no future log dir exists. This patch fixes the problem by ensuring that the future log dir exists when the fetcher is started. Removal cannot happen concurrently because it requires access to the same partition state lock.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>

Co-authored-by: Chia-Ping Tsai <chia7712@gmail.com>
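
A minimal, hypothetical Scala sketch of the locking pattern described above, using illustrative names rather than the actual Kafka classes: the future-log check and the fetch-state insertion run under the same lock that reassignment completion takes, so a completion that wins the race simply leaves nothing to add.

// Hypothetical, simplified sketch (not the actual Kafka classes): the future-log
// check and the fetch-state insertion share one lock with reassignment completion,
// so the two cannot interleave.
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

object LogDirFetcherSketch {
  private val partitionMapLock = new ReentrantLock()
  private val fetchStates = mutable.Map.empty[String, Long]
  // Stand-in for ReplicaManager.futureLogExists: non-empty while a reassignment is in flight.
  private var futureLogs = Set.empty[String]

  // Only partitions that still have a future log are added; the check is atomic
  // with respect to completeReassignment because both hold partitionMapLock.
  def addPartitions(initialOffsets: Map[String, Long]): Set[String] = {
    partitionMapLock.lock()
    try {
      val stillPending = initialOffsets.filter { case (tp, _) => futureLogs.contains(tp) }
      fetchStates ++= stillPending
      stillPending.keySet
    } finally partitionMapLock.unlock()
  }

  // Completion removes the future log and its fetch state under the same lock.
  def completeReassignment(tp: String): Unit = {
    partitionMapLock.lock()
    try {
      futureLogs -= tp
      fetchStates -= tp
    } finally partitionMapLock.unlock()
  }

  def main(args: Array[String]): Unit = {
    futureLogs = Set("topic-0")
    completeReassignment("topic-0")                // reassignment wins the race
    println(addPartitions(Map("topic-0" -> 0L)))   // Set(): nothing added, no illegal state
  }
}

The patch itself applies this idea by overriding addPartitions in ReplicaAlterLogDirsThread to filter the initial fetch states on replicaMgr.futureLogExists while holding partitionMapLock (see the ReplicaAlterLogDirsThread.scala hunk below).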
pull/8594/head
Jason Gustafson, 5 years ago
parent commit e4b03f08ae
  1. core/src/main/scala/kafka/server/AbstractFetcherManager.scala (14 changes)
  2. core/src/main/scala/kafka/server/AbstractFetcherThread.scala (7 changes)
  3. core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala (18 changes)
  4. core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala (22 changes)
  5. core/src/main/scala/kafka/server/ReplicaManager.scala (4 changes)
  6. core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala (1 change)
  7. core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala (212 changes)
  8. core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (51 changes)

core/src/main/scala/kafka/server/AbstractFetcherManager.scala (14 changes)

@@ -127,7 +127,8 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
BrokerAndFetcherId(brokerAndInitialFetchOffset.leader, getFetcherId(topicPartition))
}
def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId, brokerIdAndFetcherId: BrokerIdAndFetcherId): AbstractFetcherThread = {
def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId,
brokerIdAndFetcherId: BrokerIdAndFetcherId): T = {
val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
fetcherThread.start()
@@ -151,13 +152,18 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
tp -> OffsetAndEpoch(brokerAndInitOffset.initOffset, brokerAndInitOffset.currentLeaderEpoch)
}
fetcherThread.addPartitions(initialOffsetAndEpochs)
info(s"Added fetcher to broker ${brokerAndFetcherId.broker} for partitions $initialOffsetAndEpochs")
addPartitionsToFetcherThread(fetcherThread, initialOffsetAndEpochs)
}
}
}
def removeFetcherForPartitions(partitions: Set[TopicPartition]) {
protected def addPartitionsToFetcherThread(fetcherThread: T,
initialOffsetAndEpochs: collection.Map[TopicPartition, OffsetAndEpoch]): Unit = {
fetcherThread.addPartitions(initialOffsetAndEpochs)
info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs")
}
def removeFetcherForPartitions(partitions: Set[TopicPartition]): Unit = {
lock synchronized {
for (fetcher <- fetcherThreadMap.values)
fetcher.removePartitions(partitions)

core/src/main/scala/kafka/server/AbstractFetcherThread.scala (7 changes)

@@ -60,7 +60,7 @@ abstract class AbstractFetcherThread(name: String,
type EpochData = OffsetsForLeaderEpochRequest.PartitionData
private val partitionStates = new PartitionStates[PartitionFetchState]
private val partitionMapLock = new ReentrantLock
protected val partitionMapLock = new ReentrantLock
private val partitionMapCond = partitionMapLock.newCondition()
private val metricId = ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
@@ -177,7 +177,7 @@ abstract class AbstractFetcherThread(name: String,
* - Send OffsetsForLeaderEpochRequest, retrieving the latest offset for each partition's
* leader epoch. This is the offset the follower should truncate to ensure
* accurate log replication.
* - Finally truncate the logs for partitions in the truncating phase and mark them
* - Finally truncate the logs for partitions in the truncating phase and mark the
* truncation complete. Do this within a lock to ensure no leadership changes can
* occur during truncation.
*/
@@ -405,7 +405,7 @@ abstract class AbstractFetcherThread(name: String,
} finally partitionMapLock.unlock()
}
def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]) {
def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
partitionMapLock.lockInterruptibly()
try {
initialFetchStates.foreach { case (tp, initialFetchState) =>
@@ -424,6 +424,7 @@ abstract class AbstractFetcherThread(name: String,
}
partitionMapCond.signalAll()
initialFetchStates.keySet
} finally partitionMapLock.unlock()
}

core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala (18 changes)

@@ -18,6 +18,7 @@
package kafka.server
import kafka.cluster.BrokerEndPoint
import org.apache.kafka.common.TopicPartition
class ReplicaAlterLogDirsManager(brokerConfig: KafkaConfig,
replicaManager: ReplicaManager,
@@ -34,7 +35,22 @@ class ReplicaAlterLogDirsManager(brokerConfig: KafkaConfig,
quotaManager, brokerTopicStats)
}
def shutdown() {
override protected def addPartitionsToFetcherThread(fetcherThread: ReplicaAlterLogDirsThread,
initialOffsetAndEpochs: collection.Map[TopicPartition, OffsetAndEpoch]): Unit = {
val addedPartitions = fetcherThread.addPartitions(initialOffsetAndEpochs)
val (addedInitialOffsets, notAddedInitialOffsets) = initialOffsetAndEpochs.partition { case (tp, _) =>
addedPartitions.contains(tp)
}
if (addedInitialOffsets.nonEmpty)
info(s"Added log dir fetcher for partitions with initial offsets $addedInitialOffsets")
if (notAddedInitialOffsets.nonEmpty)
info(s"Failed to add log dir fetch for partitions ${notAddedInitialOffsets.keySet} " +
s"since the log dir reassignment has already completed")
}
def shutdown(): Unit = {
info("shutting down")
closeAllFetchers()
info("shutdown completed")

core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala (22 changes)

@@ -82,7 +82,7 @@ class ReplicaAlterLogDirsThread(name: String,
Request.FutureLocalReplicaId,
request.minBytes,
request.maxBytes,
request.version <= 2,
false,
request.fetchData.asScala.toSeq,
UnboundedQuota,
processResponseCallback,
@@ -106,7 +106,11 @@ class ReplicaAlterLogDirsThread(name: String,
throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
topicPartition, fetchOffset, futureReplica.logEndOffset.messageOffset))
val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
val logAppendInfo = if (records.sizeInBytes() > 0)
partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
else
None
val futureReplicaHighWatermark = futureReplica.logEndOffset.messageOffset.min(partitionData.highWatermark)
futureReplica.highWatermark = new LogOffsetMetadata(futureReplicaHighWatermark)
futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
@@ -118,6 +122,20 @@ class ReplicaAlterLogDirsThread(name: String,
logAppendInfo
}
override def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
partitionMapLock.lockInterruptibly()
try {
// It is possible that the log dir fetcher completed just before this call, so we
// filter only the partitions which still have a future log dir.
val filteredFetchStates = initialFetchStates.filter { case (tp, _) =>
replicaMgr.futureLogExists(tp)
}
super.addPartitions(filteredFetchStates)
} finally {
partitionMapLock.unlock()
}
}
override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
val offsetSnapshot = offsetSnapshotFromCurrentReplica(topicPartition, leaderEpoch)
offsetSnapshot.logStartOffset

core/src/main/scala/kafka/server/ReplicaManager.scala (4 changes)

@@ -452,6 +452,10 @@ class ReplicaManager(val config: KafkaConfig,
nonOfflinePartition(topicPartition).flatMap(_.localReplica)
}
def futureLogExists(topicPartition: TopicPartition): Boolean = {
getPartitionOrException(topicPartition, expectLeader = false).futureLocalReplica.isDefined
}
def getLogDir(topicPartition: TopicPartition): Option[String] = {
localReplica(topicPartition).flatMap(_.log).map(_.dir.getParent)
}

core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala (1 change)

@@ -43,6 +43,7 @@ class AbstractFetcherManagerTest {
EasyMock.expect(fetcher.start())
EasyMock.expect(fetcher.addPartitions(Map(tp -> OffsetAndEpoch(fetchOffset, leaderEpoch))))
.andReturn(Set(tp))
EasyMock.expect(fetcher.fetchState(tp))
.andReturn(Some(PartitionFetchState(fetchOffset, leaderEpoch, Truncating)))
EasyMock.expect(fetcher.removePartitions(Set(tp)))

core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala (212 changes)

@@ -18,22 +18,29 @@ package kafka.server
import java.util.Optional
import kafka.api.Request
import kafka.cluster.{BrokerEndPoint, Partition, Replica}
import kafka.log.LogManager
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.QuotaFactory.UnboundedQuota
import kafka.utils.{DelayedItem, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, IsolationLevel, OffsetsForLeaderEpochRequest}
import org.easymock.EasyMock._
import org.easymock.{Capture, CaptureType, EasyMock, IAnswer}
import org.junit.Assert._
import org.junit.Test
import org.mockito.Mockito.{doNothing, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq}
import scala.collection.JavaConverters._
class ReplicaAlterLogDirsThreadTest {
@@ -44,6 +51,199 @@ class ReplicaAlterLogDirsThreadTest {
OffsetAndEpoch(offset = fetchOffset, leaderEpoch = leaderEpoch)
}
@Test
def shouldNotAddPartitionIfFutureLogIsNotDefined(): Unit = {
val brokerId = 1
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "localhost:1234"))
val replicaManager = Mockito.mock(classOf[ReplicaManager])
val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager])
when(replicaManager.futureLogExists(t1p0)).thenReturn(false)
val endPoint = new BrokerEndPoint(0, "localhost", 1000)
val thread = new ReplicaAlterLogDirsThread(
"alter-logs-dirs-thread",
sourceBroker = endPoint,
brokerConfig = config,
replicaMgr = replicaManager,
quota = quotaManager,
brokerTopicStats = new BrokerTopicStats)
val addedPartitions = thread.addPartitions(Map(t1p0 -> offsetAndEpoch(0L)))
assertEquals(Set.empty, addedPartitions)
assertEquals(0, thread.partitionCount())
assertEquals(None, thread.fetchState(t1p0))
}
@Test
def shouldUpdateLeaderEpochAfterFencedEpochError(): Unit = {
val brokerId = 1
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "localhost:1234"))
val partition = Mockito.mock(classOf[Partition])
val replicaManager = Mockito.mock(classOf[ReplicaManager])
val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager])
val futureReplica = Mockito.mock(classOf[Replica])
val leaderEpoch = 5
val logEndOffset = 0
when(replicaManager.futureLocalReplicaOrException(t1p0)).thenReturn(futureReplica)
when(replicaManager.futureLogExists(t1p0)).thenReturn(true)
when(replicaManager.nonOfflinePartition(t1p0)).thenReturn(Some(partition))
when(replicaManager.getPartitionOrException(t1p0, expectLeader = false)).thenReturn(partition)
when(replicaManager.getPartition(t1p0)).thenReturn(Some(partition))
when(quotaManager.isQuotaExceeded).thenReturn(false)
when(partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, fetchOnlyFromLeader = false))
.thenReturn(new EpochEndOffset(leaderEpoch, logEndOffset))
when(partition.futureLocalReplicaOrException).thenReturn(futureReplica)
doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(true)
when(futureReplica.logStartOffset).thenReturn(0L)
when(futureReplica.logEndOffset).thenReturn(LogOffsetMetadata(0L))
when(futureReplica.latestEpoch).thenReturn(None)
val fencedRequestData = new FetchRequest.PartitionData(0L, 0L,
config.replicaFetchMaxBytes, Optional.of(leaderEpoch - 1))
val fencedResponseData = FetchPartitionData(
error = Errors.FENCED_LEADER_EPOCH,
highWatermark = -1,
logStartOffset = -1,
records = MemoryRecords.EMPTY,
lastStableOffset = None,
abortedTransactions = None)
mockFetchFromCurrentLog(t1p0, fencedRequestData, config, replicaManager, fencedResponseData)
val endPoint = new BrokerEndPoint(0, "localhost", 1000)
val thread = new ReplicaAlterLogDirsThread(
"alter-logs-dirs-thread",
sourceBroker = endPoint,
brokerConfig = config,
replicaMgr = replicaManager,
quota = quotaManager,
brokerTopicStats = new BrokerTopicStats)
// Initially we add the partition with an older epoch which results in an error
thread.addPartitions(Map(t1p0 -> offsetAndEpoch(fetchOffset = 0L, leaderEpoch - 1)))
assertTrue(thread.fetchState(t1p0).isDefined)
assertEquals(1, thread.partitionCount())
thread.doWork()
assertEquals(None, thread.fetchState(t1p0))
assertEquals(0, thread.partitionCount())
// Next we update the epoch and assert that we can continue
thread.addPartitions(Map(t1p0 -> offsetAndEpoch(fetchOffset = 0L, leaderEpoch)))
assertEquals(Some(leaderEpoch), thread.fetchState(t1p0).map(_.currentLeaderEpoch))
assertEquals(1, thread.partitionCount())
val requestData = new FetchRequest.PartitionData(0L, 0L,
config.replicaFetchMaxBytes, Optional.of(leaderEpoch))
val responseData = FetchPartitionData(
error = Errors.NONE,
highWatermark = 0L,
logStartOffset = 0L,
records = MemoryRecords.EMPTY,
lastStableOffset = None,
abortedTransactions = None)
mockFetchFromCurrentLog(t1p0, requestData, config, replicaManager, responseData)
thread.doWork()
assertEquals(None, thread.fetchState(t1p0))
assertEquals(0, thread.partitionCount())
}
@Test
def shouldReplaceCurrentLogDirWhenCaughtUp(): Unit = {
val brokerId = 1
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "localhost:1234"))
val partition = Mockito.mock(classOf[Partition])
val replicaManager = Mockito.mock(classOf[ReplicaManager])
val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager])
val futureReplica = Mockito.mock(classOf[Replica])
val leaderEpoch = 5
val logEndOffset = 0
when(replicaManager.futureLocalReplicaOrException(t1p0)).thenReturn(futureReplica)
when(replicaManager.futureLogExists(t1p0)).thenReturn(true)
when(replicaManager.nonOfflinePartition(t1p0)).thenReturn(Some(partition))
when(replicaManager.getPartitionOrException(t1p0, expectLeader = false)).thenReturn(partition)
when(replicaManager.getPartition(t1p0)).thenReturn(Some(partition))
when(quotaManager.isQuotaExceeded).thenReturn(false)
when(partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, fetchOnlyFromLeader = false))
.thenReturn(new EpochEndOffset(leaderEpoch, logEndOffset))
when(partition.futureLocalReplicaOrException).thenReturn(futureReplica)
doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(true)
when(futureReplica.logStartOffset).thenReturn(0L)
when(futureReplica.logEndOffset).thenReturn(LogOffsetMetadata(0L))
when(futureReplica.latestEpoch).thenReturn(None)
val requestData = new FetchRequest.PartitionData(0L, 0L,
config.replicaFetchMaxBytes, Optional.of(leaderEpoch))
val responseData = FetchPartitionData(
error = Errors.NONE,
highWatermark = 0L,
logStartOffset = 0L,
records = MemoryRecords.EMPTY,
lastStableOffset = None,
abortedTransactions = None)
mockFetchFromCurrentLog(t1p0, requestData, config, replicaManager, responseData)
val endPoint = new BrokerEndPoint(0, "localhost", 1000)
val thread = new ReplicaAlterLogDirsThread(
"alter-logs-dirs-thread",
sourceBroker = endPoint,
brokerConfig = config,
replicaMgr = replicaManager,
quota = quotaManager,
brokerTopicStats = new BrokerTopicStats)
thread.addPartitions(Map(t1p0 -> offsetAndEpoch(fetchOffset = 0L, leaderEpoch)))
assertTrue(thread.fetchState(t1p0).isDefined)
assertEquals(1, thread.partitionCount())
thread.doWork()
assertEquals(None, thread.fetchState(t1p0))
assertEquals(0, thread.partitionCount())
}
private def mockFetchFromCurrentLog(topicPartition: TopicPartition,
requestData: FetchRequest.PartitionData,
config: KafkaConfig,
replicaManager: ReplicaManager,
responseData: FetchPartitionData): Unit = {
val callbackCaptor: ArgumentCaptor[Seq[(TopicPartition, FetchPartitionData)] => Unit] =
ArgumentCaptor.forClass(classOf[Seq[(TopicPartition, FetchPartitionData)] => Unit])
when(replicaManager.fetchMessages(
timeout = ArgumentMatchers.eq(0L),
replicaId = ArgumentMatchers.eq(Request.FutureLocalReplicaId),
fetchMinBytes = ArgumentMatchers.eq(0),
fetchMaxBytes = ArgumentMatchers.eq(config.replicaFetchResponseMaxBytes),
hardMaxBytesLimit = ArgumentMatchers.eq(false),
fetchInfos = ArgumentMatchers.eq(Seq(topicPartition -> requestData)),
quota = ArgumentMatchers.eq(UnboundedQuota),
responseCallback = callbackCaptor.capture(),
isolationLevel = ArgumentMatchers.eq(IsolationLevel.READ_UNCOMMITTED)
)).thenAnswer(new Answer[Any] {
override def answer(invocation: InvocationOnMock): Unit = {
callbackCaptor.getValue.apply(Seq((topicPartition, responseData)))
}
})
}
@Test
def issuesEpochRequestFromLocalReplica(): Unit = {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
@@ -171,7 +371,9 @@ class ReplicaAlterLogDirsThreadTest {
expect(replicaManager.getPartitionOrException(t1p1, expectLeader = false))
.andStubReturn(partitionT1p1)
expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplicaT1p0)
expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
expect(replicaManager.futureLocalReplicaOrException(t1p1)).andStubReturn(futureReplicaT1p1)
expect(replicaManager.futureLogExists(t1p1)).andStubReturn(true)
expect(partitionT1p0.truncateTo(capture(truncateCaptureT1p0), anyBoolean())).anyTimes()
expect(partitionT1p1.truncateTo(capture(truncateCaptureT1p1), anyBoolean())).anyTimes()
@@ -244,6 +446,7 @@ class ReplicaAlterLogDirsThreadTest {
expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
.andStubReturn(partition)
expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
expect(partition.truncateTo(capture(truncateToCapture), EasyMock.eq(true))).anyTimes()
expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes()
@@ -314,6 +517,7 @@ class ReplicaAlterLogDirsThreadTest {
.andStubReturn(partition)
expect(partition.truncateTo(capture(truncated), isFuture = EasyMock.eq(true))).anyTimes()
expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes()
expect(replicaManager.logManager).andReturn(logManager).anyTimes()
@@ -369,6 +573,7 @@ class ReplicaAlterLogDirsThreadTest {
expect(partition.truncateTo(capture(truncated), isFuture = EasyMock.eq(true))).once()
expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
expect(futureReplica.latestEpoch).andStubReturn(Some(futureReplicaLeaderEpoch))
expect(futureReplica.endOffsetForEpoch(futureReplicaLeaderEpoch)).andReturn(
Some(OffsetAndEpoch(futureReplicaLEO, futureReplicaLeaderEpoch)))
@@ -452,6 +657,7 @@ class ReplicaAlterLogDirsThreadTest {
expect(partition.truncateTo(futureReplicaLEO, isFuture = true)).once()
expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
expect(futureReplica.latestEpoch).andStubReturn(Some(leaderEpoch))
expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes()
expect(futureReplica.endOffsetForEpoch(leaderEpoch)).andReturn(
@@ -604,11 +810,13 @@ class ReplicaAlterLogDirsThreadTest {
expect(replicaManager.futureLocalReplica(t1p0)).andReturn(Some(futureReplica)).anyTimes()
expect(replicaManager.localReplicaOrException(t1p0)).andReturn(replicaT1p0).anyTimes()
expect(replicaManager.futureLocalReplicaOrException(t1p0)).andReturn(futureReplica).anyTimes()
expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes()
expect(replicaManager.localReplica(t1p1)).andReturn(Some(replicaT1p1)).anyTimes()
expect(replicaManager.futureLocalReplica(t1p1)).andReturn(Some(futureReplica)).anyTimes()
expect(replicaManager.localReplicaOrException(t1p1)).andReturn(replicaT1p1).anyTimes()
expect(replicaManager.futureLocalReplicaOrException(t1p1)).andReturn(futureReplica).anyTimes()
expect(replicaManager.futureLogExists(t1p1)).andStubReturn(true)
expect(replicaManager.getPartition(t1p1)).andReturn(Some(partition)).anyTimes()
}

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (51 changes)

@@ -195,6 +195,12 @@ class ReplicaManagerTest {
@Test
def testFencedErrorCausedByBecomeLeader(): Unit = {
testFencedErrorCausedByBecomeLeader(0)
testFencedErrorCausedByBecomeLeader(1)
testFencedErrorCausedByBecomeLeader(10)
}
private[this] def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer)
try {
val brokerList = Seq[Integer](0, 1).asJava
@@ -202,43 +208,46 @@ class ReplicaManagerTest {
replicaManager.getOrCreatePartition(topicPartition)
.getOrCreateReplica(0, isNew = false)
val replica = replicaManager.localReplicaOrException(topicPartition)
assertFalse(replicaManager.futureLogExists(topicPartition))
assertEquals(None, replicaManager.futureLocalReplica(topicPartition))
def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
ApiKeys.LEADER_AND_ISR.latestVersion,
0,
0,
brokerEpoch,
Map(topicPartition -> new LeaderAndIsrRequest.PartitionState(0, 0,
epoch, brokerList, 0, brokerList, true)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
.localReplica.flatMap(_.log).get
assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).size)
val previousReplicaFolder = replica.log.get.dir.getParentFile
// find the live and different folder
val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).head
assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == previousReplicaFolder).size)
val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == previousReplicaFolder).head
assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
assertTrue(replicaManager.futureLocalReplica(topicPartition).flatMap(_.log).isDefined)
// make sure the future log is created
val futureReplica = replicaManager.futureLocalReplicaOrException(topicPartition)
assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
// change the epoch from 0 to 1 in order to make fenced error
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1), (_, _) => ())
TestUtils.waitUntilTrue(() => replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.values.forall(_.partitionCount() == 0),
s"the partition=$topicPartition should be removed from pending state")
// the partition is added to failedPartitions if fenced error happens
// if the thread is done before ReplicaManager#becomeLeaderOrFollower updates epoch,the fenced error does
// not happen and failedPartitions is empty.
if (replicaManager.replicaAlterLogDirsManager.failedPartitions.size != 0) {
(1 to loopEpochChange).foreach(epoch => replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(epoch), (_, _) => ()))
// wait for the ReplicaAlterLogDirsThread to complete
TestUtils.waitUntilTrue(() => {
replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
// send request again
replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
// the future folder exists so it fails to invoke thread
assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
}
replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.isEmpty
}, s"ReplicaAlterLogDirsThread should be gone")
// the replica change is completed after retrying
assertTrue(futureReplica.log.isEmpty)
assertEquals(newReplicaFolder.getAbsolutePath, replica.log.get.dir.getParent)
// change the replica folder again
val response = replicaManager.alterReplicaLogDirs(Map(topicPartition -> previousReplicaFolder.getAbsolutePath))
assertNotEquals(0, response.size)
response.values.foreach(assertEquals(Errors.NONE, _))
// should succeed to invoke ReplicaAlterLogDirsThread again
assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
} finally replicaManager.shutdown(checkpointHW = false)
}
