
KAFKA-4313; ISRs may thrash when replication quota is enabled

Author: Jun Rao <junrao@gmail.com>

Reviewers: Ben Stopford <benstopford@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2043 from junrao/kafka-4313
Commit 24067e4076, authored by Jun Rao and committed by Ismael Juma.
  1. core/src/main/scala/kafka/server/AbstractFetcherThread.scala (8 lines changed)
  2. core/src/main/scala/kafka/server/DelayedFetch.scala (17 lines changed)
  3. core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (12 lines changed)
  4. core/src/main/scala/kafka/server/ReplicaManager.scala (30 lines changed)
  5. core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala (74 lines changed)
  6. core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (18 lines changed)
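
The gist of the change: before this patch, a replica was throttled whenever its partition was in the throttled-replica list and the quota was exceeded, so a fully caught-up follower could be starved, drop out of the ISR, catch up, rejoin, and repeat. The patch adds an "is the replica in sync?" term to the throttle decision on both the leader and the follower. A condensed before/after sketch of the predicate (stand-alone Scala, not the committed code):

    // Before: throttle purely on quota state.
    def shouldThrottleOld(throttled: Boolean, quotaExceeded: Boolean): Boolean =
      throttled && quotaExceeded

    // After: a replica that is already in sync is never throttled, so the
    // quota can no longer bounce it out of the ISR.
    def shouldThrottleNew(throttled: Boolean, quotaExceeded: Boolean, inSync: Boolean): Boolean =
      throttled && quotaExceeded && !inSync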

core/src/main/scala/kafka/server/AbstractFetcherThread.scala (8 lines changed)

@@ -304,6 +304,14 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
}
+  def isReplicaInSync(topic: String, partitionId: Int): Boolean = {
+    val fetcherLagMetrics = stats.get(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
+    if (fetcherLagMetrics != null)
+      fetcherLagMetrics.lag <= 0
+    else
+      false
+  }
def unregister(topic: String, partitionId: Int) {
val lagMetrics = stats.remove(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
if (lagMetrics != null) lagMetrics.unregister()
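
The follower has no view of the ISR, so the new FetcherLagStats.isReplicaInSync above approximates it from the fetcher lag metric: a recorded lag of zero or less counts as in sync, and a missing metric (no fetch observed yet) conservatively counts as out of sync. A self-contained sketch of that rule:

    // LagMetrics is a stand-in for FetcherLagMetrics; only the lag value matters here.
    final case class LagMetrics(lag: Long)

    // Mirrors isReplicaInSync: an absent metric is treated as out of sync.
    def isReplicaInSync(metrics: Option[LagMetrics]): Boolean =
      metrics.exists(_.lag <= 0)

    assert(isReplicaInSync(Some(LagMetrics(0))))    // caught up: never throttled
    assert(!isReplicaInSync(Some(LagMetrics(100)))) // lagging: may be throttled
    assert(!isReplicaInSync(None))                  // unknown: treated as lagging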

core/src/main/scala/kafka/server/DelayedFetch.scala (17 lines changed)

@@ -42,6 +42,7 @@ case class FetchMetadata(fetchMinBytes: Int,
fetchOnlyLeader: Boolean,
fetchOnlyCommitted: Boolean,
isFromFollower: Boolean,
+  replicaId: Int,
fetchPartitionStatus: Seq[(TopicAndPartition, FetchPartitionStatus)]) {
override def toString = "[minBytes: " + fetchMinBytes + ", " +
@@ -97,7 +98,8 @@ class DelayedFetch(delayMs: Long,
// Case C, this can happen when the fetch operation is falling behind the current segment
// or the partition has just rolled a new segment
debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
-        if (!(quota.isThrottled(topicAndPartition) && quota.isQuotaExceeded()))
+        // We will not force complete the fetch request if a replica should be throttled.
+        if (!replicaManager.shouldLeaderThrottle(quota, topicAndPartition, fetchMetadata.replicaId))
return forceComplete()
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
// we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
@@ -139,12 +141,13 @@ class DelayedFetch(delayMs: Long,
*/
override def onComplete() {
val logReadResults = replicaManager.readFromLocalLog(
-      fetchMetadata.fetchOnlyLeader,
-      fetchMetadata.fetchOnlyCommitted,
-      fetchMetadata.fetchMaxBytes,
-      fetchMetadata.hardMaxBytesLimit,
-      fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
-      quota
+      replicaId = fetchMetadata.replicaId,
+      fetchOnlyFromLeader = fetchMetadata.fetchOnlyLeader,
+      readOnlyCommitted = fetchMetadata.fetchOnlyCommitted,
+      fetchMaxBytes = fetchMetadata.fetchMaxBytes,
+      hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
+      readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
+      quota = quota
)
val fetchPartitionData = logReadResults.map { case (tp, result) =>
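
The Case C branch above used to force-complete a delayed fetch of older segments based on the raw quota check alone; it now asks ReplicaManager.shouldLeaderThrottle, so an in-sync follower is still served immediately even when the quota is exhausted. A toy model of that control flow (names are stand-ins for the broker-internal methods):

    // Serve the fetch right away unless the requesting replica should be
    // throttled; otherwise leave the delayed operation parked.
    def onOlderSegmentFetch(shouldLeaderThrottle: Boolean)(forceComplete: () => Boolean): Boolean =
      if (!shouldLeaderThrottle) forceComplete()
      else false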

core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (12 lines changed)

@@ -286,10 +286,10 @@ class ReplicaFetcherThread(name: String,
protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
-    val quotaExceeded = quota.isQuotaExceeded
partitionMap.foreach { case (topicPartition, partitionFetchState) =>
val topicAndPartition = new TopicAndPartition(topicPartition.topic, topicPartition.partition)
-      if (partitionFetchState.isActive && !(quota.isThrottled(topicAndPartition) && quotaExceeded))
+      // We will not include a replica in the fetch request if it should be throttled.
+      if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicAndPartition))
requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
}
@@ -300,6 +300,14 @@ class ReplicaFetcherThread(name: String,
new FetchRequest(request)
}
+  /**
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
+   */
+  private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicAndPartition): Boolean = {
+    val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition.topic, topicPartition.partition)
+    quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
+  }
}
object ReplicaFetcherThread {
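
With shouldFollowerThrottle in place, buildFetchRequest simply omits an out-of-sync throttled partition from the next fetch request instead of fetching and discarding its data. A reduced sketch of that filtering (PartitionFetchState here is a stand-in for the broker-internal type):

    final case class PartitionFetchState(offset: Long, isActive: Boolean)

    // Keep only active partitions that the follower is allowed to fetch.
    def partitionsToFetch(states: Seq[(String, PartitionFetchState)],
                          shouldFollowerThrottle: String => Boolean): Seq[(String, Long)] =
      states.collect {
        case (tp, state) if state.isActive && !shouldFollowerThrottle(tp) => tp -> state.offset
      }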

core/src/main/scala/kafka/server/ReplicaManager.scala (30 lines changed)

@@ -466,8 +466,14 @@ class ReplicaManager(val config: KafkaConfig,
val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
// read from local logs
-    val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchMaxBytes, hardMaxBytesLimit,
-      fetchInfos, quota)
+    val logReadResults = readFromLocalLog(
+      replicaId = replicaId,
+      fetchOnlyFromLeader = fetchOnlyFromLeader,
+      readOnlyCommitted = fetchOnlyCommitted,
+      fetchMaxBytes = fetchMaxBytes,
+      hardMaxBytesLimit = hardMaxBytesLimit,
+      readPartitionInfo = fetchInfos,
+      quota = quota)
// if the fetch comes from the follower,
// update its corresponding log end offset
@@ -498,7 +504,7 @@ class ReplicaManager(val config: KafkaConfig,
(topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
}
val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
-      fetchOnlyCommitted, isFromFollower, fetchPartitionStatus)
+      fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
@@ -514,7 +520,8 @@ class ReplicaManager(val config: KafkaConfig,
/**
* Read from multiple topic partitions at the given offset up to maxSize bytes
*/
-  def readFromLocalLog(fetchOnlyFromLeader: Boolean,
+  def readFromLocalLog(replicaId: Int,
+                       fetchOnlyFromLeader: Boolean,
readOnlyCommitted: Boolean,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
@@ -559,8 +566,8 @@ class ReplicaManager(val config: KafkaConfig,
// Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage)
-        // If the partition is marked as throttled, and we are over-quota then exclude it
-        if (quota.isThrottled(tp) && quota.isQuotaExceeded)
+        // If the partition is being throttled, simply return an empty set.
+        if (shouldLeaderThrottle(quota, tp, replicaId))
FetchDataInfo(fetch.fetchOffsetMetadata, MessageSet.Empty)
// For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
// progress in such cases and don't need to report a `RecordTooLargeException`
@@ -607,6 +614,17 @@ class ReplicaManager(val config: KafkaConfig,
result
}
+  /**
+   * To avoid ISR thrashing, we only throttle a replica on the leader if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
+   */
+  def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicAndPartition, replicaId: Int): Boolean = {
+    val isReplicaInSync = getPartition(topicPartition.topic, topicPartition.partition).flatMap { partition =>
+      partition.getReplica(replicaId).map(partition.inSyncReplicas.contains)
+    }.getOrElse(false)
+    quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
+  }
def getMessageFormatVersion(topicAndPartition: TopicAndPartition): Option[Byte] =
getReplica(topicAndPartition.topic, topicAndPartition.partition).flatMap { replica =>
replica.log.map(_.config.messageFormatVersion.messageFormatVersion)
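
Unlike the follower-side lag heuristic, shouldLeaderThrottle consults the partition's actual ISR for the fetching replica, defaulting to "not in sync" when the partition or replica is unknown. A minimal model of that lookup (Partition here is a stand-in holding only the ISR member ids):

    final case class Partition(inSyncReplicaIds: Set[Int])

    def shouldLeaderThrottle(isThrottled: Boolean,
                             isQuotaExceeded: Boolean,
                             partition: Option[Partition],
                             replicaId: Int): Boolean = {
      // An unknown partition or replica is conservatively treated as out of sync.
      val isReplicaInSync = partition.exists(_.inSyncReplicaIds.contains(replicaId))
      isThrottled && isQuotaExceeded && !isReplicaInSync
    }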

core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala (74 lines changed)

@@ -48,13 +48,21 @@ class ReplicaManagerQuotasTest {
@Test
def shouldExcludeSubsequentThrottledPartitions(): Unit = {
setUpMocks(fetchInfo)
+    val followerReplicaId = configs.last.brokerId
val quota = mockQuota(1000000)
expect(quota.isQuotaExceeded()).andReturn(false).once()
expect(quota.isQuotaExceeded()).andReturn(true).once()
replay(quota)
-    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
@@ -65,13 +73,21 @@ class ReplicaManagerQuotasTest {
@Test
def shouldGetNoMessagesIfQuotasExceededOnSubsequentPartitions(): Unit = {
setUpMocks(fetchInfo)
+    val followerReplicaId = configs.last.brokerId
val quota = mockQuota(1000000)
expect(quota.isQuotaExceeded()).andReturn(true).once()
expect(quota.isQuotaExceeded()).andReturn(true).once()
replay(quota)
-    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
@@ -81,20 +97,53 @@ class ReplicaManagerQuotasTest {
@Test
def shouldGetBothMessagesIfQuotasAllow(): Unit = {
setUpMocks(fetchInfo)
+    val followerReplicaId = configs.last.brokerId
val quota = mockQuota(1000000)
expect(quota.isQuotaExceeded()).andReturn(false).once()
expect(quota.isQuotaExceeded()).andReturn(false).once()
replay(quota)
-    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
}
-  def setUpMocks(fetchInfo: Seq[(TopicAndPartition, PartitionFetchInfo)], message: Message = this.message) {
+  @Test
+  def shouldIncludeInSyncThrottledReplicas(): Unit = {
+    setUpMocks(fetchInfo, bothReplicasInSync = true)
+    val followerReplicaId = configs.last.brokerId
+    val quota = mockQuota(1000000)
+    expect(quota.isQuotaExceeded()).andReturn(false).once()
+    expect(quota.isQuotaExceeded()).andReturn(true).once()
+    replay(quota)
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
+    assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
+      fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
+    assertEquals("But we should get the second too since it's throttled but in sync", 1,
+      fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
+  }
+  def setUpMocks(fetchInfo: Seq[(TopicAndPartition, PartitionFetchInfo)], message: Message = this.message, bothReplicasInSync: Boolean = false) {
val zkUtils = createNiceMock(classOf[ZkUtils])
val scheduler = createNiceMock(classOf[KafkaScheduler])
@@ -131,12 +180,19 @@ class ReplicaManagerQuotasTest {
//create the two replicas
for ((p, _) <- fetchInfo) {
val partition = replicaManager.getOrCreatePartition(p.topic, p.partition)
-      val replica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
-      replica.highWatermark = new LogOffsetMetadata(5)
-      partition.leaderReplicaIdOpt = Some(replica.brokerId)
-      val allReplicas = List(replica)
+      val leaderReplica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
+      leaderReplica.highWatermark = new LogOffsetMetadata(5)
+      partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
+      val followerReplica = new Replica(configs.last.brokerId, partition, time, 0, Some(log))
+      val allReplicas = Set(leaderReplica, followerReplica)
allReplicas.foreach(partition.addReplicaIfNotExists(_))
-      partition.inSyncReplicas = allReplicas.toSet
+      if (bothReplicasInSync) {
+        partition.inSyncReplicas = allReplicas
+        followerReplica.highWatermark = new LogOffsetMetadata(5)
+      } else {
+        partition.inSyncReplicas = Set(leaderReplica)
+        followerReplica.highWatermark = new LogOffsetMetadata(0)
+      }
}
}

core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (18 lines changed)

@@ -149,9 +149,23 @@ class SimpleFetchTest {
val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()
assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW,
-      replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
+      replicaManager.readFromLocalLog(
+        replicaId = Request.OrdinaryConsumerId,
+        fetchOnlyFromLeader = true,
+        readOnlyCommitted = true,
+        fetchMaxBytes = Int.MaxValue,
+        hardMaxBytesLimit = false,
+        readPartitionInfo = fetchInfo,
+        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
-      replicaManager.readFromLocalLog(true, false, Int.MaxValue, false, fetchInfo, UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
+      replicaManager.readFromLocalLog(
+        replicaId = Request.OrdinaryConsumerId,
+        fetchOnlyFromLeader = true,
+        readOnlyCommitted = false,
+        fetchMaxBytes = Int.MaxValue,
+        hardMaxBytesLimit = false,
+        readPartitionInfo = fetchInfo,
+        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())
