
MINOR: Fetch only from leader should be respected in purgatory (#7650)

In #7361, we inadvertently reverted a change to enforce leader-only fetching for older versions of the fetch protocol. This patch fixes the problem and adds new test cases covering fetches that hit purgatory.

Reviewers: Viktor Somogyi <viktorsomogyi@gmail.com>, David Arthur <mumrah@gmail.com>
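
For context on the one-line fix below: FetchMetadata carries a leader-only flag that purgatory later uses to re-validate the request, and the regression passed the follower flag in that position instead. A minimal sketch of the intended behavior (illustrative names only, not the ReplicaManager source): a fetch must be restricted to the leader when it comes from a follower replica, or from a consumer whose request carried no client metadata, which is how pre-KIP-392 (pre-v11) fetch versions appear to the broker.

// Illustrative sketch, not the ReplicaManager source: how the leader-only
// restriction differs from the "is this a follower?" flag that was wrongly
// passed into the delayed-fetch metadata.
object FetchOnlyFromLeaderSketch {
  def fetchOnlyFromLeader(isFromFollower: Boolean,
                          isFromConsumer: Boolean,
                          hasClientMetadata: Boolean): Boolean =
    isFromFollower || (isFromConsumer && !hasClientMetadata)

  def main(args: Array[String]): Unit = {
    // An old-version consumer fetch: not from a follower, yet still leader-only.
    // Using isFromFollower (false) in place of this flag is exactly the bug:
    // once parked in purgatory, the fetch would no longer require leadership.
    assert(fetchOnlyFromLeader(isFromFollower = false, isFromConsumer = true,
      hasClientMetadata = false))
  }
}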
Branch: pull/7663/head
Jason Gustafson authored 5 years ago, committed by GitHub
Commit: 82137ba52e
1. core/src/main/scala/kafka/server/ReplicaManager.scala (4 changes)
2. core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (242 changes)

core/src/main/scala/kafka/server/ReplicaManager.scala (4 changes)

@@ -937,8 +937,8 @@ class ReplicaManager(val config: KafkaConfig,
fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
})
}
- val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, isFromFollower,
- fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
+ val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
+ fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
maybeUpdateHwAndSendResponse)
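
Why this matters once a fetch is parked in purgatory: the delayed fetch re-checks the partition on every completion attempt, and only a correctly set leader-only flag makes a leadership change fail the waiting fetch immediately (NOT_LEADER_FOR_PARTITION, or FENCED_LEADER_EPOCH when a fenced epoch was supplied) rather than letting it sit until the timeout. A simplified sketch of that check, with invented names, not the DelayedFetch source:

// Simplified sketch with invented names (not DelayedFetch): a delayed fetch built
// with fetchOnlyFromLeader = true should complete exceptionally as soon as the
// local broker stops being the leader, instead of waiting out its timeout.
final case class DelayedFetchSketch(fetchOnlyFromLeader: Boolean) {
  def tryComplete(localBrokerIsLeader: Boolean): Option[String] =
    if (fetchOnlyFromLeader && !localBrokerIsLeader)
      Some("NOT_LEADER_FOR_PARTITION") // force completion with an error
    else
      None                             // keep waiting for data or the timeout
}

object DelayedFetchSketchExample extends App {
  val delayed = DelayedFetchSketch(fetchOnlyFromLeader = true)
  assert(delayed.tryComplete(localBrokerIsLeader = true).isEmpty)
  assert(delayed.tryComplete(localBrokerIsLeader = false)
    .contains("NOT_LEADER_FOR_PARTITION"))
}

The new test cases below drive exactly this path: they fetch with a short timeout so the request lands in purgatory, then send a become-follower LeaderAndIsrRequest and assert that the parked fetch completes with an error.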

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (242 changes)

@@ -19,13 +19,12 @@ package kafka.server
import java.io.File
import java.net.InetAddress
- import java.util.concurrent.atomic.AtomicBoolean
+ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.{Optional, Properties}
import kafka.api.Request
import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
import kafka.utils.TestUtils
import kafka.cluster.BrokerEndPoint
import kafka.server.QuotaFactory.UnboundedQuota
import kafka.server.checkpoints.LazyOffsetCheckpoints
@@ -50,7 +49,6 @@ import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition}
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{After, Before, Test}
- import org.mockito.ArgumentMatchers
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq}
@@ -762,7 +760,7 @@ class ReplicaManagerTest {
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
- val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+ val (replicaManager, _) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
@@ -799,7 +797,7 @@ class ReplicaManagerTest {
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
- val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+ val (replicaManager, _) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
@@ -848,7 +846,7 @@ class ReplicaManagerTest {
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
- val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+ val (replicaManager, _) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
@@ -898,7 +896,7 @@ class ReplicaManagerTest {
val props = new Properties()
props.put(KafkaConfig.ReplicaSelectorClassProp, "non-a-class")
- val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+ prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true, extraProps = props)
}
@@ -912,102 +910,208 @@ class ReplicaManagerTest {
val leaderEpochIncrement = 2
val countDownLatch = new CountDownLatch(1)
- val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+ val (replicaManager, _) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
assertFalse(replicaManager.replicaSelectorOpt.isDefined)
}
@Test
- def testOlderClientFetchFromLeaderOnly(): Unit = {
+ def testFetchFollowerNotAllowedForOlderClients(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
- Seq(
- new LeaderAndIsrPartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(1)
- .setLeaderEpoch(0)
- .setIsr(partition0Replicas)
- .setZkVersion(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)).asJava,
+ val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+ Seq(new LeaderAndIsrPartitionState()
+ .setTopicName(tp0.topic)
+ .setPartitionIndex(tp0.partition)
+ .setControllerEpoch(0)
+ .setLeader(1)
+ .setLeaderEpoch(0)
+ .setIsr(partition0Replicas)
+ .setZkVersion(0)
+ .setReplicas(partition0Replicas)
+ .setIsNew(true)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
- def doFetch(replicaId: Int, partitionData: FetchRequest.PartitionData, clientMetadataOpt: Option[ClientMetadata]):
- Option[FetchPartitionData] = {
- var fetchResult: Option[FetchPartitionData] = None
- def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
- fetchResult = response.headOption.filter(_._1 == tp0).map(_._2)
- }
- replicaManager.fetchMessages(
- timeout = 0L,
- replicaId = replicaId,
- fetchMinBytes = 1,
- fetchMaxBytes = 100,
- hardMaxBytesLimit = false,
- fetchInfos = Seq(tp0 -> partitionData),
- quota = UnboundedQuota,
- isolationLevel = IsolationLevel.READ_UNCOMMITTED,
- responseCallback = callback,
- clientMetadata = clientMetadataOpt
- )
- fetchResult
- }
+ replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
// Fetch from follower, with non-empty ClientMetadata (FetchRequest v11+)
val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "")
var partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
Optional.of(0))
- var fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, Some(clientMetadata))
- assertTrue(fetchResult.isDefined)
+ var fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata))
+ assertNotNull(fetchResult.get)
assertEquals(fetchResult.get.error, Errors.NONE)
- // Fetch from follower, with empty ClientMetadata
- fetchResult = None
+ // Fetch from follower, with empty ClientMetadata (which implies an older version)
partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
Optional.of(0))
- fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, None)
- assertTrue(fetchResult.isDefined)
+ fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None)
+ assertNotNull(fetchResult.get)
assertEquals(fetchResult.get.error, Errors.NOT_LEADER_FOR_PARTITION)
+ }
- // Change to a leader, both cases are allowed
- val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
- Seq(
- new LeaderAndIsrPartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(1)
- .setIsr(partition0Replicas)
- .setZkVersion(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)).asJava,
+ @Test
+ def testBecomeFollowerWhileOldClientFetchInPurgatory(): Unit = {
+ val mockTimer = new MockTimer
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
+ val tp0 = new TopicPartition(topic, 0)
+ val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+ replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ val partition0Replicas = Seq[Integer](0, 1).asJava
+ val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+ Seq(new LeaderAndIsrPartitionState()
+ .setTopicName(tp0.topic)
+ .setPartitionIndex(tp0.partition)
+ .setControllerEpoch(0)
+ .setLeader(0)
+ .setLeaderEpoch(1)
+ .setIsr(partition0Replicas)
+ .setZkVersion(0)
+ .setReplicas(partition0Replicas)
+ .setIsNew(true)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+ replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
- partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+ val partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+ Optional.empty())
+ val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 10)
+ assertNull(fetchResult.get)
+ // Become a follower and ensure that the delayed fetch returns immediately
+ val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+ Seq(new LeaderAndIsrPartitionState()
+ .setTopicName(tp0.topic)
+ .setPartitionIndex(tp0.partition)
+ .setControllerEpoch(0)
+ .setLeader(1)
+ .setLeaderEpoch(2)
+ .setIsr(partition0Replicas)
+ .setZkVersion(0)
+ .setReplicas(partition0Replicas)
+ .setIsNew(true)).asJava,
+ Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+ replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
+ assertNotNull(fetchResult.get)
+ assertEquals(fetchResult.get.error, Errors.NOT_LEADER_FOR_PARTITION)
+ }
+ @Test
+ def testBecomeFollowerWhileNewClientFetchInPurgatory(): Unit = {
+ val mockTimer = new MockTimer
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
+ val tp0 = new TopicPartition(topic, 0)
+ val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+ replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ val partition0Replicas = Seq[Integer](0, 1).asJava
+ val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+ Seq(new LeaderAndIsrPartitionState()
+ .setTopicName(tp0.topic)
+ .setPartitionIndex(tp0.partition)
+ .setControllerEpoch(0)
+ .setLeader(0)
+ .setLeaderEpoch(1)
+ .setIsr(partition0Replicas)
+ .setZkVersion(0)
+ .setReplicas(partition0Replicas)
+ .setIsNew(true)).asJava,
+ Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+ replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+ val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "")
+ val partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
Optional.of(1))
- fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, Some(clientMetadata))
- assertTrue(fetchResult.isDefined)
+ val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata), timeout = 10)
+ assertNull(fetchResult.get)
+ // Become a follower and ensure that the delayed fetch returns immediately
+ val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+ Seq(new LeaderAndIsrPartitionState()
+ .setTopicName(tp0.topic)
+ .setPartitionIndex(tp0.partition)
+ .setControllerEpoch(0)
+ .setLeader(1)
+ .setLeaderEpoch(2)
+ .setIsr(partition0Replicas)
+ .setZkVersion(0)
+ .setReplicas(partition0Replicas)
+ .setIsNew(true)).asJava,
+ Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+ replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
+ assertNotNull(fetchResult.get)
+ assertEquals(fetchResult.get.error, Errors.FENCED_LEADER_EPOCH)
+ }
+ @Test
+ def testFetchFromLeaderAlwaysAllowed(): Unit = {
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1))
+ val tp0 = new TopicPartition(topic, 0)
+ val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+ replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ val partition0Replicas = Seq[Integer](0, 1).asJava
+ val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+ Seq(new LeaderAndIsrPartitionState()
+ .setTopicName(tp0.topic)
+ .setPartitionIndex(tp0.partition)
+ .setControllerEpoch(0)
+ .setLeader(0)
+ .setLeaderEpoch(1)
+ .setIsr(partition0Replicas)
+ .setZkVersion(0)
+ .setReplicas(partition0Replicas)
+ .setIsNew(true)).asJava,
+ Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+ replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+ val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "")
+ var partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+ Optional.of(1))
+ var fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata))
+ assertNotNull(fetchResult.get)
assertEquals(fetchResult.get.error, Errors.NONE)
- fetchResult = None
partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
Optional.empty())
- fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, None)
- assertTrue(fetchResult.isDefined)
+ fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata))
+ assertNotNull(fetchResult.get)
assertEquals(fetchResult.get.error, Errors.NONE)
}
+ private def sendConsumerFetch(replicaManager: ReplicaManager,
+ topicPartition: TopicPartition,
+ partitionData: FetchRequest.PartitionData,
+ clientMetadataOpt: Option[ClientMetadata],
+ timeout: Long = 0L): AtomicReference[FetchPartitionData] = {
+ val fetchResult = new AtomicReference[FetchPartitionData]()
+ def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
+ fetchResult.set(response.toMap.apply(topicPartition))
+ }
+ replicaManager.fetchMessages(
+ timeout = timeout,
+ replicaId = Request.OrdinaryConsumerId,
+ fetchMinBytes = 1,
+ fetchMaxBytes = 100,
+ hardMaxBytesLimit = false,
+ fetchInfos = Seq(topicPartition -> partitionData),
+ quota = UnboundedQuota,
+ isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+ responseCallback = callback,
+ clientMetadata = clientMetadataOpt
+ )
+ fetchResult
+ }
/**
* This method assumes that the test using created ReplicaManager calls
* ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing
