
KAFKA-9171: Handle ReplicaNotAvailableException during DelayedFetch (#7678)

Reviewers: Ismael Juma <ismael@juma.me.uk>
Author: Rajini Sivaram (committed via GitHub, 5 years ago)
Parent commit: 6f0008643d
Changed files:
  1. core/src/main/scala/kafka/server/DelayedFetch.scala (42 lines changed)
  2. core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala (43 lines changed)
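
Before the per-file diffs, a self-contained sketch of the purgatory pattern this commit touches may help. The names below (DelayedOp, DelayedFetchSketch, ready, respond) are hypothetical, heavily simplified stand-ins for Kafka's DelayedOperation/DelayedFetch; only the shape of tryComplete/forceComplete and the new catch-to-complete behaviour mirrors the real code.

// Hypothetical sketch (not the real Kafka classes): an operation parked in
// purgatory must treat "replica disappeared while we waited" as a reason to
// complete with an error, not as an escaping exception that leaves the
// fetch parked until it times out.
class ReplicaNotAvailableException(msg: String) extends RuntimeException(msg)

abstract class DelayedOp {
  @volatile private var completed = false
  def isCompleted: Boolean = completed
  def onComplete(): Unit            // sends the (possibly error) response
  def tryComplete(): Boolean        // true if the operation finished now
  protected def forceComplete(): Boolean = {
    completed = true
    onComplete()
    true
  }
}

class DelayedFetchSketch(ready: () => Boolean, respond: () => Unit) extends DelayedOp {
  override def onComplete(): Unit = respond()
  override def tryComplete(): Boolean =
    try {
      if (ready()) forceComplete() else false
    } catch {
      // The behaviour this commit adds: complete immediately, so the caller
      // sees REPLICA_NOT_AVAILABLE instead of waiting out the delay.
      case _: ReplicaNotAvailableException => forceComplete()
    }
}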

core/src/main/scala/kafka/server/DelayedFetch.scala (42 lines changed)

@@ -68,12 +68,13 @@ class DelayedFetch(delayMs: Long,
  * The operation can be completed if:
  *
  * Case A: This broker is no longer the leader for some partitions it tries to fetch
- * Case B: This broker does not know of some partitions it tries to fetch
- * Case C: The fetch offset locates not on the last segment of the log
- * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
- * Case E: The partition is in an offline log directory on this broker
- * Case F: This broker is the leader, but the requested epoch is now fenced
- * Case G: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392)
+ * Case B: The replica is no longer available on this broker
+ * Case C: This broker does not know of some partitions it tries to fetch
+ * Case D: The partition is in an offline log directory on this broker
+ * Case E: This broker is the leader, but the requested epoch is now fenced
+ * Case F: The fetch offset locates not on the last segment of the log
+ * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
+ * Case H: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392)
  * Upon completion, should return whatever data is available for each valid partition
  */
 override def tryComplete(): Boolean = {
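
To keep the re-lettering straight while reading the hunks below, here is the patched case-to-trigger mapping as plain data — a reading aid distilled from the comment above, not code from the patch:

// Reading aid only: the new lettering after this commit.
val completionCases: Map[String, String] = Map(
  "A" -> "NotLeaderForPartitionException: broker no longer leads the partition",
  "B" -> "ReplicaNotAvailableException: replica no longer on this broker (new here)",
  "C" -> "UnknownTopicOrPartitionException: partition unknown to this broker",
  "D" -> "KafkaStorageException: partition is in an offline log directory",
  "E" -> "FencedLeaderEpochException: requested leader epoch is fenced",
  "F" -> "fetch offset is not on the log's active segment",
  "G" -> "accumulated bytes >= fetchMinBytes",
  "H" -> "follower lacks the leader's latest high watermark (KIP-392)"
)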
@@ -94,16 +95,16 @@ class DelayedFetch(delayMs: Long,
           case FetchTxnCommitted => offsetSnapshot.lastStableOffset
         }
-        // Go directly to the check for Case D if the message offsets are the same. If the log segment
+        // Go directly to the check for Case G if the message offsets are the same. If the log segment
         // has just rolled, then the high watermark offset will remain the same but be on the old segment,
-        // which would incorrectly be seen as an instance of Case C.
+        // which would incorrectly be seen as an instance of Case F.
         if (endOffset.messageOffset != fetchOffset.messageOffset) {
           if (endOffset.onOlderSegment(fetchOffset)) {
-            // Case C, this can happen when the new fetch operation is on a truncated leader
+            // Case F, this can happen when the new fetch operation is on a truncated leader
             debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
             return forceComplete()
           } else if (fetchOffset.onOlderSegment(endOffset)) {
-            // Case C, this can happen when the fetch operation is falling behind the current segment
+            // Case F, this can happen when the fetch operation is falling behind the current segment
             // or the partition has just rolled a new segment
             debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
             // We will not force complete the fetch request if a replica should be throttled.
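
The messageOffset guard in this hunk is subtle. The sketch below re-creates just enough of LogOffsetMetadata (a hypothetical OffsetMeta; the real class also tracks the byte position within the segment) to show why a freshly rolled segment would otherwise masquerade as Case F:

// Hypothetical stand-in for LogOffsetMetadata, enough for the rolled-segment edge case.
case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long) {
  def onOlderSegment(that: OffsetMeta): Boolean =
    this.segmentBaseOffset < that.segmentBaseOffset
}

// After a roll, the high watermark keeps its messageOffset but stays on the old
// segment, so the segment comparison alone would wrongly signal Case F:
val fetchOffset = OffsetMeta(messageOffset = 100, segmentBaseOffset = 100) // new segment
val endOffset   = OffsetMeta(messageOffset = 100, segmentBaseOffset = 0)   // old segment
assert(endOffset.onOlderSegment(fetchOffset))                 // looks like a truncated leader
assert(endOffset.messageOffset == fetchOffset.messageOffset)  // the guard skips the check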
@@ -118,7 +119,7 @@ class DelayedFetch(delayMs: Long,
         }
         if (fetchMetadata.isFromFollower) {
-          // Case G check if the follower has the latest HW from the leader
+          // Case H check if the follower has the latest HW from the leader
           if (partition.getReplica(fetchMetadata.replicaId)
             .exists(r => offsetSnapshot.highWatermark.messageOffset > r.lastSentHighWatermark)) {
             return forceComplete()
@@ -126,23 +127,26 @@ class DelayedFetch(delayMs: Long,
         }
       }
     } catch {
-      case _: KafkaStorageException => // Case E
-        debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchMetadata immediately")
+      case _: NotLeaderForPartitionException => // Case A
+        debug(s"Broker is no longer the leader of $topicPartition, satisfy $fetchMetadata immediately")
         return forceComplete()
-      case _: UnknownTopicOrPartitionException => // Case B
+      case _: ReplicaNotAvailableException => // Case B
+        debug(s"Broker no longer has a replica of $topicPartition, satisfy $fetchMetadata immediately")
+        return forceComplete()
+      case _: UnknownTopicOrPartitionException => // Case C
         debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchMetadata immediately")
         return forceComplete()
-      case _: FencedLeaderEpochException => // Case F
+      case _: KafkaStorageException => // Case D
+        debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchMetadata immediately")
+        return forceComplete()
+      case _: FencedLeaderEpochException => // Case E
         debug(s"Broker is the leader of partition $topicPartition, but the requested epoch " +
           s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $fetchMetadata immediately")
         return forceComplete()
-      case _: NotLeaderForPartitionException => // Case A
-        debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
-        return forceComplete()
     }
   }
-    // Case D
+    // Case G
     if (accumulatedSize >= fetchMetadata.fetchMinBytes)
       forceComplete()
     else
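
Once the diff is applied, the catch block orders its handlers A through E. A compact, runnable restatement (assuming kafka-clients of that era on the classpath; the real handlers also log at debug level and return forceComplete()):

import org.apache.kafka.common.errors.{FencedLeaderEpochException, KafkaStorageException,
  NotLeaderForPartitionException, ReplicaNotAvailableException, UnknownTopicOrPartitionException}

// Which completion case each purgatory-completing exception now maps to.
def caseFor(e: Throwable): Option[String] = e match {
  case _: NotLeaderForPartitionException   => Some("A")
  case _: ReplicaNotAvailableException     => Some("B") // newly handled by this commit
  case _: UnknownTopicOrPartitionException => Some("C")
  case _: KafkaStorageException            => Some("D")
  case _: FencedLeaderEpochException       => Some("E")
  case _                                   => None      // anything else still propagates
}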

core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala (43 lines changed)

@@ -19,11 +19,10 @@ package kafka.server
 import java.util.Optional
 import scala.collection.Seq
 import kafka.cluster.{Partition, Replica}
 import kafka.log.LogOffsetSnapshot
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.FencedLeaderEpochException
+import org.apache.kafka.common.errors.{FencedLeaderEpochException, ReplicaNotAvailableException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.FetchRequest
@@ -82,6 +81,46 @@ class DelayedFetchTest extends EasyMockSupport {
     assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.error)
   }
 
+  @Test
+  def testReplicaNotAvailable(): Unit = {
+    val topicPartition = new TopicPartition("topic", 0)
+    val fetchOffset = 500L
+    val logStartOffset = 0L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      delayMs = 500,
+      fetchMetadata = fetchMetadata,
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      clientMetadata = None,
+      responseCallback = callback)
+
+    val partition: Partition = mock(classOf[Partition])
+
+    EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
+      .andThrow(new ReplicaNotAvailableException(s"Replica for $topicPartition not available"))
+    expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.REPLICA_NOT_AVAILABLE)
+    EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false)
+
+    replayAll()
+
+    assertTrue(delayedFetch.tryComplete())
+    assertTrue(delayedFetch.isCompleted)
+    assertTrue(fetchResultOpt.isDefined)
+  }
+
   def checkCompleteWhenFollowerLaggingHW(followerHW: Option[Long], checkResult: DelayedFetch => Unit): Unit = {
     val topicPartition = new TopicPartition("topic", 0)
     val fetchOffset = 500L
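
To run just this suite locally, something like ./gradlew :core:test --tests "kafka.server.DelayedFetchTest" should work with Kafka's Gradle build, though the exact task path may vary by branch.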
