
KAFKA-9004; Prevent older clients from fetching from a follower (#7531)

With KIP-392, we allow consumers to fetch from followers. This capability is enabled when a replica selector has been provided in the configuration. When not in use, the intent is to preserve the current behavior of fetching only from the leader. The leader epoch is the mechanism that keeps us honest: when there is a leader change, the epoch gets bumped, consumer fetches fail due to the fenced epoch, and the consumer refreshes its metadata to find the new leader.
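For context, the broker opts in by providing a replica selector through its configuration, and the consumer advertises its location so the selector has something to match on. A minimal sketch of both sides (the property names are from KIP-392; the rack value is illustrative):

    import java.util.Properties

    // Broker: configuring a replica selector (here the built-in rack-aware one)
    // is what enables follower fetching under KIP-392.
    val brokerProps = new Properties()
    brokerProps.put("replica.selector.class",
      "org.apache.kafka.common.replica.RackAwareReplicaSelector")

    // Consumer: client.rack tells the broker where this client lives so the
    // selector can steer it to a nearby replica. "us-east-1a" is an example value.
    val consumerProps = new Properties()
    consumerProps.put("client.rack", "us-east-1a")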

However, for old consumers there is no similar protection, since the leader epoch was not available to clients until recently. If there is a preferred leader election, for example, the old consumer will happily continue fetching from the demoted leader until a periodic metadata fetch causes it to discover the new leader. This does not create any correctness problems (fetches are still bound by the high watermark), but it is surprising and can lead to unexpected performance characteristics.
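To see why the epoch matters, consider a schematic of the fencing check. This is an illustrative helper, not the actual broker code (the real validation happens in the partition read path):

    import java.util.Optional
    import org.apache.kafka.common.protocol.Errors

    // Illustrative: clients that send their current leader epoch are fenced on a
    // leader change, while pre-epoch clients keep fetching until metadata refreshes.
    def validateFetchEpoch(requestEpoch: Optional[Integer], leaderEpoch: Int): Errors =
      if (!requestEpoch.isPresent)
        Errors.NONE // old client: no epoch supplied, nothing to fence on
      else if (requestEpoch.get.intValue < leaderEpoch)
        Errors.FENCED_LEADER_EPOCH // stale epoch: forces a metadata refresh
      else if (requestEpoch.get.intValue > leaderEpoch)
        Errors.UNKNOWN_LEADER_EPOCH // client is ahead of this broker
      else
        Errors.NONE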

This patch fixes this problem by enforcing leader-only fetching for older versions of the fetch request.
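The enforcement keys off ClientMetadata, which the broker only builds for fetch requests at version 11 or above, so an empty ClientMetadata on a consumer fetch marks an older client. Roughly how KafkaApis populates it (paraphrased; see handleFetchRequest for the real code):

    // Paraphrased: only v11+ fetch requests carry the rack id and connection
    // details needed for replica selection; older versions yield None.
    val clientMetadata: Option[ClientMetadata] =
      if (versionId >= 11)
        Some(new DefaultClientMetadata(fetchRequest.rackId, clientId,
          request.context.clientAddress, request.context.principal,
          request.context.listenerName.value))
      else
        None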

Reviewers: Jason Gustafson <jason@confluent.io>
David Arthur authored 5 years ago, committed by Jason Gustafson (commit d9f0be4523)
1. core/src/main/scala/kafka/server/ReplicaManager.scala (12 changed lines)
2. core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (94 changed lines)

core/src/main/scala/kafka/server/ReplicaManager.scala

@@ -848,18 +848,20 @@ class ReplicaManager(val config: KafkaConfig,
                     isolationLevel: IsolationLevel,
                     clientMetadata: Option[ClientMetadata]): Unit = {
     val isFromFollower = Request.isValidBrokerId(replicaId)
-    val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId)
+    val isFromConsumer = !(isFromFollower || replicaId == Request.FutureLocalReplicaId)
+    val fetchIsolation = if (!isFromConsumer)
       FetchLogEnd
     else if (isolationLevel == IsolationLevel.READ_COMMITTED)
       FetchTxnCommitted
     else
       FetchHighWatermark

+    // Restrict fetching to leader if request is from follower or from a client with older version (no ClientMetadata)
+    val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && clientMetadata.isEmpty)
     def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
       val result = readFromLocalLog(
         replicaId = replicaId,
-        fetchOnlyFromLeader = isFromFollower,
+        fetchOnlyFromLeader = fetchOnlyFromLeader,
         fetchIsolation = fetchIsolation,
         fetchMaxBytes = fetchMaxBytes,
         hardMaxBytesLimit = hardMaxBytesLimit,
@@ -917,7 +919,7 @@ class ReplicaManager(val config: KafkaConfig,
           fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
-      val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, isFromFollower,
+      val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
         fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
         maybeUpdateHwAndSendResponse)
@@ -971,7 +973,7 @@ class ReplicaManager(val config: KafkaConfig,
             s"${preferredReadReplica.get} for $clientMetadata")
         }
         // If a preferred read-replica is set, skip the read
-        val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, false)
+        val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
         LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
           highWatermark = offsetSnapshot.highWatermark.messageOffset,
           leaderLogStartOffset = offsetSnapshot.logStartOffset,

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

@@ -24,10 +24,8 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.{Optional, Properties}
 import kafka.api.Request
 import kafka.cluster.BrokerEndPoint
 import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import TestUtils.createBroker
-import kafka.utils.TestUtils
-import kafka.cluster.BrokerEndPoint
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.LazyOffsetCheckpoints
@@ -921,6 +919,96 @@ class ReplicaManagerTest {
     assertFalse(replicaManager.replicaSelectorOpt.isDefined)
   }

+  @Test
+  def testOlderClientFetchFromLeaderOnly(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1))
+
+    val tp0 = new TopicPartition(topic, 0)
+    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    val partition0Replicas = Seq[Integer](0, 1).asJava
+
+    val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(
+        new LeaderAndIsrPartitionState()
+          .setTopicName(tp0.topic)
+          .setPartitionIndex(tp0.partition)
+          .setControllerEpoch(0)
+          .setLeader(1)
+          .setLeaderEpoch(0)
+          .setIsr(partition0Replicas)
+          .setZkVersion(0)
+          .setReplicas(partition0Replicas)
+          .setIsNew(true)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+    def doFetch(replicaId: Int, partitionData: FetchRequest.PartitionData, clientMetadataOpt: Option[ClientMetadata]):
+        Option[FetchPartitionData] = {
+      var fetchResult: Option[FetchPartitionData] = None
+      def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
+        fetchResult = response.headOption.filter(_._1 == tp0).map(_._2)
+      }
+      replicaManager.fetchMessages(
+        timeout = 0L,
+        replicaId = replicaId,
+        fetchMinBytes = 1,
+        fetchMaxBytes = 100,
+        hardMaxBytesLimit = false,
+        fetchInfos = Seq(tp0 -> partitionData),
+        quota = UnboundedQuota,
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+        responseCallback = callback,
+        clientMetadata = clientMetadataOpt
+      )
+      fetchResult
+    }
+
+    // Fetch from follower, with non-empty ClientMetadata (FetchRequest v11+)
+    val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "")
+    var partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+      Optional.of(0))
+    var fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, Some(clientMetadata))
+    assertTrue(fetchResult.isDefined)
+    assertEquals(fetchResult.get.error, Errors.NONE)
+
+    // Fetch from follower, with empty ClientMetadata
+    fetchResult = None
+    partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+      Optional.of(0))
+    fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, None)
+    assertTrue(fetchResult.isDefined)
+    assertEquals(fetchResult.get.error, Errors.NOT_LEADER_FOR_PARTITION)
+
+    // Change to a leader, both cases are allowed
+    val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(
+        new LeaderAndIsrPartitionState()
+          .setTopicName(tp0.topic)
+          .setPartitionIndex(tp0.partition)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(1)
+          .setIsr(partition0Replicas)
+          .setZkVersion(0)
+          .setReplicas(partition0Replicas)
+          .setIsNew(true)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+
+    partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+      Optional.of(1))
+    fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, Some(clientMetadata))
+    assertTrue(fetchResult.isDefined)
+    assertEquals(fetchResult.get.error, Errors.NONE)
+
+    fetchResult = None
+    partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+      Optional.empty())
+    fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, None)
+    assertTrue(fetchResult.isDefined)
+    assertEquals(fetchResult.get.error, Errors.NONE)
+  }
+
   /**
    * This method assumes that the test using created ReplicaManager calls
    * ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing
