Browse Source

KAFKA-6853: ZooKeeperRequestLatencyMs is incorrect (#4961)

ResponseMetadata.responseTimeMs is always 0 or negative.

Reviewers: Rajini Sivaram <rajinisivaram@gmail.com>, Ismael Juma <ismael@juma.me.uk>
pull/4964/head
Fedor Bobin 7 years ago committed by Ismael Juma
parent
commit
de147837dd
  1. 2
      core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
  2. 14
      core/src/test/scala/integration/kafka/api/MetricsTest.scala

2
core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala

@ -476,7 +476,7 @@ sealed abstract class AsyncResponse { @@ -476,7 +476,7 @@ sealed abstract class AsyncResponse {
}
case class ResponseMetadata(sendTimeMs: Long, receivedTimeMs: Long) {
def responseTimeMs: Long = sendTimeMs - receivedTimeMs
def responseTimeMs: Long = receivedTimeMs - sendTimeMs
}
case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String, metadata: ResponseMetadata) extends AsyncResponse

14
core/src/test/scala/integration/kafka/api/MetricsTest.scala

@ -217,12 +217,16 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { @@ -217,12 +217,16 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
}
private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit = {
// Latency is rounded to milliseconds, so check the count instead.
val initialCount = yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs")
val histogram = yammerHistogram("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs")
// Latency is rounded to milliseconds, so check the count instead
val initialCount = histogram.count
servers.head.zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
val newCount = yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs")
val newCount = histogram.count
assertTrue("ZooKeeper latency not recorded", newCount > initialCount)
val min = histogram.min
assertTrue(s"Min latency should not be negative: $min", min >= 0)
assertEquals(s"Unexpected ZK state", "CONNECTED", yammerMetricValue("SessionState"))
}
@ -286,12 +290,12 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { @@ -286,12 +290,12 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
}
}
private def yammerHistogramCount(name: String): Long = {
private def yammerHistogram(name: String): Histogram = {
val allMetrics = Metrics.defaultRegistry.allMetrics.asScala
val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.endsWith(name) }
.getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}"))
metric match {
case m: Histogram => m.count
case m: Histogram => m
case m => fail(s"Unexpected broker metric of class ${m.getClass}")
}
}

Loading…
Cancel
Save