From d9f0be45231fd19036095ccc544b3df304811724 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 17 Oct 2019 12:44:14 -0400 Subject: [PATCH] KAFKA-9004; Prevent older clients from fetching from a follower (#7531) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With KIP-392, we allow consumers to fetch from followers. This capability is enabled when a replica selector has been provided in the configuration. When not in use, the intent is to preserve the current behavior of fetching only from the leader. The leader epoch is the mechanism that keeps us honest. When there is a leader change, the epoch gets bumped, consumer fetches fail due to the fenced epoch, and we find the new leader. However, for old consumers, there is no similar protection. The leader epoch was not available to clients until recently. If there is a preferred leader election (for example), the old consumer will happily continue fetching from the demoted leader until a periodic metadata fetch causes us to discover the new leader. This does not create any problems from a correctness perspective — fetches are still bound by the high watermark — but it is surprising and may cause unexpected performance characteristics. This patch fixes this problem by enforcing leader-only fetching for older versions of the fetch request. 
Reviewers: Jason Gustafson --- .../scala/kafka/server/ReplicaManager.scala | 12 ++- .../kafka/server/ReplicaManagerTest.scala | 94 ++++++++++++++++++- 2 files changed, 98 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index d10c93645c3..ff393390e27 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -848,18 +848,20 @@ class ReplicaManager(val config: KafkaConfig, isolationLevel: IsolationLevel, clientMetadata: Option[ClientMetadata]): Unit = { val isFromFollower = Request.isValidBrokerId(replicaId) - - val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId) + val isFromConsumer = !(isFromFollower || replicaId == Request.FutureLocalReplicaId) + val fetchIsolation = if (!isFromConsumer) FetchLogEnd else if (isolationLevel == IsolationLevel.READ_COMMITTED) FetchTxnCommitted else FetchHighWatermark + // Restrict fetching to leader if request is from follower or from a client with older version (no ClientMetadata) + val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && clientMetadata.isEmpty) def readFromLog(): Seq[(TopicPartition, LogReadResult)] = { val result = readFromLocalLog( replicaId = replicaId, - fetchOnlyFromLeader = isFromFollower, + fetchOnlyFromLeader = fetchOnlyFromLeader, fetchIsolation = fetchIsolation, fetchMaxBytes = fetchMaxBytes, hardMaxBytesLimit = hardMaxBytesLimit, @@ -917,7 +919,7 @@ class ReplicaManager(val config: KafkaConfig, fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } - val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, isFromFollower, + val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus) val delayedFetch = new 
DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata, maybeUpdateHwAndSendResponse) @@ -971,7 +973,7 @@ class ReplicaManager(val config: KafkaConfig, s"${preferredReadReplica.get} for $clientMetadata") } // If a preferred read-replica is set, skip the read - val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, false) + val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false) LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), highWatermark = offsetSnapshot.highWatermark.messageOffset, leaderLogStartOffset = offsetSnapshot.logStartOffset, diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 484ee2d962a..ef05fa1499c 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -24,10 +24,8 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.{Optional, Properties} import kafka.api.Request -import kafka.cluster.BrokerEndPoint import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager} -import kafka.utils.{MockScheduler, MockTime, TestUtils} -import TestUtils.createBroker +import kafka.utils.TestUtils import kafka.cluster.BrokerEndPoint import kafka.server.QuotaFactory.UnboundedQuota import kafka.server.checkpoints.LazyOffsetCheckpoints @@ -921,6 +919,96 @@ class ReplicaManagerTest { assertFalse(replicaManager.replicaSelectorOpt.isDefined) } + @Test + def testOlderClientFetchFromLeaderOnly(): Unit = { + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1)) + + val tp0 = new TopicPartition(topic, 0) + val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) + replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, 
isFutureReplica = false, offsetCheckpoints) + val partition0Replicas = Seq[Integer](0, 1).asJava + val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq( + new LeaderAndIsrPartitionState() + .setTopicName(tp0.topic) + .setPartitionIndex(tp0.partition) + .setControllerEpoch(0) + .setLeader(1) + .setLeaderEpoch(0) + .setIsr(partition0Replicas) + .setZkVersion(0) + .setReplicas(partition0Replicas) + .setIsNew(true)).asJava, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) + + def doFetch(replicaId: Int, partitionData: FetchRequest.PartitionData, clientMetadataOpt: Option[ClientMetadata]): + Option[FetchPartitionData] = { + var fetchResult: Option[FetchPartitionData] = None + def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit = { + fetchResult = response.headOption.filter(_._1 == tp0).map(_._2) + } + replicaManager.fetchMessages( + timeout = 0L, + replicaId = replicaId, + fetchMinBytes = 1, + fetchMaxBytes = 100, + hardMaxBytesLimit = false, + fetchInfos = Seq(tp0 -> partitionData), + quota = UnboundedQuota, + isolationLevel = IsolationLevel.READ_UNCOMMITTED, + responseCallback = callback, + clientMetadata = clientMetadataOpt + ) + fetchResult + } + + // Fetch from follower, with non-empty ClientMetadata (FetchRequest v11+) + val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "") + var partitionData = new FetchRequest.PartitionData(0L, 0L, 100, + Optional.of(0)) + var fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, Some(clientMetadata)) + assertTrue(fetchResult.isDefined) + assertEquals(fetchResult.get.error, Errors.NONE) + + // Fetch from follower, with empty ClientMetadata + fetchResult = None + partitionData = new FetchRequest.PartitionData(0L, 0L, 100, + Optional.of(0)) + fetchResult = doFetch(Request.OrdinaryConsumerId, 
partitionData, None) + assertTrue(fetchResult.isDefined) + assertEquals(fetchResult.get.error, Errors.NOT_LEADER_FOR_PARTITION) + + // Change to a leader, both cases are allowed + val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq( + new LeaderAndIsrPartitionState() + .setTopicName(tp0.topic) + .setPartitionIndex(tp0.partition) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(1) + .setIsr(partition0Replicas) + .setZkVersion(0) + .setReplicas(partition0Replicas) + .setIsNew(true)).asJava, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) + + partitionData = new FetchRequest.PartitionData(0L, 0L, 100, + Optional.of(1)) + fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, Some(clientMetadata)) + assertTrue(fetchResult.isDefined) + assertEquals(fetchResult.get.error, Errors.NONE) + + fetchResult = None + partitionData = new FetchRequest.PartitionData(0L, 0L, 100, + Optional.empty()) + fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, None) + assertTrue(fetchResult.isDefined) + assertEquals(fetchResult.get.error, Errors.NONE) + } + /** * This method assumes that the test using created ReplicaManager calls * ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing