|
|
|
@ -597,44 +597,72 @@ object Utils extends Logging {
@@ -597,44 +597,72 @@ object Utils extends Logging {
|
|
|
|
|
|
|
|
|
|
def getTopicRetentionHours(retentionHours: String) : Map[String, Int] = { |
|
|
|
|
val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: " |
|
|
|
|
val successMsg = "The retention hour for " |
|
|
|
|
getCSVMap(retentionHours, exceptionMsg, successMsg) |
|
|
|
|
val successMsg = "The retention hours for " |
|
|
|
|
val map: Map[String, Int] = getCSVMap(retentionHours, exceptionMsg, successMsg) |
|
|
|
|
map.foreach{case(topic, hrs) => |
|
|
|
|
require(hrs > 0, "Log retention hours value for topic " + topic + " is " + hrs + |
|
|
|
|
" which is not greater than 0.")} |
|
|
|
|
map |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
def getTopicRollHours(rollHours: String) : Map[String, Int] = { |
|
|
|
|
val exceptionMsg = "Malformed token for topic.log.roll.hours in server.properties: " |
|
|
|
|
val successMsg = "The roll hour for " |
|
|
|
|
getCSVMap(rollHours, exceptionMsg, successMsg) |
|
|
|
|
val successMsg = "The roll hours for " |
|
|
|
|
val map: Map[String, Int] = getCSVMap(rollHours, exceptionMsg, successMsg) |
|
|
|
|
map.foreach{case(topic, hrs) => |
|
|
|
|
require(hrs > 0, "Log roll hours value for topic " + topic + " is " + hrs + |
|
|
|
|
" which is not greater than 0.")} |
|
|
|
|
map |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
def getTopicFileSize(fileSizes: String): Map[String, Int] = { |
|
|
|
|
val exceptionMsg = "Malformed token for topic.log.file.size in server.properties: " |
|
|
|
|
val successMsg = "The roll hour for " |
|
|
|
|
getCSVMap(fileSizes, exceptionMsg, successMsg) |
|
|
|
|
val successMsg = "The log file size for " |
|
|
|
|
val map: Map[String, Int] = getCSVMap(fileSizes, exceptionMsg, successMsg) |
|
|
|
|
map.foreach{case(topic, size) => |
|
|
|
|
require(size > 0, "Log file size value for topic " + topic + " is " + size + |
|
|
|
|
" which is not greater than 0.")} |
|
|
|
|
map |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
def getTopicRetentionSize(retentionSizes: String): Map[String, Long] = { |
|
|
|
|
val exceptionMsg = "Malformed token for topic.log.retention.size in server.properties: " |
|
|
|
|
val successMsg = "The roll hour for " |
|
|
|
|
getCSVMap(retentionSizes, exceptionMsg, successMsg) |
|
|
|
|
val successMsg = "The log retention size for " |
|
|
|
|
val map: Map[String, Long] = getCSVMap(retentionSizes, exceptionMsg, successMsg) |
|
|
|
|
map.foreach{case(topic, size) => |
|
|
|
|
require(size > 0, "Log retention size value for topic " + topic + " is " + size + |
|
|
|
|
" which is not greater than 0.")} |
|
|
|
|
map |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
def getTopicFlushIntervals(allIntervals: String) : Map[String, Int] = { |
|
|
|
|
val exceptionMsg = "Malformed token for topic.flush.Intervals.ms in server.properties: " |
|
|
|
|
val successMsg = "The flush interval for " |
|
|
|
|
getCSVMap(allIntervals, exceptionMsg, successMsg) |
|
|
|
|
val map: Map[String, Int] = getCSVMap(allIntervals, exceptionMsg, successMsg) |
|
|
|
|
map.foreach{case(topic, interval) => |
|
|
|
|
require(interval > 0, "Flush interval value for topic " + topic + " is " + interval + |
|
|
|
|
" ms which is not greater than 0.")} |
|
|
|
|
map |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
def getTopicPartitions(allPartitions: String) : Map[String, Int] = { |
|
|
|
|
val exceptionMsg = "Malformed token for topic.partition.counts in server.properties: " |
|
|
|
|
val successMsg = "The number of partitions for topic " |
|
|
|
|
getCSVMap(allPartitions, exceptionMsg, successMsg) |
|
|
|
|
val map: Map[String, Int] = getCSVMap(allPartitions, exceptionMsg, successMsg) |
|
|
|
|
map.foreach{case(topic, count) => |
|
|
|
|
require(count > 0, "The number of partitions for topic " + topic + " is " + count + |
|
|
|
|
" which is not greater than 0.")} |
|
|
|
|
map |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
def getConsumerTopicMap(consumerTopicString: String) : Map[String, Int] = { |
|
|
|
|
val exceptionMsg = "Malformed token for embeddedconsumer.topics in consumer.properties: " |
|
|
|
|
val successMsg = "The number of consumer thread for topic " |
|
|
|
|
getCSVMap(consumerTopicString, exceptionMsg, successMsg) |
|
|
|
|
val successMsg = "The number of consumer threads for topic " |
|
|
|
|
val map: Map[String, Int] = getCSVMap(consumerTopicString, exceptionMsg, successMsg) |
|
|
|
|
map.foreach{case(topic, count) => |
|
|
|
|
require(count > 0, "The number of consumer threads for topic " + topic + " is " + count + |
|
|
|
|
" which is not greater than 0.")} |
|
|
|
|
map |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
def getObject[T<:AnyRef](className: String): T = { |
|
|
|
|