diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index e5aa5d90029..00b09688c5b 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -24,7 +24,7 @@ import java.util.concurrent._ import com.typesafe.scalalogging.Logger import com.yammer.metrics.core.{Gauge, Meter} import kafka.metrics.KafkaMetricsGroup -import kafka.utils.{Logging, NotNothing} +import kafka.utils.{Logging, NotNothing, Pool} import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.network.Send import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -387,7 +387,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { import RequestMetrics._ val tags = Map("request" -> name) - val requestRateInternal = new mutable.HashMap[Short, Meter] + val requestRateInternal = new Pool[Short, Meter]() // time a request spent in a request queue val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags) // time a request takes to be processed at the local broker @@ -421,7 +421,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error))) def requestRate(version: Short): Meter = { - requestRateInternal.getOrElseUpdate(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString))) + requestRateInternal.getAndMaybePut(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString))) } class ErrorMeter(name: String, error: Errors) { @@ -456,7 +456,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { } def removeMetrics(): Unit = { - for (version <- requestRateInternal.keySet) removeMetric(RequestsPerSec, tags + ("version" -> version.toString)) + for (version <- requestRateInternal.keys) removeMetric(RequestsPerSec, tags + ("version" -> version.toString)) removeMetric(RequestQueueTimeMs, tags) removeMetric(LocalTimeMs, tags) removeMetric(RemoteTimeMs, tags)