
kafka-1567; Metric memory leaking after closing the clients; patched by Jiangjie Qin; reviewed by Guozhang Wang and Jun Rao

pull/30/head
Jiangjie Qin authored 10 years ago, committed by Jun Rao
commit aa70a7d025
8 changed files:

1. core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala (4 lines changed)
2. core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala (16 lines changed)
3. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (37 lines changed)
4. core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala (120 lines changed)
5. core/src/main/scala/kafka/producer/Producer.scala (13 lines changed)
6. core/src/main/scala/kafka/producer/ProducerRequestStats.scala (4 lines changed)
7. core/src/main/scala/kafka/producer/ProducerStats.scala (4 lines changed)
8. core/src/main/scala/kafka/producer/ProducerTopicStats.scala (4 lines changed)

core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala (4 lines changed)

@@ -54,4 +54,8 @@ object ConsumerTopicStatsRegistry {
   def getConsumerTopicStat(clientId: String) = {
     globalStats.getAndMaybePut(clientId)
   }
+
+  def removeConsumerTopicStat(clientId: String) {
+    globalStats.remove(clientId)
+  }
 }
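Each of the registry files in this patch applies the same fix: per-clientId stats objects live in a global pool, and the patch adds an explicit remove call for shutdown to invoke. Below is a minimal self-contained sketch of that registry pattern, using a plain java.util.concurrent.ConcurrentHashMap as a stand-in for kafka.utils.Pool (whose implementation is not shown in this diff); the class and object names are hypothetical.

import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for kafka.utils.Pool: a concurrent map with
// get-or-create semantics, keyed by clientId.
class StatsRegistry[V](valueFactory: String => V) {
  private val pool = new ConcurrentHashMap[String, V]()

  // Equivalent of Pool.getAndMaybePut: create the stats object on first use.
  def getAndMaybePut(clientId: String): V =
    pool.computeIfAbsent(clientId, k => valueFactory(k))

  // The removal hook this patch adds: without it, every create-and-close
  // cycle of a client leaves one entry behind in the pool forever.
  def remove(clientId: String): V = pool.remove(clientId)
}

object RegistrySketch extends App {
  val registry = new StatsRegistry[String](id => s"stats-for-$id")
  registry.getAndMaybePut("client-1") // created and cached on first access
  registry.remove("client-1")         // freed on close; no leak
}

The pool itself is process-global, which is exactly why the missing remove leaked: the map outlives any individual consumer or producer instance.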

core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala (16 lines changed)

@@ -17,10 +17,11 @@
 package kafka.consumer

-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
-import kafka.utils.Pool
 import java.util.concurrent.TimeUnit
+
 import kafka.common.ClientIdAndBroker
+import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
+import kafka.utils.Pool

 class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
   val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
@@ -53,6 +54,17 @@ object FetchRequestAndResponseStatsRegistry {
   def getFetchRequestAndResponseStats(clientId: String) = {
     globalStats.getAndMaybePut(clientId)
   }
+
+  def removeConsumerFetchRequestAndResponseStats(clientId: String) {
+    val pattern = (clientId + "-ConsumerFetcherThread.*").r
+    val keys = globalStats.keys
+    for (key <- keys) {
+      pattern.findFirstIn(key) match {
+        case Some(_) => globalStats.remove(key)
+        case _ =>
+      }
+    }
+  }
 }
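Fetch stats are keyed by clientId plus a fetcher-thread/broker suffix, so a plain remove(clientId) would miss them; the patch instead scans all keys against a clientId + "-ConsumerFetcherThread.*" regex. A small standalone sketch of that match-and-remove loop, using plain strings in a mutable set in place of the real pool (key strings are illustrative guesses at the key format):

object FetcherKeyRemovalSketch extends App {
  // Keys as they might appear in the registry: clientId plus fetcher-thread suffix.
  val keys = scala.collection.mutable.Set(
    "myClient-ConsumerFetcherThread-0-broker1",
    "myClient-ConsumerFetcherThread-1-broker2",
    "otherClient-ConsumerFetcherThread-0-broker1")

  val clientId = "myClient"
  val pattern = (clientId + "-ConsumerFetcherThread.*").r

  // Same logic as removeConsumerFetchRequestAndResponseStats: remove every
  // key that matches this client's fetcher-thread pattern.
  for (key <- keys.toList) {
    pattern.findFirstIn(key) match {
      case Some(_) => keys.remove(key)
      case _ =>
    }
  }

  println(keys) // only otherClient's key survives
}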

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (37 lines changed)

@@ -17,28 +17,28 @@
 package kafka.consumer

+import java.net.InetAddress
+import java.util.UUID
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import locks.ReentrantLock
-import collection._
+import java.util.concurrent.locks.ReentrantLock
+
+import com.yammer.metrics.core.Gauge
+import kafka.api._
+import kafka.client.ClientUtils
 import kafka.cluster._
-import kafka.utils._
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import java.net.InetAddress
-import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient}
-import org.apache.zookeeper.Watcher.Event.KeeperState
-import java.util.UUID
-import kafka.serializer._
-import kafka.utils.ZkUtils._
-import kafka.utils.Utils.inLock
 import kafka.common._
-import com.yammer.metrics.core.Gauge
 import kafka.metrics._
 import kafka.network.BlockingChannel
-import kafka.client.ClientUtils
-import kafka.api._
-import scala.Some
-import kafka.common.TopicAndPartition
+import kafka.serializer._
+import kafka.utils.Utils.inLock
+import kafka.utils.ZkUtils._
+import kafka.utils._
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+import scala.collection._

 /**
@@ -184,7 +184,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     val canShutdown = isShuttingDown.compareAndSet(false, true)
     if (canShutdown) {
       info("ZKConsumerConnector shutting down")
+      val startTime = System.nanoTime()
+      KafkaMetricsGroup.removeAllConsumerMetrics(config.clientId)
       rebalanceLock synchronized {
         if (wildcardTopicWatcher != null)
           wildcardTopicWatcher.shutdown()
@@ -208,7 +209,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         case e: Throwable =>
           fatal("error during consumer connector shutdown", e)
       }
-      info("ZKConsumerConnector shut down completed")
+      info("ZKConsumerConnector shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
     }
   }
 }
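Consumer shutdown now removes all of the client's metrics first, then logs how long the whole shutdown took; the elapsed nanoseconds are converted to milliseconds by integer division. A trivial standalone illustration of that timing idiom (the sleep stands in for the real shutdown work):

object ShutdownTimingSketch extends App {
  val startTime = System.nanoTime()
  Thread.sleep(25) // stand-in for metric removal, fetcher shutdown, ZK close
  // 1 ms = 1,000,000 ns, hence the division by 1000000 in the log line.
  println("shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
}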

core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala (120 lines changed)

@@ -18,10 +18,15 @@
 package kafka.metrics

-import com.yammer.metrics.core.{Gauge, MetricName}
-import kafka.utils.Logging
 import java.util.concurrent.TimeUnit
+
 import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.{Gauge, MetricName}
+import kafka.consumer.{ConsumerTopicStatsRegistry, FetchRequestAndResponseStatsRegistry}
+import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, ProducerTopicStatsRegistry}
+import kafka.utils.Logging
+
+import scala.collection.immutable

 trait KafkaMetricsGroup extends Logging {
@@ -51,4 +56,115 @@ trait KafkaMetricsGroup extends Logging {
   def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) =
     Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit)
+
+  def removeMetric(name: String) =
+    Metrics.defaultRegistry().removeMetric(metricName(name))
 }
+
+object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
+  /**
+   * To make sure all metrics are de-registered after the consumer/producer is closed,
+   * their metric names must be included in these metric name lists.
+   */
+  private val consumerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
+    // kafka.consumer.ZookeeperConsumerConnector
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-FetchQueueSize"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"),
+    // kafka.consumer.ConsumerFetcherManager
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"),
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MinFetchRate"),
+    // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
+    new MetricName("kafka.server", "FetcherLagMetrics", "-ConsumerLag"),
+    // kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo}
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-MessagesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsMessagesPerSec"),
+    // kafka.consumer.ConsumerTopicStats
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-BytesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsBytesPerSec"),
+    // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
+    new MetricName("kafka.server", "FetcherStats", "-BytesPerSec"),
+    new MetricName("kafka.server", "FetcherStats", "-RequestsPerSec"),
+    // kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchResponseSize"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchRequestRateAndTimeMs"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchResponseSize"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchRequestRateAndTimeMs"),
+    /**
+     * ProducerRequestStats <-- SyncProducer
+     * The metric for the SyncProducer used in fetchTopicMetaData() needs to be removed when the consumer is closed.
+     */
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+  )
+
+  private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
+    // kafka.producer.ProducerStats <-- DefaultEventHandler <-- Producer
+    new MetricName("kafka.producer", "ProducerStats", "-SerializationErrorsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "-ResendsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "-FailedSendsPerSec"),
+    // kafka.producer.ProducerSendThread
+    new MetricName("kafka.producer.async", "ProducerSendThread", "-ProducerQueueSize"),
+    // kafka.producer.ProducerTopicStats <-- kafka.producer.{Producer, async.DefaultEventHandler}
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-MessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-DroppedMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-BytesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsDroppedMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsBytesPerSec"),
+    // kafka.producer.ProducerRequestStats <-- SyncProducer
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+  )
+
+  def removeAllConsumerMetrics(clientId: String) {
+    FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
+    ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)
+    ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
+    removeAllMetricsInList(KafkaMetricsGroup.consumerMetricNameList, clientId)
+  }
+
+  def removeAllProducerMetrics(clientId: String) {
+    ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
+    ProducerTopicStatsRegistry.removeProducerTopicStats(clientId)
+    ProducerStatsRegistry.removeProducerStats(clientId)
+    removeAllMetricsInList(KafkaMetricsGroup.producerMetricNameList, clientId)
+  }
+
+  private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) {
+    metricNameList.foreach(metric => {
+      val pattern = (clientId + ".*" + metric.getName + ".*").r
+      val registeredMetrics = scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet())
+      for (registeredMetric <- registeredMetrics) {
+        if (registeredMetric.getGroup == metric.getGroup &&
+            registeredMetric.getType == metric.getType) {
+          pattern.findFirstIn(registeredMetric.getName) match {
+            case Some(_) => {
+              val beforeRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size
+              Metrics.defaultRegistry().removeMetric(registeredMetric)
+              val afterRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size
+              trace("Removing metric %s. Metrics registry size reduced from %d to %d".format(
+                registeredMetric, beforeRemovalSize, afterRemovalSize))
+            }
+            case _ =>
+          }
+        }
+      }
+    })
+  }
+}
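removeAllMetricsInList cannot remove metrics by exact name because each registered MetricName embeds the clientId (and sometimes a topic or broker id) in its name component; the template entries above therefore match group and type exactly and match the name by regex. Here is a sketch of the same scan against a yammer Metrics 2.x registry (assuming the com.yammer.metrics 2.x artifact on the classpath; the object name and sample metric are illustrative):

import com.yammer.metrics.Metrics
import com.yammer.metrics.core.MetricName

object MetricScanSketch extends App {
  val registry = Metrics.defaultRegistry()

  // Register a metric the way a consumer with clientId "myClient" would:
  // the clientId is baked into the MetricName's name component.
  registry.newCounter(new MetricName("kafka.consumer", "ConsumerTopicMetrics",
    "myClient-AllTopicsMessagesPerSec"))

  // Template entry, as in consumerMetricNameList: group and type are exact;
  // the name holds only the suffix to be matched by regex.
  val template = new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsMessagesPerSec")
  val pattern = ("myClient" + ".*" + template.getName + ".*").r

  val registered = scala.collection.JavaConversions.asScalaSet(registry.allMetrics().keySet())
  for (metricName <- registered) {
    if (metricName.getGroup == template.getGroup &&
        metricName.getType == template.getType &&
        pattern.findFirstIn(metricName.getName).isDefined)
      registry.removeMetric(metricName) // de-register, dropping the registry's reference
  }
}

Note the trade-off the patch makes: removal is driven by a hand-maintained name list, so any metric added later but not listed here would leak again.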

core/src/main/scala/kafka/producer/Producer.scala (13 lines changed)

@@ -16,14 +16,14 @@
  */
 package kafka.producer

-import async.{DefaultEventHandler, ProducerSendThread, EventHandler}
-import kafka.utils._
 import java.util.Random
-import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
-import kafka.serializer.Encoder
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
 import kafka.common.QueueFullException
 import kafka.metrics._
+import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread}
+import kafka.serializer.Encoder
+import kafka.utils._

 class Producer[K,V](val config: ProducerConfig,
@@ -126,9 +126,12 @@ class Producer[K,V](val config: ProducerConfig,
     val canShutdown = hasShutdown.compareAndSet(false, true)
     if(canShutdown) {
       info("Shutting down producer")
+      val startTime = System.nanoTime()
+      KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)
       if (producerSendThread != null)
         producerSendThread.shutdown
       eventHandler.close
+      info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
     }
   }
 }
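The leak this patch fixes shows up when clients are created and closed repeatedly: each instance registered clientId-specific metrics in the JVM-wide default registry, and nothing removed them on close. A sketch of how one might observe the registry growth (hypothetical driver code, not part of the patch; assumes the old Scala producer API, kafka.producer.Producer and ProducerConfig, and a broker at localhost:9092):

import java.util.Properties
import com.yammer.metrics.Metrics
import kafka.producer.{Producer, ProducerConfig}

object LeakRepro extends App {
  for (i <- 0 until 100) {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("client.id", "client-" + i) // fresh clientId each round
    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.close()
    // Before this patch the count grew by a fixed amount per iteration;
    // with removeAllProducerMetrics in close() it stays flat.
    println(Metrics.defaultRegistry().allMetrics().size())
  }
}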

core/src/main/scala/kafka/producer/ProducerRequestStats.scala (4 lines changed)

@@ -52,5 +52,9 @@ object ProducerRequestStatsRegistry {
   def getProducerRequestStats(clientId: String) = {
     globalStats.getAndMaybePut(clientId)
   }
+
+  def removeProducerRequestStats(clientId: String) {
+    globalStats.remove(clientId)
+  }
 }

core/src/main/scala/kafka/producer/ProducerStats.scala (4 lines changed)

@@ -36,4 +36,8 @@ object ProducerStatsRegistry {
   def getProducerStats(clientId: String) = {
     statsRegistry.getAndMaybePut(clientId)
   }
+
+  def removeProducerStats(clientId: String) {
+    statsRegistry.remove(clientId)
+  }
 }

core/src/main/scala/kafka/producer/ProducerTopicStats.scala (4 lines changed)

@@ -55,4 +55,8 @@ object ProducerTopicStatsRegistry {
   def getProducerTopicStats(clientId: String) = {
     globalStats.getAndMaybePut(clientId)
   }
+
+  def removeProducerTopicStats(clientId: String) {
+    globalStats.remove(clientId)
+  }
 }
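The three producer-side registries above follow the identical remove-on-close pattern, and all three removals are funneled through one public entry point. If an application ever needed to trigger the same cleanup by hand (for example, after abandoning a clientId without a clean close), the call would look like this sketch; normally Producer.close() does it for you:

import kafka.metrics.KafkaMetricsGroup

object ManualCleanupSketch extends App {
  // Removes ProducerRequestStats, ProducerTopicStats, ProducerStats, and the
  // listed yammer metrics for this clientId from the default registry.
  KafkaMetricsGroup.removeAllProducerMetrics("my-client-id")
}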
