Browse Source

Log errors for unrecognized config options; patched by Jun Rao; reviewed by Jay Kreps; KAFKA-181

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1377220 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Jun Rao 12 years ago
parent
commit
a32d37f6d1
  1. 3
      config/server.properties
  2. 5
      core/src/main/scala/kafka/Kafka.scala
  3. 2
      core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
  4. 42
      core/src/main/scala/kafka/consumer/ConsumerConfig.scala
  5. 9
      core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
  6. 9
      core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
  7. 4
      core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
  8. 27
      core/src/main/scala/kafka/producer/ProducerConfig.scala
  9. 2
      core/src/main/scala/kafka/producer/ProducerPool.scala
  10. 33
      core/src/main/scala/kafka/producer/SyncProducerConfig.scala
  11. 15
      core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
  12. 93
      core/src/main/scala/kafka/server/KafkaConfig.scala
  13. 1
      core/src/main/scala/kafka/server/KafkaServerStartable.scala
  14. 7
      core/src/main/scala/kafka/utils/Mx4jLoader.scala
  15. 173
      core/src/main/scala/kafka/utils/Utils.scala
  16. 171
      core/src/main/scala/kafka/utils/VerifiableProperties.scala
  17. 11
      core/src/main/scala/kafka/utils/ZkUtils.scala

3
config/server.properties

@ -104,9 +104,6 @@ log.cleanup.interval.mins=1 @@ -104,9 +104,6 @@ log.cleanup.interval.mins=1
############################# Zookeeper #############################
# Enable connecting to zookeeper
enable.zookeeper=true
# Zk connection string (see zk docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

5
core/src/main/scala/kafka/Kafka.scala

