diff --git a/config/server.properties b/config/server.properties index 7f4fec282e1..b47fe946cc2 100644 --- a/config/server.properties +++ b/config/server.properties @@ -104,9 +104,6 @@ log.cleanup.interval.mins=1 ############################# Zookeeper ############################# -# Enable connecting to zookeeper -enable.zookeeper=true - # Zk connection string (see zk docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index e5b670e1e01..355948ebeaf 100644 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -33,10 +33,11 @@ object Kafka extends Logging { try { val props = Utils.loadProps(args(0)) val serverConfig = new KafkaConfig(props) - val metricsConfig = new KafkaMetricsConfig(props) + val verifiableProps = serverConfig.props + val metricsConfig = new KafkaMetricsConfig(verifiableProps) metricsConfig.reporters.foreach(reporterType => { val reporter = Utils.getObject[KafkaMetricsReporter](reporterType) - reporter.init(props) + reporter.init(verifiableProps) if (reporter.isInstanceOf[KafkaMetricsReporterMBean]) Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName) }) diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 0cf69e2cfc8..3c190cf28ea 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -129,7 +129,7 @@ object ConsoleConsumer extends Logging { props.put("fetch.size", options.valueOf(fetchSizeOpt).toString) props.put("min.fetch.bytes", options.valueOf(minFetchBytesOpt).toString) props.put("max.fetch.wait.ms", options.valueOf(maxWaitMsOpt).toString) - props.put("auto.commit", "true") + props.put("autocommit.enable", "true") props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString) props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") props.put("zk.connect", options.valueOf(zkConnectOpt)) diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index 0410f47795f..3770e47f876 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -18,8 +18,9 @@ package kafka.consumer import java.util.Properties -import kafka.utils.{ZKConfig, Utils} import kafka.api.OffsetRequest +import kafka.utils.{VerifiableProperties, ZKConfig} + object ConsumerConfig { val SocketTimeout = 30 * 1000 val SocketBufferSize = 64*1024 @@ -43,62 +44,67 @@ object ConsumerConfig { val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads" } -class ConsumerConfig(props: Properties) extends ZKConfig(props) { +class ConsumerConfig private (props: VerifiableProperties) extends ZKConfig(props) { import ConsumerConfig._ + def this(originalProps: Properties) { + this(new VerifiableProperties(originalProps)) + props.verify() + } + /** a string that uniquely identifies a set of consumers within the same consumer group */ - val groupId = Utils.getString(props, "groupid") + val groupId = props.getString("groupid") /** consumer id: generated automatically if not set. * Set this explicitly for only testing purpose. */ - val consumerId: Option[String] = Option(Utils.getString(props, "consumerid", null)) + val consumerId: Option[String] = Option(props.getString("consumerid", null)) /** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */ - val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout) + val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout) /** the socket receive buffer for network requests */ - val socketBufferSize = Utils.getInt(props, "socket.buffersize", SocketBufferSize) + val socketBufferSize = props.getInt("socket.buffersize", SocketBufferSize) /** the number of byes of messages to attempt to fetch */ - val fetchSize = Utils.getInt(props, "fetch.size", FetchSize) + val fetchSize = props.getInt("fetch.size", FetchSize) /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */ - val autoCommit = Utils.getBoolean(props, "autocommit.enable", AutoCommit) + val autoCommit = props.getBoolean("autocommit.enable", AutoCommit) /** the frequency in ms that the consumer offsets are committed to zookeeper */ - val autoCommitIntervalMs = Utils.getInt(props, "autocommit.interval.ms", AutoCommitInterval) + val autoCommitIntervalMs = props.getInt("autocommit.interval.ms", AutoCommitInterval) /** max number of messages buffered for consumption */ - val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MaxQueuedChunks) + val maxQueuedChunks = props.getInt("queuedchunks.max", MaxQueuedChunks) /** max number of retries during rebalance */ - val maxRebalanceRetries = Utils.getInt(props, "rebalance.retries.max", MaxRebalanceRetries) + val maxRebalanceRetries = props.getInt("rebalance.retries.max", MaxRebalanceRetries) /** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */ - val minFetchBytes = Utils.getInt(props, "min.fetch.bytes", MinFetchBytes) + val minFetchBytes = props.getInt("min.fetch.bytes", MinFetchBytes) /** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediate satisfy min.fetch.bytes */ - val maxFetchWaitMs = Utils.getInt(props, "max.fetch.wait.ms", MaxFetchWaitMs) + val maxFetchWaitMs = props.getInt("max.fetch.wait.ms", MaxFetchWaitMs) /** backoff time between retries during rebalance */ - val rebalanceBackoffMs = Utils.getInt(props, "rebalance.backoff.ms", zkSyncTimeMs) + val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs) /** backoff time to refresh the leader of a partition after it loses the current leader */ - val refreshLeaderBackoffMs = Utils.getInt(props, "refresh.leader.backoff.ms", 200) + val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", 200) /* what to do if an offset is out of range. smallest : automatically reset the offset to the smallest offset largest : automatically reset the offset to the largest offset anything else: throw exception to the consumer */ - val autoOffsetReset = Utils.getString(props, "autooffset.reset", AutoOffsetReset) + val autoOffsetReset = props.getString("autooffset.reset", AutoOffsetReset) /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */ - val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs) + val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs) /** Use shallow iterator over compressed messages directly. This feature should be used very carefully. * Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the * overhead of decompression. * */ - val enableShallowIterator = Utils.getBoolean(props, "shallowiterator.enable", false) + val enableShallowIterator = props.getBoolean("shallowiterator.enable", false) } diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala index d9e289ee06b..cfe7e34544c 100644 --- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala +++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala @@ -20,12 +20,11 @@ package kafka.metrics -import java.util.Properties import com.yammer.metrics.Metrics import java.io.File import com.yammer.metrics.reporting.CsvReporter -import kafka.utils.{Logging, Utils} import java.util.concurrent.TimeUnit +import kafka.utils.{VerifiableProperties, Logging} private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean @@ -43,15 +42,15 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter override def getMBeanName = "kafka:type=kafka.metrics.KafkaCSVMetricsReporter" - override def init(props: Properties) { + override def init(props: VerifiableProperties) { synchronized { if (!initialized) { val metricsConfig = new KafkaMetricsConfig(props) - csvDir = new File(Utils.getString(props, "kafka.csv.metrics.dir", "kafka_metrics")) + csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics")) if (!csvDir.exists()) csvDir.mkdirs() underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir) - if (Utils.getBoolean(props, "kafka.csv.metrics.reporter.enabled", false)) + if (props.getBoolean("kafka.csv.metrics.reporter.enabled", false)) startReporter(metricsConfig.pollingIntervalSecs) initialized = true } diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala index 771160c4371..35c4f22f87b 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala @@ -20,19 +20,18 @@ package kafka.metrics -import java.util.Properties -import kafka.utils.Utils +import kafka.utils.{VerifiableProperties, Utils} -class KafkaMetricsConfig(props: Properties) { +class KafkaMetricsConfig(props: VerifiableProperties) { /** * Comma-separated list of reporter types. These classes should be on the * classpath and will be instantiated at run-time. */ - val reporters = Utils.getCSVList(Utils.getString(props, "kafka.metrics.reporters", "")) + val reporters = Utils.getCSVList(props.getString("kafka.metrics.reporters", "")) /** * The metrics polling interval (in seconds). */ - val pollingIntervalSecs = Utils.getInt(props, "kafka.metrics.polling.interval.secs", 10) + val pollingIntervalSecs = props.getInt("kafka.metrics.polling.interval.secs", 10) } diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala index b2ad92064e9..57f27897eac 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala @@ -20,7 +20,7 @@ package kafka.metrics -import java.util.Properties +import kafka.utils.VerifiableProperties /** * Base trait for reporter MBeans. If a client wants to expose these JMX @@ -42,6 +42,6 @@ trait KafkaMetricsReporterMBean { trait KafkaMetricsReporter { - def init(props: Properties) + def init(props: VerifiableProperties) } diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala index b2da93600b3..f974221e66c 100644 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -19,9 +19,16 @@ package kafka.producer import async.AsyncProducerConfig import java.util.Properties -import kafka.utils.Utils +import kafka.utils.{Utils, VerifiableProperties} +import kafka.message.{CompressionCodec, NoCompressionCodec} -class ProducerConfig(val props: Properties) extends AsyncProducerConfig with SyncProducerConfigShared{ +class ProducerConfig private (val props: VerifiableProperties) + extends AsyncProducerConfig with SyncProducerConfigShared { + + def this(originalProps: Properties) { + this(new VerifiableProperties(originalProps)) + props.verify() + } /** This is for bootstrapping and the producer will only use it for getting metadata * (topics, partitions and replicas). The socket connections for sending the actual data @@ -29,27 +36,27 @@ class ProducerConfig(val props: Properties) extends AsyncProducerConfig with Syn * format is host1:por1,host2:port2, and the list can be a subset of brokers or * a VIP pointing to a subset of brokers. */ - val brokerList = Utils.getString(props, "broker.list") + val brokerList = props.getString("broker.list") /** * If DefaultEventHandler is used, this specifies the number of times to * retry if an error is encountered during send. */ - val numRetries = Utils.getInt(props, "num.retries", 0) + val numRetries = props.getInt("num.retries", 0) /** the partitioner class for partitioning events amongst sub-topics */ - val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner") + val partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner") /** this parameter specifies whether the messages are sent asynchronously * * or not. Valid values are - async for asynchronous send * * sync for synchronous send */ - val producerType = Utils.getString(props, "producer.type", "sync") + val producerType = props.getString("producer.type", "sync") /** * This parameter allows you to specify the compression codec for all data generated * * by this producer. The default is NoCompressionCodec */ - val compressionCodec = Utils.getCompressionCodec(props, "compression.codec") + val compressionCodec = CompressionCodec.getCompressionCodec(props.getInt("compression.codec", NoCompressionCodec.codec)) /** This parameter allows you to set whether compression should be turned * * on for particular topics @@ -62,7 +69,7 @@ class ProducerConfig(val props: Properties) extends AsyncProducerConfig with Syn * * If the compression codec is NoCompressionCodec, compression is disabled for all topics */ - val compressedTopics = Utils.getCSVList(Utils.getString(props, "compressed.topics", null)) + val compressedTopics = Utils.getCSVList(props.getString("compressed.topics", null)) /** * The producer using the zookeeper software load balancer maintains a ZK cache that gets @@ -72,7 +79,7 @@ class ProducerConfig(val props: Properties) extends AsyncProducerConfig with Syn * ZK cache needs to be updated. * This parameter specifies the number of times the producer attempts to refresh this ZK cache. */ - val producerRetries = Utils.getInt(props, "producer.num.retries", 3) + val producerRetries = props.getInt("producer.num.retries", 3) - val producerRetryBackoffMs = Utils.getInt(props, "producer.retry.backoff.ms", 100) + val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100) } diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala index d8339214a75..5e8dfbad998 100644 --- a/core/src/main/scala/kafka/producer/ProducerPool.scala +++ b/core/src/main/scala/kafka/producer/ProducerPool.scala @@ -37,7 +37,7 @@ object ProducerPool{ val props = new Properties() props.put("host", broker.host) props.put("port", broker.port.toString) - props.putAll(config.props) + props.putAll(config.props.props) new SyncProducer(new SyncProducerConfig(props)) } } diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala index 1fdfa0584a7..f3119ff91d7 100644 --- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala @@ -17,48 +17,53 @@ package kafka.producer -import kafka.utils.Utils import java.util.Properties +import kafka.utils.VerifiableProperties + +class SyncProducerConfig private (val props: VerifiableProperties) extends SyncProducerConfigShared { + def this(originalProps: Properties) { + this(new VerifiableProperties(originalProps)) + // no need to verify the property since SyncProducerConfig is supposed to be used internally + } -class SyncProducerConfig(val props: Properties) extends SyncProducerConfigShared { /** the broker to which the producer sends events */ - val host = Utils.getString(props, "host") + val host = props.getString("host") /** the port on which the broker is running */ - val port = Utils.getInt(props, "port") + val port = props.getInt("port") } trait SyncProducerConfigShared { - val props: Properties + val props: VerifiableProperties - val bufferSize = Utils.getInt(props, "buffer.size", 100*1024) + val bufferSize = props.getInt("buffer.size", 100*1024) - val connectTimeoutMs = Utils.getInt(props, "connect.timeout.ms", 5000) + val connectTimeoutMs = props.getInt("connect.timeout.ms", 5000) - val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000) + val reconnectInterval = props.getInt("reconnect.interval", 30000) /** negative reconnect time interval means disabling this time-based reconnect feature */ - var reconnectTimeInterval = Utils.getInt(props, "reconnect.time.interval.ms", 1000*1000*10) + var reconnectTimeInterval = props.getInt("reconnect.time.interval.ms", 1000*1000*10) - val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000) + val maxMessageSize = props.getInt("max.message.size", 1000000) /* the client application sending the producer requests */ - val correlationId = Utils.getInt(props,"producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId) + val correlationId = props.getInt("producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId) /* the client application sending the producer requests */ - val clientId = Utils.getString(props,"producer.request.client_id",SyncProducerConfig.DefaultClientId) + val clientId = props.getString("producer.request.client_id",SyncProducerConfig.DefaultClientId) /* * The required acks of the producer requests - negative value means ack * after the replicas in ISR have caught up to the leader's offset * corresponding to this produce request. */ - val requiredAcks = Utils.getShort(props,"producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks) + val requiredAcks = props.getShort("producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks) /* * The ack timeout of the producer requests. Value must be non-negative and non-zero */ - val requestTimeoutMs = Utils.getIntInRange(props,"producer.request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs, + val requestTimeoutMs = props.getIntInRange("producer.request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs, (1, Integer.MAX_VALUE)) } diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala index ff15c7c56a7..55a9fab3c7d 100644 --- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala @@ -16,17 +16,16 @@ */ package kafka.producer.async -import java.util.Properties -import kafka.utils.Utils +import kafka.utils.VerifiableProperties trait AsyncProducerConfig { - val props: Properties + val props: VerifiableProperties /* maximum time, in milliseconds, for buffering data on the producer queue */ - val queueTime = Utils.getInt(props, "queue.time", 5000) + val queueTime = props.getInt("queue.time", 5000) /** the maximum size of the blocking queue for buffering on the producer */ - val queueSize = Utils.getInt(props, "queue.size", 10000) + val queueSize = props.getInt("queue.size", 10000) /** * Timeout for event enqueue: @@ -34,11 +33,11 @@ trait AsyncProducerConfig { * -ve: enqueue will block indefinitely if the queue is full * +ve: enqueue will block up to this many milliseconds if the queue is full */ - val enqueueTimeoutMs = Utils.getInt(props, "queue.enqueueTimeout.ms", 0) + val enqueueTimeoutMs = props.getInt("queue.enqueueTimeout.ms", 0) /** the number of messages batched at the producer */ - val batchSize = Utils.getInt(props, "batch.size", 200) + val batchSize = props.getInt("batch.size", 200) /** the serializer class for events */ - val serializerClass = Utils.getString(props, "serializer.class", "kafka.serializer.DefaultEncoder") + val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder") } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 420337e60a1..feb7f2de2a9 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -18,142 +18,147 @@ package kafka.server import java.util.Properties -import kafka.utils.{Utils, ZKConfig} import kafka.message.Message import kafka.consumer.ConsumerConfig import java.net.InetAddress - - +import kafka.utils.{Utils, VerifiableProperties, ZKConfig} /** * Configuration settings for the kafka server */ -class KafkaConfig(props: Properties) extends ZKConfig(props) { +class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props) { + + def this(originalProps: Properties) { + this(new VerifiableProperties(originalProps)) + } + + def verify() = props.verify() + /* the port to listen and accept connections on */ - val port: Int = Utils.getInt(props, "port", 6667) + val port: Int = props.getInt("port", 6667) /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */ - val hostName: String = Utils.getString(props, "hostname", InetAddress.getLocalHost.getHostAddress) + val hostName: String = props.getString("hostname", InetAddress.getLocalHost.getHostAddress) /* the broker id for this server */ - val brokerId: Int = Utils.getIntInRange(props, "brokerid", (0, Int.MaxValue)) + val brokerId: Int = props.getIntInRange("brokerid", (0, Int.MaxValue)) /* the SO_SNDBUFF buffer of the socket sever sockets */ - val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024) + val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024) /* the SO_RCVBUFF buffer of the socket sever sockets */ - val socketReceiveBuffer: Int = Utils.getInt(props, "socket.receive.buffer", 100*1024) + val socketReceiveBuffer: Int = props.getInt("socket.receive.buffer", 100*1024) /* the maximum number of bytes in a socket request */ - val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue)) + val maxSocketRequestSize: Int = props.getIntInRange("max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue)) /* the number of network threads that the server uses for handling network requests */ - val numNetworkThreads = Utils.getIntInRange(props, "network.threads", 3, (1, Int.MaxValue)) + val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue)) /* the number of io threads that the server uses for carrying out network requests */ - val numIoThreads = Utils.getIntInRange(props, "io.threads", 8, (1, Int.MaxValue)) + val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue)) /* the number of queued requests allowed before blocking the network threads */ - val numQueuedRequests = Utils.getIntInRange(props, "max.queued.requests", 500, (1, Int.MaxValue)) + val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue)) /* the interval in which to measure performance statistics */ - val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue)) + val monitoringPeriodSecs = props.getIntInRange("monitoring.period.secs", 600, (1, Int.MaxValue)) /* the default number of log partitions per topic */ - val numPartitions = Utils.getIntInRange(props, "num.partitions", 1, (1, Int.MaxValue)) + val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue)) /* the directory in which the log data is kept */ - val logDir = Utils.getString(props, "log.dir") + val logDir = props.getString("log.dir") /* the maximum size of a single log file */ - val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue)) + val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue)) /* the maximum size of a single log file for some specific topic */ - val logFileSizeMap = Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size", "")) + val logFileSizeMap = Utils.getTopicFileSize(props.getString("topic.log.file.size", "")) /* the maximum time before a new log segment is rolled out */ - val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue)) + val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue)) /* the number of hours before rolling out a new log segment for some specific topic */ - val logRollHoursMap = Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", "")) + val logRollHoursMap = Utils.getTopicRollHours(props.getString("topic.log.roll.hours", "")) /* the number of hours to keep a log file before deleting it */ - val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24*7, (1, Int.MaxValue)) + val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue)) /* the number of hours to keep a log file before deleting it for some specific topic*/ - val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", "")) + val logRetentionHoursMap = Utils.getTopicRetentionHours(props.getString("topic.log.retention.hours", "")) /* the maximum size of the log before deleting it */ - val logRetentionSize = Utils.getLong(props, "log.retention.size", -1) + val logRetentionSize = props.getLong("log.retention.size", -1) /* the maximum size of the log for some specific topic before deleting it */ - val logRetentionSizeMap = Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size", "")) + val logRetentionSizeMap = Utils.getTopicRetentionSize(props.getString("topic.log.retention.size", "")) /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */ - val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue)) + val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue)) /* the number of messages accumulated on a log partition before messages are flushed to disk */ - val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue)) + val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue)) /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */ - val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", "")) + val flushIntervalMap = Utils.getTopicFlushIntervals(props.getString("topic.flush.intervals.ms", "")) /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */ - val flushSchedulerThreadRate = Utils.getInt(props, "log.default.flush.scheduler.interval.ms", 3000) + val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms", 3000) /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */ - val defaultFlushIntervalMs = Utils.getInt(props, "log.default.flush.interval.ms", flushSchedulerThreadRate) + val defaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", flushSchedulerThreadRate) /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */ - val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", "")) + val topicPartitionsMap = Utils.getTopicPartitions(props.getString("topic.partition.count.map", "")) /* enable auto creation of topic on the server */ - val autoCreateTopics = Utils.getBoolean(props, "auto.create.topics", true) + val autoCreateTopics = props.getBoolean("auto.create.topics", true) /** * Following properties are relevant to Kafka replication */ /* the socket timeout for controller-to-broker channels */ - val controllerSocketTimeoutMs = Utils.getInt(props, "controller.socket.timeout.ms", 30000) + val controllerSocketTimeoutMs = props.getInt("controller.socket.timeout.ms", 30000) /* the buffer size for controller-to-broker-channels */ - val controllerMessageQueueSize= Utils.getInt(props, "controller.message.queue.size", 10) + val controllerMessageQueueSize= props.getInt("controller.message.queue.size", 10) /* default replication factors for automatically created topics */ - val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1) + val defaultReplicationFactor = props.getInt("default.replication.factor", 1) /* wait time in ms to allow the preferred replica for a partition to become the leader. This property is used during * leader election on all replicas minus the preferred replica */ - val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300) + val preferredReplicaWaitTime = props.getLong("preferred.replica.wait.time", 300) - val replicaMaxLagTimeMs = Utils.getLong(props, "replica.max.lag.time.ms", 10000) + val replicaMaxLagTimeMs = props.getLong("replica.max.lag.time.ms", 10000) - val replicaMaxLagBytes = Utils.getLong(props, "replica.max.lag.bytes", 4000) + val replicaMaxLagBytes = props.getLong("replica.max.lag.bytes", 4000) /* size of the state change request queue in Zookeeper */ - val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000) + val stateChangeQSize = props.getInt("state.change.queue.size", 1000) /** * Config options relevant to a follower for a replica */ /** the socket timeout for network requests */ - val replicaSocketTimeoutMs = Utils.getInt(props, "replica.socket.timeout.ms", ConsumerConfig.SocketTimeout) + val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout) /** the socket receive buffer for network requests */ - val replicaSocketBufferSize = Utils.getInt(props, "replica.socket.buffersize", ConsumerConfig.SocketBufferSize) + val replicaSocketBufferSize = props.getInt("replica.socket.buffersize", ConsumerConfig.SocketBufferSize) /** the number of byes of messages to attempt to fetch */ - val replicaFetchSize = Utils.getInt(props, "replica.fetch.size", ConsumerConfig.FetchSize) + val replicaFetchSize = props.getInt("replica.fetch.size", ConsumerConfig.FetchSize) /** max wait time for each fetcher request issued by follower replicas*/ - val replicaMaxWaitTimeMs = Utils.getInt(props, "replica.fetch.wait.time.ms", 500) + val replicaMaxWaitTimeMs = props.getInt("replica.fetch.wait.time.ms", 500) /** minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */ - val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4096) + val replicaMinBytes = props.getInt("replica.fetch.min.bytes", 4096) /* number of fetcher threads used to replicate messages from a source broker. * Increasing this value can increase the degree of I/O parallelism in the follower broker. */ - val numReplicaFetchers = Utils.getInt(props, "replica.fetchers", 1) + val numReplicaFetchers = props.getInt("replica.fetchers", 1) } diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala index 39ea154fa66..2121b75004e 100644 --- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala +++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala @@ -32,6 +32,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { def startup() { try { server.startup() + serverConfig.verify() } catch { case e => diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala index 64645b1ff62..64d84ccee7d 100644 --- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala +++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala @@ -33,10 +33,11 @@ import javax.management.ObjectName object Mx4jLoader extends Logging { def maybeLoad(): Boolean = { - if (!Utils.getBoolean(System.getProperties(), "kafka_mx4jenable", false)) + val props = new VerifiableProperties(System.getProperties()) + if (props.getBoolean("kafka_mx4jenable", false)) false - val address = System.getProperty("mx4jaddress", "0.0.0.0") - val port = Utils.getInt(System.getProperties(), "mx4jport", 8082) + val address = props.getString("mx4jaddress", "0.0.0.0") + val port = props.getInt("mx4jport", 8082) try { debug("Will try to load MX4j now, if it's in the classpath"); diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 942cfacc3e1..fe0a08bc1a1 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -209,62 +209,6 @@ object Utils extends Logging { props.load(propStream) props } - - /** - * Read a required integer property value or throw an exception if no such property is found - */ - def getInt(props: Properties, name: String): Int = { - require(props.containsKey(name), "Missing required property '" + name + "'") - return getInt(props, name, -1) - } - - def getIntInRange(props: Properties, name: String, range: (Int, Int)): Int = { - require(props.containsKey(name), "Missing required property '" + name + "'") - getIntInRange(props, name, -1, range) - } - - /** - * Read an integer from the properties instance - * @param props The properties to read from - * @param name The property name - * @param default The default value to use if the property is not found - * @return the integer value - */ - def getInt(props: Properties, name: String, default: Int): Int = - getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue)) - - def getShort(props: Properties, name: String, default: Short): Short = - getShortInRange(props, name, default, (Short.MinValue, Short.MaxValue)) - - /** - * Read an integer from the properties instance. Throw an exception - * if the value is not in the given range (inclusive) - * @param props The properties to read from - * @param name The property name - * @param default The default value to use if the property is not found - * @param range The range in which the value must fall (inclusive) - * @throws IllegalArgumentException If the value is not in the given range - * @return the integer value - */ - def getIntInRange(props: Properties, name: String, default: Int, range: (Int, Int)): Int = { - val v = - if(props.containsKey(name)) - props.getProperty(name).toInt - else - default - require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".") - v - } - - def getShortInRange(props: Properties, name: String, default: Short, range: (Short, Short)): Short = { - val v = - if(props.containsKey(name)) - props.getProperty(name).toShort - else - default - require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".") - v - } def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = { val value = buffer.getInt @@ -287,115 +231,6 @@ object Utils extends Logging { else value } - /** - * Read a required long property value or throw an exception if no such property is found - */ - def getLong(props: Properties, name: String): Long = { - require(props.containsKey(name), "Missing required property '" + name + "'") - return getLong(props, name, -1) - } - - /** - * Read an long from the properties instance - * @param props The properties to read from - * @param name The property name - * @param default The default value to use if the property is not found - * @return the long value - */ - def getLong(props: Properties, name: String, default: Long): Long = - getLongInRange(props, name, default, (Long.MinValue, Long.MaxValue)) - - /** - * Read an long from the properties instance. Throw an exception - * if the value is not in the given range (inclusive) - * @param props The properties to read from - * @param name The property name - * @param default The default value to use if the property is not found - * @param range The range in which the value must fall (inclusive) - * @throws IllegalArgumentException If the value is not in the given range - * @return the long value - */ - def getLongInRange(props: Properties, name: String, default: Long, range: (Long, Long)): Long = { - val v = - if(props.containsKey(name)) - props.getProperty(name).toLong - else - default - require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".") - v - } - - /** - * Read a boolean value from the properties instance - * @param props The properties to read from - * @param name The property name - * @param default The default value to use if the property is not found - * @return the boolean value - */ - def getBoolean(props: Properties, name: String, default: Boolean): Boolean = { - if(!props.containsKey(name)) - default - else { - val v = props.getProperty(name) - require(v == "true" || v == "false", "Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false") - v.toBoolean - } - } - - /** - * Get a string property, or, if no such property is defined, return the given default value - */ - def getString(props: Properties, name: String, default: String): String = { - if(props.containsKey(name)) - props.getProperty(name) - else - default - } - - /** - * Get a string property or throw and exception if no such property is defined. - */ - def getString(props: Properties, name: String): String = { - require(props.containsKey(name), "Missing required property '" + name + "'") - props.getProperty(name) - } - - /** - * Get a property of type java.util.Properties or throw and exception if no such property is defined. - */ - def getProps(props: Properties, name: String): Properties = { - require(props.containsKey(name), "Missing required property '" + name + "'") - val propString = props.getProperty(name) - val propValues = propString.split(",") - val properties = new Properties - for(i <- 0 until propValues.length) { - val prop = propValues(i).split("=") - require(prop.length == 2, "Illegal format of specifying properties '" + propValues(i) + "'") - properties.put(prop(0), prop(1)) - } - properties - } - - /** - * Get a property of type java.util.Properties or return the default if no such property is defined - */ - def getProps(props: Properties, name: String, default: Properties): Properties = { - if(props.containsKey(name)) { - val propString = props.getProperty(name) - val propValues = propString.split(",") - require(propValues.length >= 1, "Illegal format of specifying properties '" + propString + "'") - val properties = new Properties - for(i <- 0 until propValues.length) { - val prop = propValues(i).split("=") - require(prop.length == 2, "Illegal format of specifying properties '" + propValues(i) + "'") - properties.put(prop(0), prop(1)) - } - properties - } - else - default - } - /** * Open a channel for the given file */ @@ -775,14 +610,6 @@ object Utils extends Logging { else true } - def getCompressionCodec(props: Properties, codec: String): CompressionCodec = { - val codecValueString = props.getProperty(codec) - if(codecValueString == null) - NoCompressionCodec - else - CompressionCodec.getCompressionCodec(codecValueString.toInt) - } - def tryCleanupZookeeper(zkUrl: String, groupId: String) { try { val dir = "/consumers/" + groupId diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala new file mode 100644 index 00000000000..22aaba84836 --- /dev/null +++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +import java.util.Properties +import collection.mutable + +class VerifiableProperties(val props: Properties) extends Logging { + private val referenceSet = mutable.HashSet[String]() + + def containsKey(name: String): Boolean = { + props.containsKey(name) + } + + def getProperty(name: String): String = { + val value = props.getProperty(name) + referenceSet.add(name) + return value + } + + /** + * Read a required integer property value or throw an exception if no such property is found + */ + def getInt(name: String): Int = { + require(containsKey(name), "Missing required property '" + name + "'") + return getInt(name, -1) + } + + def getIntInRange(name: String, range: (Int, Int)): Int = { + require(containsKey(name), "Missing required property '" + name + "'") + getIntInRange(name, -1, range) + } + + /** + * Read an integer from the properties instance + * @param name The property name + * @param default The default value to use if the property is not found + * @return the integer value + */ + def getInt(name: String, default: Int): Int = + getIntInRange(name, default, (Int.MinValue, Int.MaxValue)) + + def getShort(name: String, default: Short): Short = + getShortInRange(name, default, (Short.MinValue, Short.MaxValue)) + + /** + * Read an integer from the properties instance. Throw an exception + * if the value is not in the given range (inclusive) + * @param name The property name + * @param default The default value to use if the property is not found + * @param range The range in which the value must fall (inclusive) + * @throws IllegalArgumentException If the value is not in the given range + * @return the integer value + */ + def getIntInRange(name: String, default: Int, range: (Int, Int)): Int = { + val v = + if(containsKey(name)) + getProperty(name).toInt + else + default + require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".") + v + } + + def getShortInRange(name: String, default: Short, range: (Short, Short)): Short = { + val v = + if(containsKey(name)) + getProperty(name).toShort + else + default + require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".") + v + } + + /** + * Read a required long property value or throw an exception if no such property is found + */ + def getLong(name: String): Long = { + require(containsKey(name), "Missing required property '" + name + "'") + return getLong(name, -1) + } + + /** + * Read an long from the properties instance + * @param name The property name + * @param default The default value to use if the property is not found + * @return the long value + */ + def getLong(name: String, default: Long): Long = + getLongInRange(name, default, (Long.MinValue, Long.MaxValue)) + + /** + * Read an long from the properties instance. Throw an exception + * if the value is not in the given range (inclusive) + * @param name The property name + * @param default The default value to use if the property is not found + * @param range The range in which the value must fall (inclusive) + * @throws IllegalArgumentException If the value is not in the given range + * @return the long value + */ + def getLongInRange(name: String, default: Long, range: (Long, Long)): Long = { + val v = + if(containsKey(name)) + getProperty(name).toLong + else + default + require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".") + v + } + + /** + * Read a boolean value from the properties instance + * @param name The property name + * @param default The default value to use if the property is not found + * @return the boolean value + */ + def getBoolean(name: String, default: Boolean): Boolean = { + if(!containsKey(name)) + default + else { + val v = getProperty(name) + require(v == "true" || v == "false", "Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false") + v.toBoolean + } + } + + /** + * Get a string property, or, if no such property is defined, return the given default value + */ + def getString(name: String, default: String): String = { + if(containsKey(name)) + getProperty(name) + else + default + } + + /** + * Get a string property or throw and exception if no such property is defined. + */ + def getString(name: String): String = { + require(containsKey(name), "Missing required property '" + name + "'") + getProperty(name) + } + + def verify() { + info("Verifying properties") + val specifiedProperties = props.propertyNames() + while (specifiedProperties.hasMoreElements) { + val key = specifiedProperties.nextElement().asInstanceOf[String] + if (!referenceSet.contains(key)) + warn("Property %s is not valid".format(key)) + else + info("Property %s is overridden to %s".format(key, props.getProperty(key))) + } + } +} diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 8029c5d5862..f9643d497f0 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -17,7 +17,6 @@ package kafka.utils -import java.util.Properties import kafka.cluster.{Broker, Cluster} import kafka.consumer.TopicCount import org.I0Itec.zkclient.{IZkDataListener, ZkClient} @@ -600,16 +599,16 @@ class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) } -class ZKConfig(props: Properties) { +class ZKConfig(props: VerifiableProperties) { /** ZK host string */ - val zkConnect = Utils.getString(props, "zk.connect", null) + val zkConnect = props.getString("zk.connect", null) /** zookeeper session timeout */ - val zkSessionTimeoutMs = Utils.getInt(props, "zk.sessiontimeout.ms", 6000) + val zkSessionTimeoutMs = props.getInt("zk.sessiontimeout.ms", 6000) /** the max time that the client waits to establish a connection to zookeeper */ - val zkConnectionTimeoutMs = Utils.getInt(props, "zk.connectiontimeout.ms",zkSessionTimeoutMs) + val zkConnectionTimeoutMs = props.getInt("zk.connectiontimeout.ms",zkSessionTimeoutMs) /** how far a ZK follower can be behind a ZK leader */ - val zkSyncTimeMs = Utils.getInt(props, "zk.synctime.ms", 2000) + val zkSyncTimeMs = props.getInt("zk.synctime.ms", 2000) }