diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 63076219dfe..a14393c73bf 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -597,44 +597,72 @@ object Utils extends Logging { def getTopicRetentionHours(retentionHours: String) : Map[String, Int] = { val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: " - val successMsg = "The retention hour for " - getCSVMap(retentionHours, exceptionMsg, successMsg) + val successMsg = "The retention hours for " + val map: Map[String, Int] = getCSVMap(retentionHours, exceptionMsg, successMsg) + map.foreach{case(topic, hrs) => + require(hrs > 0, "Log retention hours value for topic " + topic + " is " + hrs + + " which is not greater than 0.")} + map } def getTopicRollHours(rollHours: String) : Map[String, Int] = { val exceptionMsg = "Malformed token for topic.log.roll.hours in server.properties: " - val successMsg = "The roll hour for " - getCSVMap(rollHours, exceptionMsg, successMsg) + val successMsg = "The roll hours for " + val map: Map[String, Int] = getCSVMap(rollHours, exceptionMsg, successMsg) + map.foreach{case(topic, hrs) => + require(hrs > 0, "Log roll hours value for topic " + topic + " is " + hrs + + " which is not greater than 0.")} + map } def getTopicFileSize(fileSizes: String): Map[String, Int] = { val exceptionMsg = "Malformed token for topic.log.file.size in server.properties: " - val successMsg = "The roll hour for " - getCSVMap(fileSizes, exceptionMsg, successMsg) + val successMsg = "The log file size for " + val map: Map[String, Int] = getCSVMap(fileSizes, exceptionMsg, successMsg) + map.foreach{case(topic, size) => + require(size > 0, "Log file size value for topic " + topic + " is " + size + + " which is not greater than 0.")} + map } def getTopicRetentionSize(retentionSizes: String): Map[String, Long] = { val exceptionMsg = "Malformed token for topic.log.retention.size in server.properties: " - val successMsg = "The roll hour for " - getCSVMap(retentionSizes, exceptionMsg, successMsg) + val successMsg = "The log retention size for " + val map: Map[String, Long] = getCSVMap(retentionSizes, exceptionMsg, successMsg) + map.foreach{case(topic, size) => + require(size > 0, "Log retention size value for topic " + topic + " is " + size + + " which is not greater than 0.")} + map } def getTopicFlushIntervals(allIntervals: String) : Map[String, Int] = { val exceptionMsg = "Malformed token for topic.flush.Intervals.ms in server.properties: " val successMsg = "The flush interval for " - getCSVMap(allIntervals, exceptionMsg, successMsg) + val map: Map[String, Int] = getCSVMap(allIntervals, exceptionMsg, successMsg) + map.foreach{case(topic, interval) => + require(interval > 0, "Flush interval value for topic " + topic + " is " + interval + + " ms which is not greater than 0.")} + map } def getTopicPartitions(allPartitions: String) : Map[String, Int] = { val exceptionMsg = "Malformed token for topic.partition.counts in server.properties: " val successMsg = "The number of partitions for topic " - getCSVMap(allPartitions, exceptionMsg, successMsg) + val map: Map[String, Int] = getCSVMap(allPartitions, exceptionMsg, successMsg) + map.foreach{case(topic, count) => + require(count > 0, "The number of partitions for topic " + topic + " is " + count + + " which is not greater than 0.")} + map } def getConsumerTopicMap(consumerTopicString: String) : Map[String, Int] = { val exceptionMsg = "Malformed token for embeddedconsumer.topics in consumer.properties: " - val successMsg = "The number of consumer thread for topic " - getCSVMap(consumerTopicString, exceptionMsg, successMsg) + val successMsg = "The number of consumer threads for topic " + val map: Map[String, Int] = getCSVMap(consumerTopicString, exceptionMsg, successMsg) + map.foreach{case(topic, count) => + require(count > 0, "The number of consumer threads for topic " + topic + " is " + count + + " which is not greater than 0.")} + map } def getObject[T<:AnyRef](className: String): T = {