@ -33,10 +33,11 @@ object Kafka extends Logging { @@ -33,10 +33,11 @@ object Kafka extends Logging {
try {
val props = Utils.loadProps(args(0))
val serverConfig = new KafkaConfig(props)
val metricsConfig = new KafkaMetricsConfig(props)
val verifiableProps = serverConfig.props
val metricsConfig = new KafkaMetricsConfig(verifiableProps)
metricsConfig.reporters.foreach(reporterType => {
val reporter = Utils.getObject[KafkaMetricsReporter](reporterType)
reporter.init(props)
reporter.init(verifiableProps)
if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
})

2
core/src/main/scala/kafka/consumer/ConsoleConsumer.scala

@ -129,7 +129,7 @@ object ConsoleConsumer extends Logging { @@ -129,7 +129,7 @@ object ConsoleConsumer extends Logging {
props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
props.put("min.fetch.bytes", options.valueOf(minFetchBytesOpt).toString)
props.put("max.fetch.wait.ms", options.valueOf(maxWaitMsOpt).toString)
props.put("auto.commit", "true")
props.put("autocommit.enable", "true")
props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
props.put("zk.connect", options.valueOf(zkConnectOpt))

42
core/src/main/scala/kafka/consumer/ConsumerConfig.scala

@ -18,8 +18,9 @@ @@ -18,8 +18,9 @@
package kafka.consumer
import java.util.Properties
import kafka.utils.{ZKConfig, Utils}
import kafka.api.OffsetRequest
import kafka.utils.{VerifiableProperties, ZKConfig}
object ConsumerConfig {
val SocketTimeout = 30 * 1000
val SocketBufferSize = 64*1024
@ -43,62 +44,67 @@ object ConsumerConfig { @@ -43,62 +44,67 @@ object ConsumerConfig {
val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
}
class ConsumerConfig(props: Properties) extends ZKConfig(props) {
class ConsumerConfig private (props: VerifiableProperties) extends ZKConfig(props) {
import ConsumerConfig._
def this(originalProps: Properties) {
this(new VerifiableProperties(originalProps))
props.verify()
}
/** a string that uniquely identifies a set of consumers within the same consumer group */
val groupId = Utils.getString(props, "groupid")
val groupId = props.getString("groupid")
/** consumer id: generated automatically if not set.
* Set this explicitly for only testing purpose. */
val consumerId: Option[String] = Option(Utils.getString(props, "consumerid", null))
val consumerId: Option[String] = Option(props.getString("consumerid", null))
/** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */
val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout)
val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
/** the socket receive buffer for network requests */
val socketBufferSize = Utils.getInt(props, "socket.buffersize", SocketBufferSize)
val socketBufferSize = props.getInt("socket.buffersize", SocketBufferSize)
/** the number of byes of messages to attempt to fetch */
val fetchSize = Utils.getInt(props, "fetch.size", FetchSize)
val fetchSize = props.getInt("fetch.size", FetchSize)
/** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
val autoCommit = Utils.getBoolean(props, "autocommit.enable", AutoCommit)
val autoCommit = props.getBoolean("autocommit.enable", AutoCommit)
/** the frequency in ms that the consumer offsets are committed to zookeeper */
val autoCommitIntervalMs = Utils.getInt(props, "autocommit.interval.ms", AutoCommitInterval)
val autoCommitIntervalMs = props.getInt("autocommit.interval.ms", AutoCommitInterval)
/** max number of messages buffered for consumption */
val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MaxQueuedChunks)
val maxQueuedChunks = props.getInt("queuedchunks.max", MaxQueuedChunks)
/** max number of retries during rebalance */
val maxRebalanceRetries = Utils.getInt(props, "rebalance.retries.max", MaxRebalanceRetries)
val maxRebalanceRetries = props.getInt("rebalance.retries.max", MaxRebalanceRetries)
/** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
val minFetchBytes = Utils.getInt(props, "min.fetch.bytes", MinFetchBytes)
val minFetchBytes = props.getInt("min.fetch.bytes", MinFetchBytes)
/** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediate satisfy min.fetch.bytes */
val maxFetchWaitMs = Utils.getInt(props, "max.fetch.wait.ms", MaxFetchWaitMs)
val maxFetchWaitMs = props.getInt("max.fetch.wait.ms", MaxFetchWaitMs)
/** backoff time between retries during rebalance */
val rebalanceBackoffMs = Utils.getInt(props, "rebalance.backoff.ms", zkSyncTimeMs)
val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)
/** backoff time to refresh the leader of a partition after it loses the current leader */
val refreshLeaderBackoffMs = Utils.getInt(props, "refresh.leader.backoff.ms", 200)
val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", 200)
/* what to do if an offset is out of range.
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset
anything else: throw exception to the consumer */
val autoOffsetReset = Utils.getString(props, "autooffset.reset", AutoOffsetReset)
val autoOffsetReset = props.getString("autooffset.reset", AutoOffsetReset)
/** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)
/** Use shallow iterator over compressed messages directly. This feature should be used very carefully.
* Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
* overhead of decompression.
* */
val enableShallowIterator = Utils.getBoolean(props, "shallowiterator.enable", false)
val enableShallowIterator = props.getBoolean("shallowiterator.enable", false)
}

9
core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala

@ -20,12 +20,11 @@ @@ -20,12 +20,11 @@
package kafka.metrics
import java.util.Properties
import com.yammer.metrics.Metrics
import java.io.File
import com.yammer.metrics.reporting.CsvReporter
import kafka.utils.{Logging, Utils}
import java.util.concurrent.TimeUnit
import kafka.utils.{VerifiableProperties, Logging}
private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean
@ -43,15 +42,15 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter @@ -43,15 +42,15 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
override def getMBeanName = "kafka:type=kafka.metrics.KafkaCSVMetricsReporter"
override def init(props: Properties) {
override def init(props: VerifiableProperties) {
synchronized {
if (!initialized) {
val metricsConfig = new KafkaMetricsConfig(props)
csvDir = new File(Utils.getString(props, "kafka.csv.metrics.dir", "kafka_metrics"))
csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics"))
if (!csvDir.exists())
csvDir.mkdirs()
underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
if (Utils.getBoolean(props, "kafka.csv.metrics.reporter.enabled", false))
if (props.getBoolean("kafka.csv.metrics.reporter.enabled", false))
startReporter(metricsConfig.pollingIntervalSecs)
initialized = true
}

9
core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala

@ -20,19 +20,18 @@ @@ -20,19 +20,18 @@
package kafka.metrics
import java.util.Properties
import kafka.utils.Utils
import kafka.utils.{VerifiableProperties, Utils}
class KafkaMetricsConfig(props: Properties) {
class KafkaMetricsConfig(props: VerifiableProperties) {
/**
* Comma-separated list of reporter types. These classes should be on the
* classpath and will be instantiated at run-time.
*/
val reporters = Utils.getCSVList(Utils.getString(props, "kafka.metrics.reporters", ""))
val reporters = Utils.getCSVList(props.getString("kafka.metrics.reporters", ""))
/**
* The metrics polling interval (in seconds).
*/
val pollingIntervalSecs = Utils.getInt(props, "kafka.metrics.polling.interval.secs", 10)
val pollingIntervalSecs = props.getInt("kafka.metrics.polling.interval.secs", 10)
}

4
core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala

@ -20,7 +20,7 @@ @@ -20,7 +20,7 @@
package kafka.metrics
import java.util.Properties
import kafka.utils.VerifiableProperties
/**
* Base trait for reporter MBeans. If a client wants to expose these JMX
@ -42,6 +42,6 @@ trait KafkaMetricsReporterMBean { @@ -42,6 +42,6 @@ trait KafkaMetricsReporterMBean {
trait KafkaMetricsReporter {
def init(props: Properties)
def init(props: VerifiableProperties)
}

27
core/src/main/scala/kafka/producer/ProducerConfig.scala

@ -19,9 +19,16 @@ package kafka.producer @@ -19,9 +19,16 @@ package kafka.producer
import async.AsyncProducerConfig
import java.util.Properties
import kafka.utils.Utils
import kafka.utils.{Utils, VerifiableProperties}
import kafka.message.{CompressionCodec, NoCompressionCodec}
class ProducerConfig(val props: Properties) extends AsyncProducerConfig with SyncProducerConfigShared{
class ProducerConfig private (val props: VerifiableProperties)
extends AsyncProducerConfig with SyncProducerConfigShared {
def this(originalProps: Properties) {
this(new VerifiableProperties(originalProps))
props.verify()
}
/** This is for bootstrapping and the producer will only use it for getting metadata
* (topics, partitions and replicas). The socket connections for sending the actual data
@ -29,27 +36,27 @@ class ProducerConfig(val props: Properties) extends AsyncProducerConfig with Syn @@ -29,27 +36,27 @@ class ProducerConfig(val props: Properties) extends AsyncProducerConfig with Syn
* format is host1:por1,host2:port2, and the list can be a subset of brokers or
* a VIP pointing to a subset of brokers.
*/
val brokerList = Utils.getString(props, "broker.list")
val brokerList = props.getString("broker.list")
/**
* If DefaultEventHandler is used, this specifies the number of times to
* retry if an error is encountered during send.
*/
val numRetries = Utils.getInt(props, "num.retries", 0)
val numRetries = props.getInt("num.retries", 0)
/** the partitioner class for partitioning events amongst sub-topics */
val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
val partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner")
/** this parameter specifies whether the messages are sent asynchronously *
* or not. Valid values are - async for asynchronous send *
* sync for synchronous send */
val producerType = Utils.getString(props, "producer.type", "sync")
val producerType = props.getString("producer.type", "sync")
/**
* This parameter allows you to specify the compression codec for all data generated *
* by this producer. The default is NoCompressionCodec
*/
val compressionCodec = Utils.getCompressionCodec(props, "compression.codec")
val compressionCodec = CompressionCodec.getCompressionCodec(props.getInt("compression.codec", NoCompressionCodec.codec))
/** This parameter allows you to set whether compression should be turned *
* on for particular topics
@ -62,7 +69,7 @@ class ProducerConfig(val props: Properties) extends AsyncProducerConfig with Syn @@ -62,7 +69,7 @@ class ProducerConfig(val props: Properties) extends AsyncProducerConfig with Syn
*
* If the compression codec is NoCompressionCodec, compression is disabled for all topics
*/
val compressedTopics = Utils.getCSVList(Utils.getString(props, "compressed.topics", null))
val compressedTopics = Utils.getCSVList(props.getString("compressed.topics", null))
/**
* The producer using the zookeeper software load balancer maintains a ZK cache that gets
@ -72,7 +79,7 @@ class ProducerConfig(val props: Properties) extends AsyncProducerConfig with Syn @@ -72,7 +79,7 @@ class ProducerConfig(val props: Properties) extends AsyncProducerConfig with Syn
* ZK cache needs to be updated.
* This parameter specifies the number of times the producer attempts to refresh this ZK cache.
*/
val producerRetries = Utils.getInt(props, "producer.num.retries", 3)
val producerRetries = props.getInt("producer.num.retries", 3)
val producerRetryBackoffMs = Utils.getInt(props, "producer.retry.backoff.ms", 100)
val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
}

2
core/src/main/scala/kafka/producer/ProducerPool.scala

@ -37,7 +37,7 @@ object ProducerPool{ @@ -37,7 +37,7 @@ object ProducerPool{
val props = new Properties()
props.put("host", broker.host)
props.put("port", broker.port.toString)
props.putAll(config.props)
props.putAll(config.props.props)
new SyncProducer(new SyncProducerConfig(props))
}
}

33
core/src/main/scala/kafka/producer/SyncProducerConfig.scala

@ -17,48 +17,53 @@ @@ -17,48 +17,53 @@
package kafka.producer
import kafka.utils.Utils
import java.util.Properties
import kafka.utils.VerifiableProperties
class SyncProducerConfig private (val props: VerifiableProperties) extends SyncProducerConfigShared {
def this(originalProps: Properties) {
this(new VerifiableProperties(originalProps))
// no need to verify the property since SyncProducerConfig is supposed to be used internally
}
class SyncProducerConfig(val props: Properties) extends SyncProducerConfigShared {
/** the broker to which the producer sends events */
val host = Utils.getString(props, "host")
val host = props.getString("host")
/** the port on which the broker is running */
val port = Utils.getInt(props, "port")
val port = props.getInt("port")
}
trait SyncProducerConfigShared {
val props: Properties
val props: VerifiableProperties
val bufferSize = Utils.getInt(props, "buffer.size", 100*1024)
val bufferSize = props.getInt("buffer.size", 100*1024)
val connectTimeoutMs = Utils.getInt(props, "connect.timeout.ms", 5000)
val connectTimeoutMs = props.getInt("connect.timeout.ms", 5000)
val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
val reconnectInterval = props.getInt("reconnect.interval", 30000)
/** negative reconnect time interval means disabling this time-based reconnect feature */
var reconnectTimeInterval = Utils.getInt(props, "reconnect.time.interval.ms", 1000*1000*10)
var reconnectTimeInterval = props.getInt("reconnect.time.interval.ms", 1000*1000*10)
val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
val maxMessageSize = props.getInt("max.message.size", 1000000)
/* the client application sending the producer requests */
val correlationId = Utils.getInt(props,"producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
val correlationId = props.getInt("producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
/* the client application sending the producer requests */
val clientId = Utils.getString(props,"producer.request.client_id",SyncProducerConfig.DefaultClientId)
val clientId = props.getString("producer.request.client_id",SyncProducerConfig.DefaultClientId)
/*
* The required acks of the producer requests - negative value means ack
* after the replicas in ISR have caught up to the leader's offset
* corresponding to this produce request.
*/
val requiredAcks = Utils.getShort(props,"producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
val requiredAcks = props.getShort("producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
/*
* The ack timeout of the producer requests. Value must be non-negative and non-zero
*/
val requestTimeoutMs = Utils.getIntInRange(props,"producer.request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
val requestTimeoutMs = props.getIntInRange("producer.request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
(1, Integer.MAX_VALUE))
}

15
core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala

@ -16,17 +16,16 @@ @@ -16,17 +16,16 @@
*/
package kafka.producer.async
import java.util.Properties
import kafka.utils.Utils
import kafka.utils.VerifiableProperties
trait AsyncProducerConfig {
val props: Properties
val props: VerifiableProperties
/* maximum time, in milliseconds, for buffering data on the producer queue */
val queueTime = Utils.getInt(props, "queue.time", 5000)
val queueTime = props.getInt("queue.time", 5000)
/** the maximum size of the blocking queue for buffering on the producer */
val queueSize = Utils.getInt(props, "queue.size", 10000)
val queueSize = props.getInt("queue.size", 10000)
/**
* Timeout for event enqueue:
@ -34,11 +33,11 @@ trait AsyncProducerConfig { @@ -34,11 +33,11 @@ trait AsyncProducerConfig {
* -ve: enqueue will block indefinitely if the queue is full
* +ve: enqueue will block up to this many milliseconds if the queue is full
*/
val enqueueTimeoutMs = Utils.getInt(props, "queue.enqueueTimeout.ms", 0)
val enqueueTimeoutMs = props.getInt("queue.enqueueTimeout.ms", 0)
/** the number of messages batched at the producer */
val batchSize = Utils.getInt(props, "batch.size", 200)
val batchSize = props.getInt("batch.size", 200)
/** the serializer class for events */
val serializerClass = Utils.getString(props, "serializer.class", "kafka.serializer.DefaultEncoder")
val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder")
}

93
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -18,142 +18,147 @@ @@ -18,142 +18,147 @@
package kafka.server
import java.util.Properties
import kafka.utils.{Utils, ZKConfig}
import kafka.message.Message
import kafka.consumer.ConsumerConfig
import java.net.InetAddress
import kafka.utils.{Utils, VerifiableProperties, ZKConfig}
/**
* Configuration settings for the kafka server
*/
class KafkaConfig(props: Properties) extends ZKConfig(props) {
class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
def this(originalProps: Properties) {
this(new VerifiableProperties(originalProps))
}
def verify() = props.verify()
/* the port to listen and accept connections on */
val port: Int = Utils.getInt(props, "port", 6667)
val port: Int = props.getInt("port", 6667)
/* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
val hostName: String = Utils.getString(props, "hostname", InetAddress.getLocalHost.getHostAddress)
val hostName: String = props.getString("hostname", InetAddress.getLocalHost.getHostAddress)
/* the broker id for this server */
val brokerId: Int = Utils.getIntInRange(props, "brokerid", (0, Int.MaxValue))
val brokerId: Int = props.getIntInRange("brokerid", (0, Int.MaxValue))
/* the SO_SNDBUFF buffer of the socket sever sockets */
val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)
val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024)
/* the SO_RCVBUFF buffer of the socket sever sockets */
val socketReceiveBuffer: Int = Utils.getInt(props, "socket.receive.buffer", 100*1024)
val socketReceiveBuffer: Int = props.getInt("socket.receive.buffer", 100*1024)
/* the maximum number of bytes in a socket request */
val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
val maxSocketRequestSize: Int = props.getIntInRange("max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
/* the number of network threads that the server uses for handling network requests */
val numNetworkThreads = Utils.getIntInRange(props, "network.threads", 3, (1, Int.MaxValue))
val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue))
/* the number of io threads that the server uses for carrying out network requests */
val numIoThreads = Utils.getIntInRange(props, "io.threads", 8, (1, Int.MaxValue))
val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue))
/* the number of queued requests allowed before blocking the network threads */
val numQueuedRequests = Utils.getIntInRange(props, "max.queued.requests", 500, (1, Int.MaxValue))
val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
/* the interval in which to measure performance statistics */
val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))
val monitoringPeriodSecs = props.getIntInRange("monitoring.period.secs", 600, (1, Int.MaxValue))
/* the default number of log partitions per topic */
val numPartitions = Utils.getIntInRange(props, "num.partitions", 1, (1, Int.MaxValue))
val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
/* the directory in which the log data is kept */
val logDir = Utils.getString(props, "log.dir")
val logDir = props.getString("log.dir")
/* the maximum size of a single log file */
val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
/* the maximum size of a single log file for some specific topic */
val logFileSizeMap = Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size", ""))
val logFileSizeMap = Utils.getTopicFileSize(props.getString("topic.log.file.size", ""))
/* the maximum time before a new log segment is rolled out */
val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))
val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
/* the number of hours before rolling out a new log segment for some specific topic */
val logRollHoursMap = Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", ""))
val logRollHoursMap = Utils.getTopicRollHours(props.getString("topic.log.roll.hours", ""))
/* the number of hours to keep a log file before deleting it */
val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24*7, (1, Int.MaxValue))
val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
/* the number of hours to keep a log file before deleting it for some specific topic*/
val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
val logRetentionHoursMap = Utils.getTopicRetentionHours(props.getString("topic.log.retention.hours", ""))
/* the maximum size of the log before deleting it */
val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)
val logRetentionSize = props.getLong("log.retention.size", -1)
/* the maximum size of the log for some specific topic before deleting it */
val logRetentionSizeMap = Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size", ""))
val logRetentionSizeMap = Utils.getTopicRetentionSize(props.getString("topic.log.retention.size", ""))
/* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
/* the number of messages accumulated on a log partition before messages are flushed to disk */
val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
/* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
val flushIntervalMap = Utils.getTopicFlushIntervals(props.getString("topic.flush.intervals.ms", ""))
/* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
val flushSchedulerThreadRate = Utils.getInt(props, "log.default.flush.scheduler.interval.ms", 3000)
val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms", 3000)
/* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
val defaultFlushIntervalMs = Utils.getInt(props, "log.default.flush.interval.ms", flushSchedulerThreadRate)
val defaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", flushSchedulerThreadRate)
/* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
val topicPartitionsMap = Utils.getTopicPartitions(props.getString("topic.partition.count.map", ""))
/* enable auto creation of topic on the server */
val autoCreateTopics = Utils.getBoolean(props, "auto.create.topics", true)
val autoCreateTopics = props.getBoolean("auto.create.topics", true)
/**
* Following properties are relevant to Kafka replication
*/
/* the socket timeout for controller-to-broker channels */
val controllerSocketTimeoutMs = Utils.getInt(props, "controller.socket.timeout.ms", 30000)
val controllerSocketTimeoutMs = props.getInt("controller.socket.timeout.ms", 30000)
/* the buffer size for controller-to-broker-channels */
val controllerMessageQueueSize= Utils.getInt(props, "controller.message.queue.size", 10)
val controllerMessageQueueSize= props.getInt("controller.message.queue.size", 10)
/* default replication factors for automatically created topics */
val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1)
val defaultReplicationFactor = props.getInt("default.replication.factor", 1)
/* wait time in ms to allow the preferred replica for a partition to become the leader. This property is used during
* leader election on all replicas minus the preferred replica */
val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
val preferredReplicaWaitTime = props.getLong("preferred.replica.wait.time", 300)
val replicaMaxLagTimeMs = Utils.getLong(props, "replica.max.lag.time.ms", 10000)
val replicaMaxLagTimeMs = props.getLong("replica.max.lag.time.ms", 10000)
val replicaMaxLagBytes = Utils.getLong(props, "replica.max.lag.bytes", 4000)
val replicaMaxLagBytes = props.getLong("replica.max.lag.bytes", 4000)
/* size of the state change request queue in Zookeeper */
val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000)
val stateChangeQSize = props.getInt("state.change.queue.size", 1000)
/**
* Config options relevant to a follower for a replica
*/
/** the socket timeout for network requests */
val replicaSocketTimeoutMs = Utils.getInt(props, "replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
/** the socket receive buffer for network requests */
val replicaSocketBufferSize = Utils.getInt(props, "replica.socket.buffersize", ConsumerConfig.SocketBufferSize)
val replicaSocketBufferSize = props.getInt("replica.socket.buffersize", ConsumerConfig.SocketBufferSize)
/** the number of byes of messages to attempt to fetch */
val replicaFetchSize = Utils.getInt(props, "replica.fetch.size", ConsumerConfig.FetchSize)
val replicaFetchSize = props.getInt("replica.fetch.size", ConsumerConfig.FetchSize)
/** max wait time for each fetcher request issued by follower replicas*/
val replicaMaxWaitTimeMs = Utils.getInt(props, "replica.fetch.wait.time.ms", 500)
val replicaMaxWaitTimeMs = props.getInt("replica.fetch.wait.time.ms", 500)
/** minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4096)
val replicaMinBytes = props.getInt("replica.fetch.min.bytes", 4096)
/* number of fetcher threads used to replicate messages from a source broker.
* Increasing this value can increase the degree of I/O parallelism in the follower broker. */
val numReplicaFetchers = Utils.getInt(props, "replica.fetchers", 1)
val numReplicaFetchers = props.getInt("replica.fetchers", 1)
}

1
core/src/main/scala/kafka/server/KafkaServerStartable.scala

@ -32,6 +32,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { @@ -32,6 +32,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
def startup() {
try {
server.startup()
serverConfig.verify()
}
catch {
case e =>

7
core/src/main/scala/kafka/utils/Mx4jLoader.scala

@ -33,10 +33,11 @@ import javax.management.ObjectName @@ -33,10 +33,11 @@ import javax.management.ObjectName
object Mx4jLoader extends Logging {
def maybeLoad(): Boolean = {
if (!Utils.getBoolean(System.getProperties(), "kafka_mx4jenable", false))
val props = new VerifiableProperties(System.getProperties())
if (props.getBoolean("kafka_mx4jenable", false))
false
val address = System.getProperty("mx4jaddress", "0.0.0.0")
val port = Utils.getInt(System.getProperties(), "mx4jport", 8082)
val address = props.getString("mx4jaddress", "0.0.0.0")
val port = props.getInt("mx4jport", 8082)
try {
debug("Will try to load MX4j now, if it's in the classpath");

173
core/src/main/scala/kafka/utils/Utils.scala

@ -209,62 +209,6 @@ object Utils extends Logging { @@ -209,62 +209,6 @@ object Utils extends Logging {
props.load(propStream)
props
}
/**
* Read a required integer property value or throw an exception if no such property is found
*/
def getInt(props: Properties, name: String): Int = {
require(props.containsKey(name), "Missing required property '" + name + "'")
return getInt(props, name, -1)
}
def getIntInRange(props: Properties, name: String, range: (Int, Int)): Int = {
require(props.containsKey(name), "Missing required property '" + name + "'")
getIntInRange(props, name, -1, range)
}
/**
* Read an integer from the properties instance
* @param props The properties to read from
* @param name The property name
* @param default The default value to use if the property is not found
* @return the integer value
*/
def getInt(props: Properties, name: String, default: Int): Int =
getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue))
def getShort(props: Properties, name: String, default: Short): Short =
getShortInRange(props, name, default, (Short.MinValue, Short.MaxValue))
/**
* Read an integer from the properties instance. Throw an exception
* if the value is not in the given range (inclusive)
* @param props The properties to read from
* @param name The property name
* @param default The default value to use if the property is not found
* @param range The range in which the value must fall (inclusive)
* @throws IllegalArgumentException If the value is not in the given range
* @return the integer value
*/
def getIntInRange(props: Properties, name: String, default: Int, range: (Int, Int)): Int = {
val v =
if(props.containsKey(name))
props.getProperty(name).toInt
else
default
require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
v
}
def getShortInRange(props: Properties, name: String, default: Short, range: (Short, Short)): Short = {
val v =
if(props.containsKey(name))
props.getProperty(name).toShort
else
default
require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
v
}
def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
val value = buffer.getInt
@ -287,115 +231,6 @@ object Utils extends Logging { @@ -287,115 +231,6 @@ object Utils extends Logging {
else value
}
/**
* Read a required long property value or throw an exception if no such property is found
*/
def getLong(props: Properties, name: String): Long = {
require(props.containsKey(name), "Missing required property '" + name + "'")
return getLong(props, name, -1)
}
/**
* Read an long from the properties instance
* @param props The properties to read from
* @param name The property name
* @param default The default value to use if the property is not found
* @return the long value
*/
def getLong(props: Properties, name: String, default: Long): Long =
getLongInRange(props, name, default, (Long.MinValue, Long.MaxValue))
/**
* Read an long from the properties instance. Throw an exception
* if the value is not in the given range (inclusive)
* @param props The properties to read from
* @param name The property name
* @param default The default value to use if the property is not found
* @param range The range in which the value must fall (inclusive)
* @throws IllegalArgumentException If the value is not in the given range
* @return the long value
*/
def getLongInRange(props: Properties, name: String, default: Long, range: (Long, Long)): Long = {
val v =
if(props.containsKey(name))
props.getProperty(name).toLong
else
default
require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
v
}
/**
* Read a boolean value from the properties instance
* @param props The properties to read from
* @param name The property name
* @param default The default value to use if the property is not found
* @return the boolean value
*/
def getBoolean(props: Properties, name: String, default: Boolean): Boolean = {
if(!props.containsKey(name))
default
else {
val v = props.getProperty(name)
require(v == "true" || v == "false", "Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false")
v.toBoolean
}
}
/**
* Get a string property, or, if no such property is defined, return the given default value
*/
def getString(props: Properties, name: String, default: String): String = {
if(props.containsKey(name))
props.getProperty(name)
else
default
}
/**
* Get a string property or throw and exception if no such property is defined.
*/
def getString(props: Properties, name: String): String = {
require(props.containsKey(name), "Missing required property '" + name + "'")
props.getProperty(name)
}
/**
* Get a property of type java.util.Properties or throw and exception if no such property is defined.
*/
def getProps(props: Properties, name: String): Properties = {
require(props.containsKey(name), "Missing required property '" + name + "'")
val propString = props.getProperty(name)
val propValues = propString.split(",")
val properties = new Properties
for(i <- 0 until propValues.length) {
val prop = propValues(i).split("=")
require(prop.length == 2, "Illegal format of specifying properties '" + propValues(i) + "'")
properties.put(prop(0), prop(1))
}
properties
}
/**
* Get a property of type java.util.Properties or return the default if no such property is defined
*/
def getProps(props: Properties, name: String, default: Properties): Properties = {
if(props.containsKey(name)) {
val propString = props.getProperty(name)
val propValues = propString.split(",")
require(propValues.length >= 1, "Illegal format of specifying properties '" + propString + "'")
val properties = new Properties
for(i <- 0 until propValues.length) {
val prop = propValues(i).split("=")
require(prop.length == 2, "Illegal format of specifying properties '" + propValues(i) + "'")
properties.put(prop(0), prop(1))
}
properties
}
else
default
}
/**
* Open a channel for the given file
*/
@ -775,14 +610,6 @@ object Utils extends Logging { @@ -775,14 +610,6 @@ object Utils extends Logging {
else true
}
def getCompressionCodec(props: Properties, codec: String): CompressionCodec = {
val codecValueString = props.getProperty(codec)
if(codecValueString == null)
NoCompressionCodec
else
CompressionCodec.getCompressionCodec(codecValueString.toInt)
}
def tryCleanupZookeeper(zkUrl: String, groupId: String) {
try {
val dir = "/consumers/" + groupId

171
core/src/main/scala/kafka/utils/VerifiableProperties.scala

@ -0,0 +1,171 @@ @@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.util.Properties
import collection.mutable
class VerifiableProperties(val props: Properties) extends Logging {
private val referenceSet = mutable.HashSet[String]()
def containsKey(name: String): Boolean = {
props.containsKey(name)
}
def getProperty(name: String): String = {
val value = props.getProperty(name)
referenceSet.add(name)
return value
}
/**
* Read a required integer property value or throw an exception if no such property is found
*/
def getInt(name: String): Int = {
require(containsKey(name), "Missing required property '" + name + "'")
return getInt(name, -1)
}
def getIntInRange(name: String, range: (Int, Int)): Int = {
require(containsKey(name), "Missing required property '" + name + "'")
getIntInRange(name, -1, range)
}
/**
* Read an integer from the properties instance
* @param name The property name
* @param default The default value to use if the property is not found
* @return the integer value
*/
def getInt(name: String, default: Int): Int =
getIntInRange(name, default, (Int.MinValue, Int.MaxValue))
def getShort(name: String, default: Short): Short =
getShortInRange(name, default, (Short.MinValue, Short.MaxValue))
/**
* Read an integer from the properties instance. Throw an exception
* if the value is not in the given range (inclusive)
* @param name The property name
* @param default The default value to use if the property is not found
* @param range The range in which the value must fall (inclusive)
* @throws IllegalArgumentException If the value is not in the given range
* @return the integer value
*/
def getIntInRange(name: String, default: Int, range: (Int, Int)): Int = {
val v =
if(containsKey(name))
getProperty(name).toInt
else
default
require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
v
}
def getShortInRange(name: String, default: Short, range: (Short, Short)): Short = {
val v =
if(containsKey(name))
getProperty(name).toShort
else
default
require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
v
}
/**
* Read a required long property value or throw an exception if no such property is found
*/
def getLong(name: String): Long = {
require(containsKey(name), "Missing required property '" + name + "'")
return getLong(name, -1)
}
/**
* Read an long from the properties instance
* @param name The property name
* @param default The default value to use if the property is not found
* @return the long value
*/
def getLong(name: String, default: Long): Long =
getLongInRange(name, default, (Long.MinValue, Long.MaxValue))
/**
* Read an long from the properties instance. Throw an exception
* if the value is not in the given range (inclusive)
* @param name The property name
* @param default The default value to use if the property is not found
* @param range The range in which the value must fall (inclusive)
* @throws IllegalArgumentException If the value is not in the given range
* @return the long value
*/
def getLongInRange(name: String, default: Long, range: (Long, Long)): Long = {
val v =
if(containsKey(name))
getProperty(name).toLong
else
default
require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
v
}
/**
* Read a boolean value from the properties instance
* @param name The property name
* @param default The default value to use if the property is not found
* @return the boolean value
*/
def getBoolean(name: String, default: Boolean): Boolean = {
if(!containsKey(name))
default
else {
val v = getProperty(name)
require(v == "true" || v == "false", "Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false")
v.toBoolean
}
}
/**
* Get a string property, or, if no such property is defined, return the given default value
*/
def getString(name: String, default: String): String = {
if(containsKey(name))
getProperty(name)
else
default
}
/**
* Get a string property or throw and exception if no such property is defined.
*/
def getString(name: String): String = {
require(containsKey(name), "Missing required property '" + name + "'")
getProperty(name)
}
def verify() {
info("Verifying properties")
val specifiedProperties = props.propertyNames()
while (specifiedProperties.hasMoreElements) {
val key = specifiedProperties.nextElement().asInstanceOf[String]
if (!referenceSet.contains(key))
warn("Property %s is not valid".format(key))
else
info("Property %s is overridden to %s".format(key, props.getProperty(key)))
}
}
}

11
core/src/main/scala/kafka/utils/ZkUtils.scala

@ -17,7 +17,6 @@ @@ -17,7 +17,6 @@
package kafka.utils
import java.util.Properties
import kafka.cluster.{Broker, Cluster}
import kafka.consumer.TopicCount
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
@ -600,16 +599,16 @@ class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) @@ -600,16 +599,16 @@ class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group)
}
class ZKConfig(props: Properties) {
class ZKConfig(props: VerifiableProperties) {
/** ZK host string */
val zkConnect = Utils.getString(props, "zk.connect", null)
val zkConnect = props.getString("zk.connect", null)
/** zookeeper session timeout */
val zkSessionTimeoutMs = Utils.getInt(props, "zk.sessiontimeout.ms", 6000)
val zkSessionTimeoutMs = props.getInt("zk.sessiontimeout.ms", 6000)
/** the max time that the client waits to establish a connection to zookeeper */
val zkConnectionTimeoutMs = Utils.getInt(props, "zk.connectiontimeout.ms",zkSessionTimeoutMs)
val zkConnectionTimeoutMs = props.getInt("zk.connectiontimeout.ms",zkSessionTimeoutMs)
/** how far a ZK follower can be behind a ZK leader */
val zkSyncTimeMs = Utils.getInt(props, "zk.synctime.ms", 2000)
val zkSyncTimeMs = props.getInt("zk.synctime.ms", 2000)
}

Loading…
Cancel
Save