Browse Source

KAFKA-5066; Add KafkaMetricsConfig (Yammer metrics reporters) props to documentation

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Dong Lin <lindong28@gmail.com>

Closes #5563 from omkreddy/KAFKA-5066-KAFKA-METRICS-CONFIG
pull/5688/merge
Manikumar Reddy 6 years ago committed by Dong Lin
parent
commit
c835331c78
  1. 9
      core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
  2. 24
      core/src/main/scala/kafka/server/KafkaConfig.scala
  3. 5
      core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
  4. 2
      core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala

9
core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala

@ -20,7 +20,8 @@ @@ -20,7 +20,8 @@
package kafka.metrics
import kafka.utils.{VerifiableProperties, CoreUtils}
import kafka.server.{Defaults, KafkaConfig}
import kafka.utils.{CoreUtils, VerifiableProperties}
class KafkaMetricsConfig(props: VerifiableProperties) {
@ -28,10 +29,12 @@ class KafkaMetricsConfig(props: VerifiableProperties) { @@ -28,10 +29,12 @@ class KafkaMetricsConfig(props: VerifiableProperties) {
* Comma-separated list of reporter types. These classes should be on the
* classpath and will be instantiated at run-time.
*/
val reporters = CoreUtils.parseCsvList(props.getString("kafka.metrics.reporters", ""))
val reporters = CoreUtils.parseCsvList(props.getString(KafkaConfig.KafkaMetricsReporterClassesProp,
Defaults.KafkaMetricReporterClasses))
/**
* The metrics polling interval (in seconds).
*/
val pollingIntervalSecs = props.getInt("kafka.metrics.polling.interval.secs", 10)
val pollingIntervalSecs = props.getInt(KafkaConfig.KafkaMetricsPollingIntervalSecondsProp,
Defaults.KafkaMetricsPollingIntervalSeconds)
}

24
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -203,6 +203,11 @@ object Defaults { @@ -203,6 +203,11 @@ object Defaults {
val MetricReporterClasses = ""
val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString()
/** ********* Kafka Yammer Metrics Reporter Configuration ***********/
val KafkaMetricReporterClasses = ""
val KafkaMetricsPollingIntervalSeconds = 10
/** ********* SSL configuration ***********/
val SslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL
val SslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS
@ -410,6 +415,10 @@ object KafkaConfig { @@ -410,6 +415,10 @@ object KafkaConfig {
val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
val MetricRecordingLevelProp: String = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG
/** ********* Kafka Yammer Metrics Reporters Configuration ***********/
val KafkaMetricsReporterClassesProp = "kafka.metrics.reporters"
val KafkaMetricsPollingIntervalSecondsProp = "kafka.metrics.polling.interval.secs"
/** ******** Common Security Configuration *************/
val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
@ -720,6 +729,17 @@ object KafkaConfig { @@ -720,6 +729,17 @@ object KafkaConfig {
val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
val MetricRecordingLevelDoc = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC
/** ********* Kafka Yammer Metrics Reporter Configuration ***********/
val KafkaMetricsReporterClassesDoc = "A list of classes to use as Yammer metrics custom reporters." +
" The reporters should implement <code>kafka.metrics.KafkaMetricsReporter</code> trait. If a client wants" +
" to expose JMX operations on a custom reporter, the custom reporter needs to additionally implement an MBean" +
" trait that extends <code>kafka.metrics.KafkaMetricsReporterMBean</code> trait so that the registered MBean is compliant with" +
" the standard MBean convention."
val KafkaMetricsPollingIntervalSecondsDoc = s"The metrics polling interval (in seconds) which can be used" +
s" in $KafkaMetricsReporterClassesProp implementations."
/** ******** Common Security Configuration *************/
val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
@ -945,6 +965,10 @@ object KafkaConfig { @@ -945,6 +965,10 @@ object KafkaConfig {
.define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc)
.define(MetricRecordingLevelProp, STRING, Defaults.MetricRecordingLevel, LOW, MetricRecordingLevelDoc)
/** ********* Kafka Yammer Metrics Reporter Configuration for docs ***********/
.define(KafkaMetricsReporterClassesProp, LIST, Defaults.KafkaMetricReporterClasses, LOW, KafkaMetricsReporterClassesDoc)
.define(KafkaMetricsPollingIntervalSecondsProp, INT, Defaults.KafkaMetricsPollingIntervalSeconds, atLeast(1), LOW, KafkaMetricsPollingIntervalSecondsDoc)
/** ********* Quota configuration ***********/
.define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc)
.define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc)

5
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

@ -22,6 +22,7 @@ import java.util.Properties @@ -22,6 +22,7 @@ import java.util.Properties
import kafka.api.{ApiVersion, KAFKA_0_8_2}
import kafka.cluster.EndPoint
import kafka.message._
import kafka.metrics.KafkaMetricsConfig
import kafka.utils.{CoreUtils, TestUtils}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.metrics.Sensor
@ -724,6 +725,10 @@ class KafkaConfigTest { @@ -724,6 +725,10 @@ class KafkaConfigTest {
case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
//Kafka Yammer metrics reporter configs
case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore
case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore
case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
}
})

2
core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala

@ -83,7 +83,7 @@ class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness { @@ -83,7 +83,7 @@ class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness {
override def setUp() {
super.setUp()
val props = TestUtils.createBrokerConfig(1, zkConnect)
props.setProperty("kafka.metrics.reporters", "kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter")
props.setProperty(KafkaConfig.KafkaMetricsReporterClassesProp, "kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter")
props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricReporterClusterIdTest$MockBrokerMetricsReporter")
props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
props.setProperty(KafkaConfig.BrokerIdProp, "-1")

Loading…
Cancel
Save