From 82137ba52e52a7abb652d2774e9b54485ace1612 Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Wed, 6 Nov 2019 09:22:47 -0800
Subject: [PATCH] MINOR: Fetch only from leader should be respected in purgatory (#7650)

In #7361, we inadvertently reverted a change that enforced leader-only
fetching for old versions of the fetch protocol. This patch fixes the
problem and adds new test cases to cover fetches which hit purgatory.

Reviewers: Viktor Somogyi, David Arthur
---
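Why the one-line fix matters: FetchMetadata (aliased as SFetchMetadata in
ReplicaManager) takes fetchOnlyFromLeader and isFromFollower as neighboring
Boolean parameters, so passing isFromFollower in the fetchOnlyFromLeader slot
type-checks silently and only shows up behaviorally once a fetch sits in
purgatory. A minimal, self-contained sketch of the failure mode; the case
class below is a simplified stand-in (the field names follow the Kafka
source, everything else is illustrative):

    // Simplified stand-in for kafka.server.FetchMetadata; only the fields
    // relevant to this patch are modeled.
    case class FetchMetadata(fetchMinBytes: Int,
                             fetchMaxBytes: Int,
                             hardMaxBytesLimit: Boolean,
                             fetchOnlyFromLeader: Boolean,
                             fetchIsolation: String,
                             isFromFollower: Boolean,
                             replicaId: Int)

    object LeaderOnlySketch extends App {
      // An old-protocol consumer fetch: it is not from a follower, but it
      // must still be served by the leader only (no ClientMetadata means
      // the client does not understand follower fetching).
      val isFromFollower = false
      val fetchOnlyFromLeader = true

      // The regressed call passed isFromFollower where fetchOnlyFromLeader
      // belongs; both are Booleans, so the compiler cannot object:
      val buggy = FetchMetadata(1, 100, false, isFromFollower, "READ_UNCOMMITTED", isFromFollower, -1)

      // The corrected call passes the right flag, so a delayed fetch keeps
      // enforcing leader-only semantics when purgatory completes it:
      val fixed = FetchMetadata(1, 100, false, fetchOnlyFromLeader, "READ_UNCOMMITTED", isFromFollower, -1)

      println(s"buggy.fetchOnlyFromLeader = ${buggy.fetchOnlyFromLeader}") // false
      println(s"fixed.fetchOnlyFromLeader = ${fixed.fetchOnlyFromLeader}") // true
    }

Named arguments at the call site would turn this kind of transposition into
a compile error; the new tests below catch it behaviorally instead.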
 .../scala/kafka/server/ReplicaManager.scala   |   4 +-
 .../kafka/server/ReplicaManagerTest.scala     | 242 +++++++++++++-----
 2 files changed, 175 insertions(+), 71 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d428773fcf8..bd99ef3ad8b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -937,8 +937,8 @@ class ReplicaManager(val config: KafkaConfig,
           fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
-      val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, isFromFollower,
-        fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
+      val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
+        fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
         maybeUpdateHwAndSendResponse)
 
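A note on the test changes that follow: the old per-test doFetch closure
wrote its result into a local var, so it could only observe fetches that
completed synchronously. The rewritten sendConsumerFetch helper returns an
AtomicReference[FetchPartitionData] instead: null means the fetch is still
parked in the fetch purgatory. The new purgatory tests exploit this by
fetching with a small timeout against a MockTimer, asserting null, then
calling becomeLeaderOrFollower and asserting the delayed fetch completes
immediately with NOT_LEADER_FOR_PARTITION (old clients) or
FENCED_LEADER_EPOCH (new clients). A minimal sketch of that callback
pattern, with illustrative names only:

    import java.util.concurrent.atomic.AtomicReference

    object DelayedResultSketch extends App {
      // The callback publishes its result into an AtomicReference, which
      // stays null while the delayed operation sits in purgatory.
      val fetchResult = new AtomicReference[String]()
      def responseCallback(error: String): Unit = fetchResult.set(error)

      // Nothing has completed the "delayed fetch" yet:
      assert(fetchResult.get == null)

      // A leadership change completes it from another thread, the way
      // becomeLeaderOrFollower completes watched entries in the purgatory:
      val completer = new Thread(() => responseCallback("NOT_LEADER_FOR_PARTITION"))
      completer.start()
      completer.join()

      assert(fetchResult.get == "NOT_LEADER_FOR_PARTITION")
      println(s"delayed fetch completed with: ${fetchResult.get}")
    }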
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 01d6bf0731b..73b1832c53b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -19,13 +19,12 @@ package kafka.server
 
 import java.io.File
 import java.net.InetAddress
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.{Optional, Properties}
 
 import kafka.api.Request
 import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
-import kafka.utils.TestUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.LazyOffsetCheckpoints
@@ -50,7 +49,6 @@ import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition}
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
-import org.mockito.ArgumentMatchers
 
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq}
@@ -762,7 +760,7 @@ class ReplicaManagerTest {
     val countDownLatch = new CountDownLatch(1)
 
     // Prepare the mocked components for the test
-    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true)
 
@@ -799,7 +797,7 @@ class ReplicaManagerTest {
     val countDownLatch = new CountDownLatch(1)
 
     // Prepare the mocked components for the test
-    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true)
 
@@ -848,7 +846,7 @@ class ReplicaManagerTest {
     val countDownLatch = new CountDownLatch(1)
 
     // Prepare the mocked components for the test
-    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true)
 
@@ -898,7 +896,7 @@ class ReplicaManagerTest {
     val props = new Properties()
     props.put(KafkaConfig.ReplicaSelectorClassProp, "non-a-class")
 
-    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+    prepareReplicaManagerAndLogManager(
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true, extraProps = props)
   }
@@ -912,102 +910,208 @@ class ReplicaManagerTest {
     val leaderEpochIncrement = 2
     val countDownLatch = new CountDownLatch(1)
 
-    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true)
 
     assertFalse(replicaManager.replicaSelectorOpt.isDefined)
   }
 
   @Test
-  def testOlderClientFetchFromLeaderOnly(): Unit = {
+  def testFetchFollowerNotAllowedForOlderClients(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
     replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
-    val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
-      Seq(
-        new LeaderAndIsrPartitionState()
-          .setTopicName(tp0.topic)
-          .setPartitionIndex(tp0.partition)
-          .setControllerEpoch(0)
-          .setLeader(1)
-          .setLeaderEpoch(0)
-          .setIsr(partition0Replicas)
-          .setZkVersion(0)
-          .setReplicas(partition0Replicas)
-          .setIsNew(true)).asJava,
+
+    val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(tp0.topic)
+        .setPartitionIndex(tp0.partition)
+        .setControllerEpoch(0)
+        .setLeader(1)
+        .setLeaderEpoch(0)
+        .setIsr(partition0Replicas)
+        .setZkVersion(0)
+        .setReplicas(partition0Replicas)
+        .setIsNew(true)).asJava,
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
-
-    def doFetch(replicaId: Int, partitionData: FetchRequest.PartitionData, clientMetadataOpt: Option[ClientMetadata]):
-    Option[FetchPartitionData] = {
-      var fetchResult: Option[FetchPartitionData] = None
-      def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
-        fetchResult = response.headOption.filter(_._1 == tp0).map(_._2)
-      }
-      replicaManager.fetchMessages(
-        timeout = 0L,
-        replicaId = replicaId,
-        fetchMinBytes = 1,
-        fetchMaxBytes = 100,
-        hardMaxBytesLimit = false,
-        fetchInfos = Seq(tp0 -> partitionData),
-        quota = UnboundedQuota,
-        isolationLevel = IsolationLevel.READ_UNCOMMITTED,
-        responseCallback = callback,
-        clientMetadata = clientMetadataOpt
-      )
-      fetchResult
-    }
+    replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
 
     // Fetch from follower, with non-empty ClientMetadata (FetchRequest v11+)
     val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "")
     var partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
       Optional.of(0))
-    var fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, Some(clientMetadata))
-    assertTrue(fetchResult.isDefined)
+    var fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata))
+    assertNotNull(fetchResult.get)
     assertEquals(fetchResult.get.error, Errors.NONE)
 
-    // Fetch from follower, with empty ClientMetadata
-    fetchResult = None
+    // Fetch from follower, with empty ClientMetadata (which implies an older version)
     partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
       Optional.of(0))
-    fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, None)
-    assertTrue(fetchResult.isDefined)
+    fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None)
+    assertNotNull(fetchResult.get)
     assertEquals(fetchResult.get.error, Errors.NOT_LEADER_FOR_PARTITION)
+  }
 
-    // Change to a leader, both cases are allowed
-    val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
-      Seq(
-        new LeaderAndIsrPartitionState()
-          .setTopicName(tp0.topic)
-          .setPartitionIndex(tp0.partition)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(1)
-          .setIsr(partition0Replicas)
-          .setZkVersion(0)
-          .setReplicas(partition0Replicas)
-          .setIsNew(true)).asJava,
+  @Test
+  def testBecomeFollowerWhileOldClientFetchInPurgatory(): Unit = {
+    val mockTimer = new MockTimer
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
+
+    val tp0 = new TopicPartition(topic, 0)
+    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    val partition0Replicas = Seq[Integer](0, 1).asJava
+
+    val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(tp0.topic)
+        .setPartitionIndex(tp0.partition)
+        .setControllerEpoch(0)
+        .setLeader(0)
+        .setLeaderEpoch(1)
+        .setIsr(partition0Replicas)
+        .setZkVersion(0)
+        .setReplicas(partition0Replicas)
+        .setIsNew(true)).asJava,
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-    replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+    replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
 
-    partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+    val partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+      Optional.empty())
+    val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 10)
+    assertNull(fetchResult.get)
+
+    // Become a follower and ensure that the delayed fetch returns immediately
+    val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(tp0.topic)
+        .setPartitionIndex(tp0.partition)
+        .setControllerEpoch(0)
+        .setLeader(1)
+        .setLeaderEpoch(2)
+        .setIsr(partition0Replicas)
+        .setZkVersion(0)
+        .setReplicas(partition0Replicas)
+        .setIsNew(true)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
+
+    assertNotNull(fetchResult.get)
+    assertEquals(fetchResult.get.error, Errors.NOT_LEADER_FOR_PARTITION)
+  }
+
+  @Test
+  def testBecomeFollowerWhileNewClientFetchInPurgatory(): Unit = {
+    val mockTimer = new MockTimer
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
+
+    val tp0 = new TopicPartition(topic, 0)
+    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    val partition0Replicas = Seq[Integer](0, 1).asJava
+
+    val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(tp0.topic)
+        .setPartitionIndex(tp0.partition)
+        .setControllerEpoch(0)
+        .setLeader(0)
+        .setLeaderEpoch(1)
+        .setIsr(partition0Replicas)
+        .setZkVersion(0)
+        .setReplicas(partition0Replicas)
+        .setIsNew(true)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+
+    val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "")
+    val partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
       Optional.of(1))
-    fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, Some(clientMetadata))
-    assertTrue(fetchResult.isDefined)
+    val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata), timeout = 10)
+    assertNull(fetchResult.get)
+
+    // Become a follower and ensure that the delayed fetch returns immediately
+    val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(tp0.topic)
+        .setPartitionIndex(tp0.partition)
+        .setControllerEpoch(0)
+        .setLeader(1)
+        .setLeaderEpoch(2)
+        .setIsr(partition0Replicas)
+        .setZkVersion(0)
+        .setReplicas(partition0Replicas)
+        .setIsNew(true)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
+
+    assertNotNull(fetchResult.get)
+    assertEquals(fetchResult.get.error, Errors.FENCED_LEADER_EPOCH)
+  }
+
+  @Test
+  def testFetchFromLeaderAlwaysAllowed(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1))
+
+    val tp0 = new TopicPartition(topic, 0)
+    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    val partition0Replicas = Seq[Integer](0, 1).asJava
+
+    val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(tp0.topic)
+        .setPartitionIndex(tp0.partition)
+        .setControllerEpoch(0)
+        .setLeader(0)
+        .setLeaderEpoch(1)
+        .setIsr(partition0Replicas)
+        .setZkVersion(0)
+        .setReplicas(partition0Replicas)
+        .setIsNew(true)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+
+    val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "")
+    var partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+      Optional.of(1))
+    var fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata))
+    assertNotNull(fetchResult.get)
     assertEquals(fetchResult.get.error, Errors.NONE)
 
-    fetchResult = None
     partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
       Optional.empty())
-    fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, None)
-    assertTrue(fetchResult.isDefined)
+    fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata))
+    assertNotNull(fetchResult.get)
     assertEquals(fetchResult.get.error, Errors.NONE)
   }
 
+  private def sendConsumerFetch(replicaManager: ReplicaManager,
+                                topicPartition: TopicPartition,
+                                partitionData: FetchRequest.PartitionData,
+                                clientMetadataOpt: Option[ClientMetadata],
+                                timeout: Long = 0L): AtomicReference[FetchPartitionData] = {
+    val fetchResult = new AtomicReference[FetchPartitionData]()
+    def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
+      fetchResult.set(response.toMap.apply(topicPartition))
+    }
+    replicaManager.fetchMessages(
+      timeout = timeout,
+      replicaId = Request.OrdinaryConsumerId,
+      fetchMinBytes = 1,
+      fetchMaxBytes = 100,
+      hardMaxBytesLimit = false,
+      fetchInfos = Seq(topicPartition -> partitionData),
+      quota = UnboundedQuota,
+      isolationLevel = IsolationLevel.READ_UNCOMMITTED,
      responseCallback = callback,
      clientMetadata = clientMetadataOpt
    )
    fetchResult
  }

   /**
    * This method assumes that the test using created ReplicaManager calls
    * ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing