Browse Source

enforce broker.id to be a non-negative integer; patched by Swapnil Ghike; reviewed by Jun Rao, Neha Narkhede; KAFKA-424

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1374440 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Jun Rao 12 years ago
parent
commit
60735b7ad6
  1. 4
      core/src/main/scala/kafka/server/KafkaConfig.scala
  2. 86
      core/src/main/scala/kafka/utils/Utils.scala
  3. 2
      core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala

4
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -36,8 +36,8 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) { @@ -36,8 +36,8 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) {
val hostName: String = Utils.getString(props, "hostname", InetAddress.getLocalHost.getHostAddress)
/* the broker id for this server */
val brokerId: Int = Utils.getInt(props, "brokerid")
val brokerId: Int = Utils.getIntInRange(props, "brokerid", (0, Int.MaxValue))
/* the SO_SNDBUFF buffer of the socket sever sockets */
val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)

86
core/src/main/scala/kafka/utils/Utils.scala

@ -213,12 +213,15 @@ object Utils extends Logging { @@ -213,12 +213,15 @@ object Utils extends Logging {
* Read a required integer property value or throw an exception if no such property is found
*/
def getInt(props: Properties, name: String): Int = {
if(props.containsKey(name))
return getInt(props, name, -1)
else
throw new KafkaException("Missing required property '" + name + "'")
require(props.containsKey(name), "Missing required property '" + name + "'")
return getInt(props, name, -1)
}
def getIntInRange(props: Properties, name: String, range: (Int, Int)): Int = {
require(props.containsKey(name), "Missing required property '" + name + "'")
getIntInRange(props, name, -1, range)
}
/**
* Read an integer from the properties instance
* @param props The properties to read from
@ -239,7 +242,7 @@ object Utils extends Logging { @@ -239,7 +242,7 @@ object Utils extends Logging {
* @param name The property name
* @param default The default value to use if the property is not found
* @param range The range in which the value must fall (inclusive)
* @throws KafkaException If the value is not in the given range
* @throws IllegalArgumentException If the value is not in the given range
* @return the integer value
*/
def getIntInRange(props: Properties, name: String, default: Int, range: (Int, Int)): Int = {
@ -248,10 +251,8 @@ object Utils extends Logging { @@ -248,10 +251,8 @@ object Utils extends Logging {
props.getProperty(name).toInt
else
default
if(v < range._1 || v > range._2)
throw new KafkaException(name + " has value " + v + " which is not in the range " + range + ".")
else
v
require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
v
}
def getShortInRange(props: Properties, name: String, default: Short, range: (Short, Short)): Short = {
@ -260,10 +261,8 @@ object Utils extends Logging { @@ -260,10 +261,8 @@ object Utils extends Logging {
props.getProperty(name).toShort
else
default
if(v < range._1 || v > range._2)
throw new KafkaException(name + " has value " + v + " which is not in the range " + range + ".")
else
v
require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
v
}
def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
@ -291,10 +290,8 @@ object Utils extends Logging { @@ -291,10 +290,8 @@ object Utils extends Logging {
* Read a required long property value or throw an exception if no such property is found
*/
def getLong(props: Properties, name: String): Long = {
if(props.containsKey(name))
return getLong(props, name, -1)
else
throw new KafkaException("Missing required property '" + name + "'")
require(props.containsKey(name), "Missing required property '" + name + "'")
return getLong(props, name, -1)
}
/**
@ -314,7 +311,7 @@ object Utils extends Logging { @@ -314,7 +311,7 @@ object Utils extends Logging {
* @param name The property name
* @param default The default value to use if the property is not found
* @param range The range in which the value must fall (inclusive)
* @throws KafkaException If the value is not in the given range
* @throws IllegalArgumentException If the value is not in the given range
* @return the long value
*/
def getLongInRange(props: Properties, name: String, default: Long, range: (Long, Long)): Long = {
@ -323,10 +320,8 @@ object Utils extends Logging { @@ -323,10 +320,8 @@ object Utils extends Logging {
props.getProperty(name).toLong
else
default
if(v < range._1 || v > range._2)
throw new KafkaException(name + " has value " + v + " which is not in the range " + range + ".")
else
v
require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
v
}
/**
@ -339,12 +334,11 @@ object Utils extends Logging { @@ -339,12 +334,11 @@ object Utils extends Logging {
def getBoolean(props: Properties, name: String, default: Boolean): Boolean = {
if(!props.containsKey(name))
default
else if("true" == props.getProperty(name))
true
else if("false" == props.getProperty(name))
false
else
throw new KafkaException("Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false" )
else {
val v = props.getProperty(name)
require(v == "true" || v == "false", "Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false")
v.toBoolean
}
}
/**
@ -361,30 +355,24 @@ object Utils extends Logging { @@ -361,30 +355,24 @@ object Utils extends Logging {
* Get a string property or throw and exception if no such property is defined.
*/
def getString(props: Properties, name: String): String = {
if(props.containsKey(name))
props.getProperty(name)
else
throw new KafkaException("Missing required property '" + name + "'")
require(props.containsKey(name), "Missing required property '" + name + "'")
props.getProperty(name)
}
/**
* Get a property of type java.util.Properties or throw and exception if no such property is defined.
*/
def getProps(props: Properties, name: String): Properties = {
if(props.containsKey(name)) {
val propString = props.getProperty(name)
val propValues = propString.split(",")
val properties = new Properties
for(i <- 0 until propValues.length) {
val prop = propValues(i).split("=")
if(prop.length != 2)
throw new KafkaException("Illegal format of specifying properties '" + propValues(i) + "'")
properties.put(prop(0), prop(1))
}
properties
require(props.containsKey(name), "Missing required property '" + name + "'")
val propString = props.getProperty(name)
val propValues = propString.split(",")
val properties = new Properties
for(i <- 0 until propValues.length) {
val prop = propValues(i).split("=")
require(prop.length == 2, "Illegal format of specifying properties '" + propValues(i) + "'")
properties.put(prop(0), prop(1))
}
else
throw new KafkaException("Missing required property '" + name + "'")
properties
}
/**
@ -394,13 +382,11 @@ object Utils extends Logging { @@ -394,13 +382,11 @@ object Utils extends Logging {
if(props.containsKey(name)) {
val propString = props.getProperty(name)
val propValues = propString.split(",")
if(propValues.length < 1)
throw new KafkaException("Illegal format of specifying properties '" + propString + "'")
require(propValues.length >= 1, "Illegal format of specifying properties '" + propString + "'")
val properties = new Properties
for(i <- 0 until propValues.length) {
val prop = propValues(i).split("=")
if(prop.length != 2)
throw new KafkaException("Illegal format of specifying properties '" + propValues(i) + "'")
require(prop.length == 2, "Illegal format of specifying properties '" + propValues(i) + "'")
properties.put(prop(0), prop(1))
}
properties

2
core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala

@ -480,7 +480,7 @@ class AsyncProducerTest extends JUnit3Suite { @@ -480,7 +480,7 @@ class AsyncProducerTest extends JUnit3Suite {
fail("should complain about wrong config")
}
catch {
case e: KafkaException => //expected
case e: IllegalArgumentException => //expected
}
}

Loading…
Cancel
Save