From aa70a7d02552585a83c62784b92b67ed8ae3a304 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 13 Aug 2014 13:08:57 -0700 Subject: [PATCH] kafka-1567; Metric memory leaking after closing the clients; patched by Jiangjie Qin; reviewed by Guozhang Wang and Jun Rao --- .../kafka/consumer/ConsumerTopicStats.scala | 4 + .../FetchRequestAndResponseStats.scala | 16 ++- .../consumer/ZookeeperConsumerConnector.scala | 37 +++--- .../kafka/metrics/KafkaMetricsGroup.scala | 120 +++++++++++++++++- .../main/scala/kafka/producer/Producer.scala | 13 +- .../kafka/producer/ProducerRequestStats.scala | 4 + .../scala/kafka/producer/ProducerStats.scala | 4 + .../kafka/producer/ProducerTopicStats.scala | 4 + 8 files changed, 175 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala index ff5f470f7aa..f63e6c59bb1 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala @@ -54,4 +54,8 @@ object ConsumerTopicStatsRegistry { def getConsumerTopicStat(clientId: String) = { globalStats.getAndMaybePut(clientId) } + + def removeConsumerTopicStat(clientId: String) { + globalStats.remove(clientId) + } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala index 875eeeb73cb..5243f415288 100644 --- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala +++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala @@ -17,10 +17,11 @@ package kafka.consumer -import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} -import kafka.utils.Pool import java.util.concurrent.TimeUnit + import kafka.common.ClientIdAndBroker +import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} +import kafka.utils.Pool class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup { val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) @@ -53,6 +54,17 @@ object FetchRequestAndResponseStatsRegistry { def getFetchRequestAndResponseStats(clientId: String) = { globalStats.getAndMaybePut(clientId) } + + def removeConsumerFetchRequestAndResponseStats(clientId: String) { + val pattern = (clientId + "-ConsumerFetcherThread.*").r + val keys = globalStats.keys + for (key <- keys) { + pattern.findFirstIn(key) match { + case Some(_) => globalStats.remove(key) + case _ => + } + } + } } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 65f518d47c7..acfd064bdba 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -17,28 +17,28 @@ package kafka.consumer +import java.net.InetAddress +import java.util.UUID import java.util.concurrent._ import java.util.concurrent.atomic._ -import locks.ReentrantLock -import collection._ +import java.util.concurrent.locks.ReentrantLock + +import com.yammer.metrics.core.Gauge +import kafka.api._ +import kafka.client.ClientUtils import kafka.cluster._ -import kafka.utils._ -import org.I0Itec.zkclient.exception.ZkNodeExistsException -import java.net.InetAddress -import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient} -import org.apache.zookeeper.Watcher.Event.KeeperState -import java.util.UUID -import kafka.serializer._ -import kafka.utils.ZkUtils._ -import kafka.utils.Utils.inLock import kafka.common._ -import com.yammer.metrics.core.Gauge import kafka.metrics._ import kafka.network.BlockingChannel -import kafka.client.ClientUtils -import kafka.api._ -import scala.Some -import kafka.common.TopicAndPartition +import kafka.serializer._ +import kafka.utils.Utils.inLock +import kafka.utils.ZkUtils._ +import kafka.utils._ +import org.I0Itec.zkclient.exception.ZkNodeExistsException +import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient} +import org.apache.zookeeper.Watcher.Event.KeeperState + +import scala.collection._ /** @@ -184,7 +184,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val canShutdown = isShuttingDown.compareAndSet(false, true) if (canShutdown) { info("ZKConsumerConnector shutting down") - + val startTime = System.nanoTime() + KafkaMetricsGroup.removeAllConsumerMetrics(config.clientId) rebalanceLock synchronized { if (wildcardTopicWatcher != null) wildcardTopicWatcher.shutdown() @@ -208,7 +209,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, case e: Throwable => fatal("error during consumer connector shutdown", e) } - info("ZKConsumerConnector shut down completed") + info("ZKConsumerConnector shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms") } } } diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala index a20ab90165c..00df4621fd7 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala @@ -18,10 +18,15 @@ package kafka.metrics -import com.yammer.metrics.core.{Gauge, MetricName} -import kafka.utils.Logging import java.util.concurrent.TimeUnit + import com.yammer.metrics.Metrics +import com.yammer.metrics.core.{Gauge, MetricName} +import kafka.consumer.{ConsumerTopicStatsRegistry, FetchRequestAndResponseStatsRegistry} +import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, ProducerTopicStatsRegistry} +import kafka.utils.Logging + +import scala.collection.immutable trait KafkaMetricsGroup extends Logging { @@ -51,4 +56,115 @@ trait KafkaMetricsGroup extends Logging { def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) = Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit) + def removeMetric(name: String) = + Metrics.defaultRegistry().removeMetric(metricName(name)) + +} + +object KafkaMetricsGroup extends KafkaMetricsGroup with Logging { + /** + * To make sure all the metrics be de-registered after consumer/producer close, the metric names should be + * put into the metric name set. + */ + private val consumerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName]( + // kafka.consumer.ZookeeperConsumerConnector + new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-FetchQueueSize"), + new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"), + new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"), + new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"), + + // kafka.consumer.ConsumerFetcherManager + new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"), + new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MinFetchRate"), + + // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread + new MetricName("kafka.server", "FetcherLagMetrics", "-ConsumerLag"), + + // kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo} + new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-MessagesPerSec"), + new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsMessagesPerSec"), + + // kafka.consumer.ConsumerTopicStats + new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-BytesPerSec"), + new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsBytesPerSec"), + + // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread + new MetricName("kafka.server", "FetcherStats", "-BytesPerSec"), + new MetricName("kafka.server", "FetcherStats", "-RequestsPerSec"), + + // kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer + new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchResponseSize"), + new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchRequestRateAndTimeMs"), + new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchResponseSize"), + new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchRequestRateAndTimeMs"), + + /** + * ProducerRequestStats <-- SyncProducer + * metric for SyncProducer in fetchTopicMetaData() needs to be removed when consumer is closed. + */ + new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"), + new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"), + new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"), + new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize") + ) + + private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName] ( + // kafka.producer.ProducerStats <-- DefaultEventHandler <-- Producer + new MetricName("kafka.producer", "ProducerStats", "-SerializationErrorsPerSec"), + new MetricName("kafka.producer", "ProducerStats", "-ResendsPerSec"), + new MetricName("kafka.producer", "ProducerStats", "-FailedSendsPerSec"), + + // kafka.producer.ProducerSendThread + new MetricName("kafka.producer.async", "ProducerSendThread", "-ProducerQueueSize"), + + // kafka.producer.ProducerTopicStats <-- kafka.producer.{Producer, async.DefaultEventHandler} + new MetricName("kafka.producer", "ProducerTopicMetrics", "-MessagesPerSec"), + new MetricName("kafka.producer", "ProducerTopicMetrics", "-DroppedMessagesPerSec"), + new MetricName("kafka.producer", "ProducerTopicMetrics", "-BytesPerSec"), + new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsMessagesPerSec"), + new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsDroppedMessagesPerSec"), + new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsBytesPerSec"), + + // kafka.producer.ProducerRequestStats <-- SyncProducer + new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"), + new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"), + new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"), + new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize") + ) + + def removeAllConsumerMetrics(clientId: String) { + FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId) + ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId) + ProducerRequestStatsRegistry.removeProducerRequestStats(clientId) + removeAllMetricsInList(KafkaMetricsGroup.consumerMetricNameList, clientId) + } + + def removeAllProducerMetrics(clientId: String) { + ProducerRequestStatsRegistry.removeProducerRequestStats(clientId) + ProducerTopicStatsRegistry.removeProducerTopicStats(clientId) + ProducerStatsRegistry.removeProducerStats(clientId) + removeAllMetricsInList(KafkaMetricsGroup.producerMetricNameList, clientId) + } + + private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) { + metricNameList.foreach(metric => { + val pattern = (clientId + ".*" + metric.getName +".*").r + val registeredMetrics = scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet()) + for (registeredMetric <- registeredMetrics) { + if (registeredMetric.getGroup == metric.getGroup && + registeredMetric.getType == metric.getType) { + pattern.findFirstIn(registeredMetric.getName) match { + case Some(_) => { + val beforeRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size + Metrics.defaultRegistry().removeMetric(registeredMetric) + val afterRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size + trace("Removing metric %s. Metrics registry size reduced from %d to %d".format( + registeredMetric, beforeRemovalSize, afterRemovalSize)) + } + case _ => + } + } + } + }) + } } diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index 4798481d573..cd634f653ca 100644 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -16,14 +16,14 @@ */ package kafka.producer -import async.{DefaultEventHandler, ProducerSendThread, EventHandler} -import kafka.utils._ -import java.util.Random -import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} -import kafka.serializer.Encoder import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} + import kafka.common.QueueFullException import kafka.metrics._ +import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread} +import kafka.serializer.Encoder +import kafka.utils._ class Producer[K,V](val config: ProducerConfig, @@ -126,9 +126,12 @@ class Producer[K,V](val config: ProducerConfig, val canShutdown = hasShutdown.compareAndSet(false, true) if(canShutdown) { info("Shutting down producer") + val startTime = System.nanoTime() + KafkaMetricsGroup.removeAllProducerMetrics(config.clientId) if (producerSendThread != null) producerSendThread.shutdown eventHandler.close + info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms") } } } diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala index 96942205a6a..1c46d729d82 100644 --- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala @@ -52,5 +52,9 @@ object ProducerRequestStatsRegistry { def getProducerRequestStats(clientId: String) = { globalStats.getAndMaybePut(clientId) } + + def removeProducerRequestStats(clientId: String) { + globalStats.remove(clientId) + } } diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala index e1610d3c602..35e3aae2f81 100644 --- a/core/src/main/scala/kafka/producer/ProducerStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerStats.scala @@ -36,4 +36,8 @@ object ProducerStatsRegistry { def getProducerStats(clientId: String) = { statsRegistry.getAndMaybePut(clientId) } + + def removeProducerStats(clientId: String) { + statsRegistry.remove(clientId) + } } diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala index ed209f4773d..9bb1419dcc4 100644 --- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala @@ -55,4 +55,8 @@ object ProducerTopicStatsRegistry { def getProducerTopicStats(clientId: String) = { globalStats.getAndMaybePut(clientId) } + + def removeProducerTopicStats(clientId: String) { + globalStats.remove(clientId) + } }