
KAFKA-5194; Include only client traffic in BytesOutPerSec metric (KIP-153)

Also added two new metrics to account for incoming/outgoing traffic due to internal replication:
- ReplicationBytesInPerSec
- ReplicationBytesOutPerSec

Author: Mickael Maison <mickael.maison@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3003 from mimaison/KAFKA-5194
Commit: d968662439, authored by Mickael Maison, committed by Ismael Juma
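Once a broker with this patch is running, the new broker-wide meters can be inspected over JMX. A minimal sketch, assuming the probe runs inside the broker JVM (a remote `MBeanServerConnection` works the same way); the `kafka.server:type=BrokerTopicMetrics` object names are the ones this patch registers, the `Count`/`OneMinuteRate` attributes are the standard ones Yammer's JMX reporter exposes for meters, and `TrafficMetricsProbe` is a hypothetical name:

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName

// Read the split client/replication traffic meters introduced by KIP-153.
object TrafficMetricsProbe extends App {
  private val mbs = ManagementFactory.getPlatformMBeanServer
  for (name <- Seq("BytesOutPerSec", "ReplicationBytesInPerSec", "ReplicationBytesOutPerSec")) {
    val mbean = new ObjectName(s"kafka.server:type=BrokerTopicMetrics,name=$name")
    val count = mbs.getAttribute(mbean, "Count")         // cumulative bytes
    val rate = mbs.getAttribute(mbean, "OneMinuteRate")  // bytes/sec, 1-minute EWMA
    println(s"$name: count=$count, oneMinuteRate=$rate")
  }
}
```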
Changed files (lines changed):

  1. core/src/main/scala/kafka/server/KafkaApis.scala (3)
  2. core/src/main/scala/kafka/server/KafkaRequestHandler.scala (31)
  3. core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (1)
  4. core/src/test/scala/unit/kafka/metrics/MetricsTest.scala (56)
  5. core/src/test/scala/unit/kafka/utils/TestUtils.scala (25)

core/src/main/scala/kafka/server/KafkaApis.scala (3 lines changed)

@@ -516,8 +516,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 fetchedPartitionData.put(topicPartition, data)
 // record the bytes out metrics only when the response is being sent
-BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesOutRate.mark(data.records.sizeInBytes)
-BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.records.sizeInBytes)
+BrokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
 }
 val response = new FetchResponse(fetchedPartitionData, 0)

core/src/main/scala/kafka/server/KafkaRequestHandler.scala (31 lines changed)

@@ -115,6 +115,12 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
 val bytesInRate = newMeter(BrokerTopicStats.BytesInPerSec, "bytes", TimeUnit.SECONDS, tags)
 val bytesOutRate = newMeter(BrokerTopicStats.BytesOutPerSec, "bytes", TimeUnit.SECONDS, tags)
 val bytesRejectedRate = newMeter(BrokerTopicStats.BytesRejectedPerSec, "bytes", TimeUnit.SECONDS, tags)
+private[server] val replicationBytesInRate =
+  if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesInPerSec, "bytes", TimeUnit.SECONDS, tags))
+  else None
+private[server] val replicationBytesOutRate =
+  if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes", TimeUnit.SECONDS, tags))
+  else None
 val failedProduceRequestRate = newMeter(BrokerTopicStats.FailedProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
 val failedFetchRequestRate = newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
 val totalProduceRequestRate = newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
@@ -125,6 +131,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
 removeMetric(BrokerTopicStats.BytesInPerSec, tags)
 removeMetric(BrokerTopicStats.BytesOutPerSec, tags)
 removeMetric(BrokerTopicStats.BytesRejectedPerSec, tags)
+removeMetric(BrokerTopicStats.ReplicationBytesInPerSec, tags)
+removeMetric(BrokerTopicStats.ReplicationBytesOutPerSec, tags)
 removeMetric(BrokerTopicStats.FailedProduceRequestsPerSec, tags)
 removeMetric(BrokerTopicStats.FailedFetchRequestsPerSec, tags)
 removeMetric(BrokerTopicStats.TotalProduceRequestsPerSec, tags)
@@ -137,6 +145,8 @@ object BrokerTopicStats extends Logging {
 val BytesInPerSec = "BytesInPerSec"
 val BytesOutPerSec = "BytesOutPerSec"
 val BytesRejectedPerSec = "BytesRejectedPerSec"
+val ReplicationBytesInPerSec = "ReplicationBytesInPerSec"
+val ReplicationBytesOutPerSec = "ReplicationBytesOutPerSec"
 val FailedProduceRequestsPerSec = "FailedProduceRequestsPerSec"
 val FailedFetchRequestsPerSec = "FailedFetchRequestsPerSec"
 val TotalProduceRequestsPerSec = "TotalProduceRequestsPerSec"
@@ -152,9 +162,30 @@ object BrokerTopicStats extends Logging {
 stats.getAndMaybePut(topic)
 }
+def updateReplicationBytesIn(value: Long) {
+  getBrokerAllTopicsStats.replicationBytesInRate.foreach { metric =>
+    metric.mark(value)
+  }
+}
+private def updateReplicationBytesOut(value: Long) {
+  getBrokerAllTopicsStats.replicationBytesOutRate.foreach { metric =>
+    metric.mark(value)
+  }
+}
 def removeMetrics(topic: String) {
   val metrics = stats.remove(topic)
   if (metrics != null)
     metrics.close()
 }
+def updateBytesOut(topic: String, isFollower: Boolean, value: Long) {
+  if (isFollower) {
+    updateReplicationBytesOut(value)
+  } else {
+    getBrokerTopicStats(topic).bytesOutRate.mark(value)
+    getBrokerAllTopicsStats.bytesOutRate.mark(value)
+  }
+}
 }
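The `updateBytesOut` helper added above is the heart of KIP-153: bytes served to followers are diverted from `BytesOutPerSec` into the replication meter. A minimal sketch of the resulting behaviour, assuming the broker classpath (the topic name and byte counts are made up):

```scala
import kafka.server.BrokerTopicStats

// Follower fetch: only the broker-wide ReplicationBytesOutPerSec meter ticks.
BrokerTopicStats.updateBytesOut("my-topic", isFollower = true, value = 1024L)

// Client fetch: the per-topic and all-topics BytesOutPerSec meters tick instead;
// the replication meters are left untouched.
BrokerTopicStats.updateBytesOut("my-topic", isFollower = false, value = 2048L)
```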

core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (1 line changed)

@@ -112,6 +112,7 @@ class ReplicaFetcherThread(name: String,
   trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
   if (quota.isThrottled(topicPartition))
     quota.record(records.sizeInBytes)
+  BrokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
 } catch {
   case e: KafkaStorageException =>
     fatal(s"Disk error while replicating data for $topicPartition", e)

core/src/test/scala/unit/kafka/metrics/MetricsTest.scala (56 lines changed)

@@ -20,8 +20,8 @@ package kafka.metrics
 import java.util.Properties
 import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.{Metric, MetricName, MetricPredicate}
-import org.junit.{After, Test}
+import com.yammer.metrics.core.{Meter, MetricPredicate}
+import org.junit.Test
 import org.junit.Assert._
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
@@ -48,11 +48,6 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
 val nMessages = 2
-@After
-override def tearDown() {
-  super.tearDown()
-}
 @Test
 @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
 def testMetricsLeak() {
@@ -93,10 +88,10 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
 }
 @Test
-def testClusterIdMetric(): Unit ={
+def testClusterIdMetric(): Unit = {
   // Check if clusterId metric exists.
   val metrics = Metrics.defaultRegistry().allMetrics
-  assertEquals(metrics.keySet.asScala.count(_.getMBeanName().equals("kafka.server:type=KafkaServer,name=ClusterId")), 1)
+  assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=KafkaServer,name=ClusterId"), 1)
 }
 @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
@@ -111,10 +106,51 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
 zkConsumerConnector1.shutdown()
 }
+@Test
+def testBrokerTopicMetricsBytesInOut(): Unit = {
+  val replicationBytesIn = BrokerTopicStats.ReplicationBytesInPerSec
+  val replicationBytesOut = BrokerTopicStats.ReplicationBytesOutPerSec
+  val bytesIn = s"${BrokerTopicStats.BytesInPerSec},topic=$topic"
+  val bytesOut = s"${BrokerTopicStats.BytesOutPerSec},topic=$topic"
+  createTopic(zkUtils, topic, 1, numNodes, servers)
+  // Produce a few messages to create the metrics
+  TestUtils.produceMessages(servers, topic, nMessages)
+  val initialReplicationBytesIn = meterCount(replicationBytesIn)
+  val initialReplicationBytesOut = meterCount(replicationBytesOut)
+  val initialBytesIn = meterCount(bytesIn)
+  val initialBytesOut = meterCount(bytesOut)
+  // Produce a few messages to make the metrics tick
+  TestUtils.produceMessages(servers, topic, nMessages)
+  assertTrue(meterCount(replicationBytesIn) > initialReplicationBytesIn)
+  assertTrue(meterCount(replicationBytesOut) > initialReplicationBytesOut)
+  assertTrue(meterCount(bytesIn) > initialBytesIn)
+  // BytesOut doesn't include replication, so it shouldn't have changed
+  assertEquals(initialBytesOut, meterCount(bytesOut))
+  // Consume messages to make bytesOut tick
+  TestUtils.consumeTopicRecords(servers, topic, nMessages * 2)
+  assertTrue(meterCount(bytesOut) > initialBytesOut)
+}
+private def meterCount(metricName: String): Long = {
+  Metrics.defaultRegistry.allMetrics.asScala
+    .filterKeys(_.getMBeanName.endsWith(metricName))
+    .values
+    .headOption
+    .getOrElse(fail(s"Unable to find metric $metricName"))
+    .asInstanceOf[Meter]
+    .count
+}
 private def checkTopicMetricsExists(topic: String): Boolean = {
   val topicMetricRegex = new Regex(".*("+topic+")$")
   val metricGroups = Metrics.defaultRegistry().groupedMetrics(MetricPredicate.ALL).entrySet()
-  for(metricGroup <- metricGroups.asScala) {
+  for (metricGroup <- metricGroups.asScala) {
     if (topicMetricRegex.pattern.matcher(metricGroup.getKey()).matches)
       return true
   }

core/src/test/scala/unit/kafka/utils/TestUtils.scala (25 lines changed)

@@ -22,7 +22,7 @@ import java.nio._
 import java.nio.channels._
 import java.nio.charset.Charset
 import java.security.cert.X509Certificate
-import java.util.Properties
+import java.util.{ArrayList, Collections, Properties}
 import java.util.concurrent.{Callable, Executors, TimeUnit}
 import javax.net.ssl.X509TrustManager
@@ -40,7 +40,7 @@ import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.ZkUtils._
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, RangeAssignor}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.{ListenerName, Mode}
@@ -1298,6 +1298,27 @@ object TestUtils extends Logging {
 assertTrue(s"$message failed with exception(s) $exceptions", exceptions.isEmpty)
 }
+def consumeTopicRecords[K, V](servers: Seq[KafkaServer], topic: String, numMessages: Int,
+                              waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+  val consumer = createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
+    securityProtocol = SecurityProtocol.PLAINTEXT)
+  try {
+    consumer.subscribe(Collections.singleton(topic))
+    consumeRecords(consumer, numMessages, waitTime)
+  } finally consumer.close()
+}
+def consumeRecords[K, V](consumer: KafkaConsumer[K, V], numMessages: Int,
+                         waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[K, V]] = {
+  val records = new ArrayBuffer[ConsumerRecord[K, V]]()
+  waitUntilTrue(() => {
+    records ++= consumer.poll(50).asScala
+    records.size >= numMessages
+  }, s"Consumed ${records.size} records until timeout instead of the expected $numMessages records", waitTime)
+  assertEquals("Consumed more records than expected", numMessages, records.size)
+  records
+}
 }
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
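For reference, a minimal usage sketch of the new helper from inside a KafkaServerTestHarness test; the topic name and message count are made up, `servers` comes from the harness, and the helper itself fails the test if it times out or over-consumes:

```scala
// Consume exactly five records from "test-topic" using the brokers under test.
val records = TestUtils.consumeTopicRecords(servers, "test-topic", numMessages = 5)
assertEquals(5, records.size)
```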
