Browse Source

MINOR: Skip quota check when replica is in sync (#6344)

Reviewers: Jason Gustafson <jason@confluent.io>
pull/6349/head
lambdaliu 6 years ago committed by Jason Gustafson
parent
commit
33e005994d
  1. 2
      core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
  2. 2
      core/src/main/scala/kafka/server/ReplicaManager.scala

2
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala

@ -328,7 +328,7 @@ class ReplicaFetcherThread(name: String,
*/ */
private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicPartition): Boolean = { private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicPartition): Boolean = {
val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition) val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition)
quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync !isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
} }
} }

2
core/src/main/scala/kafka/server/ReplicaManager.scala

@ -1007,7 +1007,7 @@ class ReplicaManager(val config: KafkaConfig,
val isReplicaInSync = nonOfflinePartition(topicPartition).exists { partition => val isReplicaInSync = nonOfflinePartition(topicPartition).exists { partition =>
partition.getReplica(replicaId).exists(partition.inSyncReplicas.contains) partition.getReplica(replicaId).exists(partition.inSyncReplicas.contains)
} }
quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync !isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
} }
def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = localReplica(topicPartition).flatMap(_.log.map(_.config)) def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = localReplica(topicPartition).flatMap(_.log.map(_.config))

Loading…
Cancel
Save