Browse Source

MINOR: Simplify SensorAccess usage

I was investigating an exception in this code and found a few
opportunities for making it clearer.

I also added the `out` folder to `.gitignore` as IntelliJ sometimes
uses that as the build folder.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #3552 from ijuma/minor-quota-improvements
pull/3552/merge
Ismael Juma 7 years ago
parent
commit
84d2b6a01c
  1. 1
      .gitignore
  2. 30
      core/src/main/scala/kafka/server/ClientQuotaManager.scala
  3. 13
      core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
  4. 21
      core/src/main/scala/kafka/server/SensorAccess.scala

1
.gitignore vendored

@ -3,6 +3,7 @@ dist @@ -3,6 +3,7 @@ dist
target/
build/
build_eclipse/
out/
.gradle/
lib_managed/
src_managed/

30
core/src/main/scala/kafka/server/ClientQuotaManager.scala

@ -138,10 +138,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, @@ -138,10 +138,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val time: Time) extends Logging {
private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]()
private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
@volatile private var quotaTypesEnabled = if (config.quotaBytesPerSecondDefault == Long.MaxValue) QuotaTypes.NoQuotas else QuotaTypes.ClientIdQuotaEnabled
@volatile private var quotaTypesEnabled =
if (config.quotaBytesPerSecondDefault == Long.MaxValue) QuotaTypes.NoQuotas
else QuotaTypes.ClientIdQuotaEnabled
private val lock = new ReentrantReadWriteLock()
private val delayQueue = new DelayQueue[ThrottledResponse]()
private val sensorAccessor = new SensorAccess
private val sensorAccessor = new SensorAccess(lock, metrics)
val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
@ -392,24 +394,19 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, @@ -392,24 +394,19 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
sensorAccessor.getOrCreate(
getQuotaSensorName(clientQuotaEntity.quotaId),
ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
lock, metrics,
() => clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId),
() => getQuotaMetricConfig(clientQuotaEntity.quota),
() => measurableStat
clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId),
Some(getQuotaMetricConfig(clientQuotaEntity.quota)),
new Rate
),
sensorAccessor.getOrCreate(getThrottleTimeSensorName(clientQuotaEntity.quotaId),
ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
lock,
metrics,
() => throttleMetricName(clientQuotaEntity),
() => null,
() => new Avg()
throttleMetricName(clientQuotaEntity),
None,
new Avg
)
)
}
private def measurableStat: MeasurableStat = new Rate()
private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
@ -425,10 +422,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, @@ -425,10 +422,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
sensorAccessor.getOrCreate(
sensorName,
ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
lock, metrics,
() => metricName,
() => null,
() => measurableStat
metricName,
None,
new Rate
)
}

13
core/src/main/scala/kafka/server/ReplicationQuotaManager.scala

@ -74,8 +74,9 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig, @@ -74,8 +74,9 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
private val lock = new ReentrantReadWriteLock()
private val throttledPartitions = new ConcurrentHashMap[String, Seq[Int]]()
private var quota: Quota = null
private val sensorAccess = new SensorAccess
private val rateMetricName = metrics.metricName("byte-rate", replicationType.toString, s"Tracking byte-rate for ${replicationType}")
private val sensorAccess = new SensorAccess(lock, metrics)
private val rateMetricName = metrics.metricName("byte-rate", replicationType.toString,
s"Tracking byte-rate for ${replicationType}")
/**
* Update the quota
@ -194,11 +195,9 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig, @@ -194,11 +195,9 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
sensorAccess.getOrCreate(
replicationType.toString,
InactiveSensorExpirationTimeSeconds,
lock,
metrics,
() => rateMetricName,
() => getQuotaMetricConfig(quota),
() => new SimpleRate()
rateMetricName,
Some(getQuotaMetricConfig(quota)),
new SimpleRate
)
}
}

21
core/src/main/scala/kafka/server/SensorAccess.scala

@ -16,10 +16,10 @@ @@ -16,10 +16,10 @@
*/
package kafka.server
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.concurrent.locks.ReadWriteLock
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.{Metrics, Sensor, MeasurableStat, MetricConfig}
import org.apache.kafka.common.metrics.{MeasurableStat, MetricConfig, Metrics, Sensor}
/**
* Class which centralises the logic for creating/accessing sensors.
@ -27,9 +27,10 @@ import org.apache.kafka.common.metrics.{Metrics, Sensor, MeasurableStat, MetricC @@ -27,9 +27,10 @@ import org.apache.kafka.common.metrics.{Metrics, Sensor, MeasurableStat, MetricC
*
* The later arguments are passed as methods as they are only called when the sensor is instantiated.
*/
class SensorAccess {
class SensorAccess(lock: ReadWriteLock, metrics: Metrics) {
def getOrCreate(sensorName: String, expirationTime: Long, lock: ReentrantReadWriteLock, metrics: Metrics, metricName: () => MetricName, config: () => MetricConfig, measure: () => MeasurableStat): Sensor = {
def getOrCreate(sensorName: String, expirationTime: Long,
metricName: => MetricName, config: => Option[MetricConfig], measure: => MeasurableStat): Sensor = {
var sensor: Sensor = null
/* Acquire the read lock to fetch the sensor. It is safe to call getSensor from multiple threads.
@ -41,12 +42,8 @@ class SensorAccess { @@ -41,12 +42,8 @@ class SensorAccess {
* at which point it is safe to read
*/
lock.readLock().lock()
try {
sensor = metrics.getSensor(sensorName)
}
finally {
lock.readLock().unlock()
}
try sensor = metrics.getSensor(sensorName)
finally lock.readLock().unlock()
/* If the sensor is null, try to create it else return the existing sensor
* The sensor can be null, hence the null checks
@ -64,8 +61,8 @@ class SensorAccess { @@ -64,8 +61,8 @@ class SensorAccess {
// ensure that we initialise `ClientSensors` with non-null parameters.
sensor = metrics.getSensor(sensorName)
if (sensor == null) {
sensor = metrics.sensor(sensorName, config(), expirationTime)
sensor.add(metricName(), measure())
sensor = metrics.sensor(sensorName, config.orNull, expirationTime)
sensor.add(metricName, measure)
}
} finally {
lock.writeLock().unlock()

Loading…
Cancel
Save