From b35e97125f291aa7dca22b8d897f9ab4b196fd37 Mon Sep 17 00:00:00 2001 From: "Zhanxiang (Patrick) Huang" Date: Sun, 30 Sep 2018 19:13:28 -0700 Subject: [PATCH] KAFKA-7459: Use thread-safe Pool for RequestMetrics.requestRateInternal (#5717) As part of KAFKA-6514, the `apiVersion` tag was added to the `RequestsPerSec` metric. A thread unsafe `HashMap` was used in the implementation even though it can be accessed by multiple threads. Fix it by replacing it with the thread-safe `Pool`. Reviewers: Ismael Juma --- core/src/main/scala/kafka/network/RequestChannel.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index e5aa5d90029..00b09688c5b 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -24,7 +24,7 @@ import java.util.concurrent._ import com.typesafe.scalalogging.Logger import com.yammer.metrics.core.{Gauge, Meter} import kafka.metrics.KafkaMetricsGroup -import kafka.utils.{Logging, NotNothing} +import kafka.utils.{Logging, NotNothing, Pool} import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.network.Send import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -387,7 +387,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { import RequestMetrics._ val tags = Map("request" -> name) - val requestRateInternal = new mutable.HashMap[Short, Meter] + val requestRateInternal = new Pool[Short, Meter]() // time a request spent in a request queue val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags) // time a request takes to be processed at the local broker @@ -421,7 +421,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error))) def requestRate(version: Short): Meter = { - requestRateInternal.getOrElseUpdate(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString))) + requestRateInternal.getAndMaybePut(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString))) } class ErrorMeter(name: String, error: Errors) { @@ -456,7 +456,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { } def removeMetrics(): Unit = { - for (version <- requestRateInternal.keySet) removeMetric(RequestsPerSec, tags + ("version" -> version.toString)) + for (version <- requestRateInternal.keys) removeMetric(RequestsPerSec, tags + ("version" -> version.toString)) removeMetric(RequestQueueTimeMs, tags) removeMetric(LocalTimeMs, tags) removeMetric(RemoteTimeMs, tags)