Browse Source

KAFKA-4982; Add listener tags to socket-server-metrics (KIP-136)

Author: Edoardo Comar <ecomar@uk.ibm.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3004 from edoardocomar/KAFKA-4982
pull/3039/head
Edoardo Comar 8 years ago committed by Ismael Juma
parent
commit
f21f8f2d44
  1. 24
      core/src/main/scala/kafka/network/SocketServer.scala
  2. 21
      core/src/test/scala/unit/kafka/network/SocketServerTest.scala

24
core/src/main/scala/kafka/network/SocketServer.scala

@ -21,7 +21,6 @@ import java.io.IOException @@ -21,7 +21,6 @@ import java.io.IOException
import java.net._
import java.nio.channels._
import java.nio.channels.{Selector => NSelector}
import java.util
import java.util.concurrent._
import java.util.concurrent.atomic._
@ -34,7 +33,7 @@ import kafka.server.KafkaConfig @@ -34,7 +33,7 @@ import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Mode, Selectable, Selector => KSelector}
import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Selector => KSelector}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.protocol.types.SchemaException
@ -68,12 +67,6 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time @@ -68,12 +67,6 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
private var connectionQuotas: ConnectionQuotas = _
private val allMetricNames = (0 until totalProcessorThreads).map { i =>
val tags = new util.HashMap[String, String]()
tags.put("networkProcessor", i.toString)
metrics.metricName("io-wait-ratio", "socket-server-metrics", tags)
}
/**
* Start the socket server
*/
@ -107,7 +100,11 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time @@ -107,7 +100,11 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
newGauge("NetworkProcessorAvgIdlePercent",
new Gauge[Double] {
def value = allMetricNames.map { metricName =>
private val ioWaitRatioMetricNames = processors.map { p =>
metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
}
def value = ioWaitRatioMetricNames.map { metricName =>
Option(metrics.metric(metricName)).fold(0.0)(_.value)
}.sum / totalProcessorThreads
}
@ -400,7 +397,10 @@ private[kafka] class Processor(val id: Int, @@ -400,7 +397,10 @@ private[kafka] class Processor(val id: Int,
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
private val metricTags = Map("networkProcessor" -> id.toString).asJava
private[kafka] val metricTags = mutable.LinkedHashMap(
"listener" -> listenerName.value,
"networkProcessor" -> id.toString
).asJava
newGauge("IdlePercent",
new Gauge[Double] {
@ -408,7 +408,9 @@ private[kafka] class Processor(val id: Int, @@ -408,7 +408,9 @@ private[kafka] class Processor(val id: Int,
Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags))).fold(0.0)(_.value)
}
},
metricTags.asScala
// for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
// also includes the listener name)
Map("networkProcessor" -> id.toString)
)
private val selector = new KSelector(

21
core/src/test/scala/unit/kafka/network/SocketServerTest.scala

@ -419,4 +419,25 @@ class SocketServerTest extends JUnitSuite { @@ -419,4 +419,25 @@ class SocketServerTest extends JUnitSuite {
assertEquals(Map.empty, nonZeroMetricNamesAndValues)
}
@Test
def testProcessorMetricsTags(): Unit = {
val kafkaMetricNames = metrics.metrics.keySet.asScala.filter(_.tags.asScala.get("listener").nonEmpty)
assertFalse(kafkaMetricNames.isEmpty)
val expectedListeners = Set("PLAINTEXT", "TRACE")
kafkaMetricNames.foreach { kafkaMetricName =>
assertTrue(expectedListeners.contains(kafkaMetricName.tags.get("listener")))
}
// legacy metrics not tagged
val yammerMetricsNames = YammerMetrics.defaultRegistry.allMetrics.asScala
.filterKeys(_.getType.equals("Processor"))
.collect { case (k, _: Gauge[_]) => k }
assertFalse(yammerMetricsNames.isEmpty)
yammerMetricsNames.foreach { yammerMetricName =>
assertFalse(yammerMetricName.getMBeanName.contains("listener="))
}
}
}

Loading…
Cancel
Save