diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
index ad9eb20f3e7..a3a21bdbca0 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
@@ -20,7 +20,8 @@
package kafka.metrics
-import kafka.utils.{VerifiableProperties, CoreUtils}
+import kafka.server.{Defaults, KafkaConfig}
+import kafka.utils.{CoreUtils, VerifiableProperties}
class KafkaMetricsConfig(props: VerifiableProperties) {
@@ -28,10 +29,12 @@ class KafkaMetricsConfig(props: VerifiableProperties) {
* Comma-separated list of reporter types. These classes should be on the
* classpath and will be instantiated at run-time.
*/
- val reporters = CoreUtils.parseCsvList(props.getString("kafka.metrics.reporters", ""))
+ val reporters = CoreUtils.parseCsvList(props.getString(KafkaConfig.KafkaMetricsReporterClassesProp,
+ Defaults.KafkaMetricReporterClasses))
/**
* The metrics polling interval (in seconds).
*/
- val pollingIntervalSecs = props.getInt("kafka.metrics.polling.interval.secs", 10)
+ val pollingIntervalSecs = props.getInt(KafkaConfig.KafkaMetricsPollingIntervalSecondsProp,
+ Defaults.KafkaMetricsPollingIntervalSeconds)
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1bc9707fa0b..90225024bfc 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -203,6 +203,11 @@ object Defaults {
val MetricReporterClasses = ""
val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString()
+
+ /** ********* Kafka Yammer Metrics Reporter Configuration ***********/
+ val KafkaMetricReporterClasses = ""
+ val KafkaMetricsPollingIntervalSeconds = 10
+
/** ********* SSL configuration ***********/
val SslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL
val SslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS
@@ -410,6 +415,10 @@ object KafkaConfig {
val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
val MetricRecordingLevelProp: String = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG
+ /** ********* Kafka Yammer Metrics Reporters Configuration ***********/
+ val KafkaMetricsReporterClassesProp = "kafka.metrics.reporters"
+ val KafkaMetricsPollingIntervalSecondsProp = "kafka.metrics.polling.interval.secs"
+
/** ******** Common Security Configuration *************/
val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
@@ -720,6 +729,17 @@ object KafkaConfig {
val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
val MetricRecordingLevelDoc = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC
+
+ /** ********* Kafka Yammer Metrics Reporter Configuration ***********/
+ val KafkaMetricsReporterClassesDoc = "A list of classes to use as Yammer metrics custom reporters." +
+ " The reporters should implement kafka.metrics.KafkaMetricsReporter
trait. If a client wants" +
+ " to expose JMX operations on a custom reporter, the custom reporter needs to additionally implement an MBean" +
+ " trait that extends kafka.metrics.KafkaMetricsReporterMBean
trait so that the registered MBean is compliant with" +
+ " the standard MBean convention."
+
+ val KafkaMetricsPollingIntervalSecondsDoc = s"The metrics polling interval (in seconds) which can be used" +
+ s" in $KafkaMetricsReporterClassesProp implementations."
+
/** ******** Common Security Configuration *************/
val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
@@ -945,6 +965,10 @@ object KafkaConfig {
.define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc)
.define(MetricRecordingLevelProp, STRING, Defaults.MetricRecordingLevel, LOW, MetricRecordingLevelDoc)
+ /** ********* Kafka Yammer Metrics Reporter Configuration for docs ***********/
+ .define(KafkaMetricsReporterClassesProp, LIST, Defaults.KafkaMetricReporterClasses, LOW, KafkaMetricsReporterClassesDoc)
+ .define(KafkaMetricsPollingIntervalSecondsProp, INT, Defaults.KafkaMetricsPollingIntervalSeconds, atLeast(1), LOW, KafkaMetricsPollingIntervalSecondsDoc)
+
/** ********* Quota configuration ***********/
.define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc)
.define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index b7a8951ecc0..b75c3e7eb24 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -22,6 +22,7 @@ import java.util.Properties
import kafka.api.{ApiVersion, KAFKA_0_8_2}
import kafka.cluster.EndPoint
import kafka.message._
+import kafka.metrics.KafkaMetricsConfig
import kafka.utils.{CoreUtils, TestUtils}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.metrics.Sensor
@@ -724,6 +725,10 @@ class KafkaConfigTest {
case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+ //Kafka Yammer metrics reporter configs
+ case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore
+ case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore
+
case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
}
})
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
index dfcb4ac2799..4d514a0550d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
@@ -83,7 +83,7 @@ class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness {
override def setUp() {
super.setUp()
val props = TestUtils.createBrokerConfig(1, zkConnect)
- props.setProperty("kafka.metrics.reporters", "kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter")
+ props.setProperty(KafkaConfig.KafkaMetricsReporterClassesProp, "kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter")
props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricReporterClusterIdTest$MockBrokerMetricsReporter")
props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
props.setProperty(KafkaConfig.BrokerIdProp, "-1")