Browse Source

KAFKA-7459: Use thread-safe Pool for RequestMetrics.requestRateInternal (#5717)

As part of KAFKA-6514, the `apiVersion` tag was added to the `RequestsPerSec`
metric. A thread unsafe `HashMap` was used in the implementation even though
it can be accessed by multiple threads. Fix it by replacing it with the thread-safe
`Pool`.

Reviewers: Ismael Juma <ismael@juma.me.uk>
pull/5718/merge
Zhanxiang (Patrick) Huang 6 years ago committed by Ismael Juma
parent
commit
b35e97125f
  1. 8
      core/src/main/scala/kafka/network/RequestChannel.scala

8
core/src/main/scala/kafka/network/RequestChannel.scala

@ -24,7 +24,7 @@ import java.util.concurrent._ @@ -24,7 +24,7 @@ import java.util.concurrent._
import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.{Gauge, Meter}
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.{Logging, NotNothing}
import kafka.utils.{Logging, NotNothing, Pool}
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.network.Send
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@ -387,7 +387,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { @@ -387,7 +387,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
import RequestMetrics._
val tags = Map("request" -> name)
val requestRateInternal = new mutable.HashMap[Short, Meter]
val requestRateInternal = new Pool[Short, Meter]()
// time a request spent in a request queue
val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags)
// time a request takes to be processed at the local broker
@ -421,7 +421,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { @@ -421,7 +421,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error)))
def requestRate(version: Short): Meter = {
requestRateInternal.getOrElseUpdate(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
requestRateInternal.getAndMaybePut(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
}
class ErrorMeter(name: String, error: Errors) {
@ -456,7 +456,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { @@ -456,7 +456,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
}
def removeMetrics(): Unit = {
for (version <- requestRateInternal.keySet) removeMetric(RequestsPerSec, tags + ("version" -> version.toString))
for (version <- requestRateInternal.keys) removeMetric(RequestsPerSec, tags + ("version" -> version.toString))
removeMetric(RequestQueueTimeMs, tags)
removeMetric(LocalTimeMs, tags)
removeMetric(RemoteTimeMs, tags)

Loading…
Cancel
Save