diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2f2cb4b64b8..30f51257b34 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -304,6 +304,14 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
     stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
   }
 
+  def isReplicaInSync(topic: String, partitionId: Int): Boolean = {
+    val fetcherLagMetrics = stats.get(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
+    if (fetcherLagMetrics != null)
+      fetcherLagMetrics.lag <= 0
+    else
+      false
+  }
+
   def unregister(topic: String, partitionId: Int) {
     val lagMetrics = stats.remove(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
     if (lagMetrics != null) lagMetrics.unregister()
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 4b17e81e9a4..2feeae8e07a 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -42,6 +42,7 @@ case class FetchMetadata(fetchMinBytes: Int,
                          fetchOnlyLeader: Boolean,
                          fetchOnlyCommitted: Boolean,
                          isFromFollower: Boolean,
+                         replicaId: Int,
                          fetchPartitionStatus: Seq[(TopicAndPartition, FetchPartitionStatus)]) {
 
   override def toString = "[minBytes: " + fetchMinBytes + ", " +
@@ -97,7 +98,8 @@ class DelayedFetch(delayMs: Long,
             // Case C, this can happen when the fetch operation is falling behind the current segment
             // or the partition has just rolled a new segment
             debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
-            if (!(quota.isThrottled(topicAndPartition) && quota.isQuotaExceeded()))
+            // We will not force complete the fetch request if a replica should be throttled.
+            if (!replicaManager.shouldLeaderThrottle(quota, topicAndPartition, fetchMetadata.replicaId))
               return forceComplete()
           } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
             // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
@@ -139,12 +141,13 @@ class DelayedFetch(delayMs: Long,
    */
   override def onComplete() {
     val logReadResults = replicaManager.readFromLocalLog(
-      fetchMetadata.fetchOnlyLeader,
-      fetchMetadata.fetchOnlyCommitted,
-      fetchMetadata.fetchMaxBytes,
-      fetchMetadata.hardMaxBytesLimit,
-      fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
-      quota
+      replicaId = fetchMetadata.replicaId,
+      fetchOnlyFromLeader = fetchMetadata.fetchOnlyLeader,
+      readOnlyCommitted = fetchMetadata.fetchOnlyCommitted,
+      fetchMaxBytes = fetchMetadata.fetchMaxBytes,
+      hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
+      readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
+      quota = quota
     )
 
     val fetchPartitionData = logReadResults.map { case (tp, result) =>
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 7930716dd73..6f4c589ca89 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -286,10 +286,10 @@ class ReplicaFetcherThread(name: String,
 
   protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
     val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
-    val quotaExceeded = quota.isQuotaExceeded
     partitionMap.foreach { case (topicPartition, partitionFetchState) =>
       val topicAndPartition = new TopicAndPartition(topicPartition.topic, topicPartition.partition)
-      if (partitionFetchState.isActive && !(quota.isThrottled(topicAndPartition) && quotaExceeded))
+      // We will not include a replica in the fetch request if it should be throttled.
+      if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicAndPartition))
         requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
     }
 
@@ -300,6 +300,14 @@ class ReplicaFetcherThread(name: String,
     new FetchRequest(request)
   }
 
+  /**
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
+   */
+  private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicAndPartition): Boolean = {
+    val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition.topic, topicPartition.partition)
+    quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
+  }
 }
 
 object ReplicaFetcherThread {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2c843e86809..32bc660c7a1 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -466,8 +466,14 @@ class ReplicaManager(val config: KafkaConfig,
     val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
 
     // read from local logs
-    val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchMaxBytes, hardMaxBytesLimit,
-      fetchInfos, quota)
+    val logReadResults = readFromLocalLog(
+      replicaId = replicaId,
+      fetchOnlyFromLeader = fetchOnlyFromLeader,
+      readOnlyCommitted = fetchOnlyCommitted,
+      fetchMaxBytes = fetchMaxBytes,
+      hardMaxBytesLimit = hardMaxBytesLimit,
+      readPartitionInfo = fetchInfos,
+      quota = quota)
 
     // if the fetch comes from the follower,
     // update its corresponding log end offset
@@ -498,7 +504,7 @@ class ReplicaManager(val config: KafkaConfig,
         (topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
       }
       val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
-        fetchOnlyCommitted, isFromFollower, fetchPartitionStatus)
+        fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
 
       // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
@@ -514,7 +520,8 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Read from multiple topic partitions at the given offset up to maxSize bytes
    */
-  def readFromLocalLog(fetchOnlyFromLeader: Boolean,
+  def readFromLocalLog(replicaId: Int,
+                       fetchOnlyFromLeader: Boolean,
                        readOnlyCommitted: Boolean,
                        fetchMaxBytes: Int,
                        hardMaxBytesLimit: Boolean,
@@ -559,8 +566,8 @@ class ReplicaManager(val config: KafkaConfig,
             // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
             val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage)
 
-            // If the partition is marked as throttled, and we are over-quota then exclude it
-            if (quota.isThrottled(tp) && quota.isQuotaExceeded)
+            // If the partition is being throttled, simply return an empty set.
+            if (shouldLeaderThrottle(quota, tp, replicaId))
               FetchDataInfo(fetch.fetchOffsetMetadata, MessageSet.Empty)
             // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
             // progress in such cases and don't need to report a `RecordTooLargeException`
@@ -607,6 +614,17 @@ class ReplicaManager(val config: KafkaConfig,
     result
   }
 
+  /**
+   * To avoid ISR thrashing, we only throttle a replica on the leader if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
+   */
+  def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicAndPartition, replicaId: Int): Boolean = {
+    val isReplicaInSync = getPartition(topicPartition.topic, topicPartition.partition).flatMap { partition =>
+      partition.getReplica(replicaId).map(partition.inSyncReplicas.contains)
+    }.getOrElse(false)
+    quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
+  }
+
   def getMessageFormatVersion(topicAndPartition: TopicAndPartition): Option[Byte] =
     getReplica(topicAndPartition.topic, topicAndPartition.partition).flatMap { replica =>
       replica.log.map(_.config.messageFormatVersion.messageFormatVersion)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index f8f8ddad35a..17e0516e16e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -48,13 +48,21 @@ class ReplicaManagerQuotasTest {
   @Test
   def shouldExcludeSubsequentThrottledPartitions(): Unit = {
     setUpMocks(fetchInfo)
+    val followerReplicaId = configs.last.brokerId
 
     val quota = mockQuota(1000000)
     expect(quota.isQuotaExceeded()).andReturn(false).once()
     expect(quota.isQuotaExceeded()).andReturn(true).once()
     replay(quota)
 
-    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
     assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
       fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
 
@@ -65,13 +73,21 @@ class ReplicaManagerQuotasTest {
   @Test
   def shouldGetNoMessagesIfQuotasExceededOnSubsequentPartitions(): Unit = {
     setUpMocks(fetchInfo)
+    val followerReplicaId = configs.last.brokerId
 
     val quota = mockQuota(1000000)
     expect(quota.isQuotaExceeded()).andReturn(true).once()
     expect(quota.isQuotaExceeded()).andReturn(true).once()
     replay(quota)
 
-    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
     assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
       fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
     assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
@@ -81,20 +97,53 @@ class ReplicaManagerQuotasTest {
   @Test
   def shouldGetBothMessagesIfQuotasAllow(): Unit = {
     setUpMocks(fetchInfo)
+    val followerReplicaId = configs.last.brokerId
 
     val quota = mockQuota(1000000)
     expect(quota.isQuotaExceeded()).andReturn(false).once()
     expect(quota.isQuotaExceeded()).andReturn(false).once()
     replay(quota)
 
-    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages",
       1, fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
       fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
   }
 
-  def setUpMocks(fetchInfo: Seq[(TopicAndPartition, PartitionFetchInfo)], message: Message = this.message) {
+  @Test
+  def shouldIncludeInSyncThrottledReplicas(): Unit = {
+    setUpMocks(fetchInfo, bothReplicasInSync = true)
+    val followerReplicaId = configs.last.brokerId
+
+    val quota = mockQuota(1000000)
+    expect(quota.isQuotaExceeded()).andReturn(false).once()
+    expect(quota.isQuotaExceeded()).andReturn(true).once()
+    replay(quota)
+
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
+    assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
+      fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
+
+    assertEquals("But we should get the second too since it's throttled but in sync", 1,
+      fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
+  }
+
+  def setUpMocks(fetchInfo: Seq[(TopicAndPartition, PartitionFetchInfo)], message: Message = this.message, bothReplicasInSync: Boolean = false) {
     val zkUtils = createNiceMock(classOf[ZkUtils])
     val scheduler = createNiceMock(classOf[KafkaScheduler])
 
@@ -131,12 +180,19 @@ class ReplicaManagerQuotasTest {
 
     //create the two replicas
     for ((p, _) <- fetchInfo) {
       val partition = replicaManager.getOrCreatePartition(p.topic, p.partition)
-      val replica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
-      replica.highWatermark = new LogOffsetMetadata(5)
-      partition.leaderReplicaIdOpt = Some(replica.brokerId)
-      val allReplicas = List(replica)
+      val leaderReplica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
+      leaderReplica.highWatermark = new LogOffsetMetadata(5)
+      partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
+      val followerReplica = new Replica(configs.last.brokerId, partition, time, 0, Some(log))
+      val allReplicas = Set(leaderReplica, followerReplica)
       allReplicas.foreach(partition.addReplicaIfNotExists(_))
-      partition.inSyncReplicas = allReplicas.toSet
+      if (bothReplicasInSync) {
+        partition.inSyncReplicas = allReplicas
+        followerReplica.highWatermark = new LogOffsetMetadata(5)
+      } else {
+        partition.inSyncReplicas = Set(leaderReplica)
+        followerReplica.highWatermark = new LogOffsetMetadata(0)
+      }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 71c2b41bd93..cbd751b36ad 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -149,9 +149,23 @@ class SimpleFetchTest {
     val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()
 
     assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW,
-      replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
+      replicaManager.readFromLocalLog(
+        replicaId = Request.OrdinaryConsumerId,
+        fetchOnlyFromLeader = true,
+        readOnlyCommitted = true,
+        fetchMaxBytes = Int.MaxValue,
+        hardMaxBytesLimit = false,
+        readPartitionInfo = fetchInfo,
+        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
     assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
-      replicaManager.readFromLocalLog(true, false, Int.MaxValue, false, fetchInfo, UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
+      replicaManager.readFromLocalLog(
+        replicaId = Request.OrdinaryConsumerId,
+        fetchOnlyFromLeader = true,
+        readOnlyCommitted = false,
+        fetchMaxBytes = Int.MaxValue,
+        hardMaxBytesLimit = false,
+        readPartitionInfo = fetchInfo,
+        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
 
     assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
     assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())
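
Reviewer note: the behavioral change in this patch reduces to one predicate applied on both sides of replication. A replica listed in the throttled-replica list is only excluded from a fetch when the quota is already exceeded AND the replica is out of sync: on the leader, shouldLeaderThrottle checks ISR membership for the fetching replicaId; on the follower, shouldFollowerThrottle checks that the fetcher lag is above zero via FetcherLagStats.isReplicaInSync. The standalone Scala sketch below is illustrative only; it uses a simplified stand-in for kafka.server.ReplicaQuota and a plain (topic, partition) tuple instead of the real Kafka types, but it mirrors the decision logic of the patch.

// Illustrative sketch only -- simplified stand-ins, not the actual Kafka classes.
object ThrottleDecisionSketch {

  // Minimal stand-in for the ReplicaQuota interface used in the patch.
  trait ReplicaQuota {
    def isThrottled(topicPartition: (String, Int)): Boolean
    def isQuotaExceeded: Boolean
  }

  // The shared predicate: throttle only if the partition is in the throttled list,
  // the quota is already exceeded, and the replica is not in sync.
  def shouldThrottle(quota: ReplicaQuota, topicPartition: (String, Int), isReplicaInSync: Boolean): Boolean =
    quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync

  def main(args: Array[String]): Unit = {
    val exhaustedQuota = new ReplicaQuota {
      def isThrottled(topicPartition: (String, Int)): Boolean = true
      def isQuotaExceeded: Boolean = true
    }
    val tp = ("topic-a", 0)
    // An out-of-sync replica is excluded from the fetch while the quota is exhausted...
    assert(shouldThrottle(exhaustedQuota, tp, isReplicaInSync = false))
    // ...but an in-sync replica keeps fetching, so it cannot be forced out of the ISR
    // by the quota itself (the "thrashing" case the patch is meant to prevent).
    assert(!shouldThrottle(exhaustedQuota, tp, isReplicaInSync = true))
  }
}

The only difference between the leader and follower paths is how isReplicaInSync is computed (ISR membership versus fetcher lag); the throttle condition itself is identical.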