Browse Source

KAFKA-742: Existing directories under the Kafka data directory without any data cause process to not start; reviewed by Neha Narkhede

pull/38/head
Ashish Singh 10 years ago committed by Neha Narkhede
parent
commit
ae0bb84fa7
  1. 22
      core/src/main/scala/kafka/log/Log.scala
  2. 2
      core/src/main/scala/kafka/log/LogManager.scala
  3. 75
      core/src/test/scala/unit/kafka/log/LogTest.scala

22
core/src/main/scala/kafka/log/Log.scala

@ -84,7 +84,7 @@ class Log(val dir: File, @@ -84,7 +84,7 @@ class Log(val dir: File,
/* Calculate the offset of the next message */
@volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name)
val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(dir)
info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
@ -832,9 +832,25 @@ object Log { @@ -832,9 +832,25 @@ object Log {
/**
* Parse the topic and partition out of the directory name of a log
*/
def parseTopicPartitionName(name: String): TopicAndPartition = {
def parseTopicPartitionName(dir: File): TopicAndPartition = {
val name: String = dir.getName
if (name == null || name.isEmpty || !name.contains('-')) {
throwException(dir)
}
val index = name.lastIndexOf('-')
TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt)
val topic: String = name.substring(0, index)
val partition: String = name.substring(index + 1)
if (topic.length < 1 || partition.length < 1) {
throwException(dir)
}
TopicAndPartition(topic, partition.toInt)
}
def throwException(dir: File) {
throw new KafkaException("Found directory " + dir.getCanonicalPath + ", " +
"'" + dir.getName + "' is not in the form of topic-partition\n" +
"If a directory does not contain Kafka topic data it should not exist in Kafka's log " +
"directory")
}
}

2
core/src/main/scala/kafka/log/LogManager.scala

@ -134,7 +134,7 @@ class LogManager(val logDirs: Array[File], @@ -134,7 +134,7 @@ class LogManager(val logDirs: Array[File],
Utils.runnable {
debug("Loading log '" + logDir.getName + "'")
val topicPartition = Log.parseTopicPartitionName(logDir.getName)
val topicPartition = Log.parseTopicPartitionName(logDir)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)

75
core/src/test/scala/unit/kafka/log/LogTest.scala

@ -688,4 +688,79 @@ class LogTest extends JUnitSuite { @@ -688,4 +688,79 @@ class LogTest extends JUnitSuite {
assertEquals(recoveryPoint, log.logEndOffset)
cleanShutdownFile.delete()
}
@Test
def testParseTopicPartitionName() {
val topic: String = "test_topic"
val partition:String = "143"
val dir: File = new File(logDir + topicPartitionName(topic, partition))
val topicAndPartition = Log.parseTopicPartitionName(dir);
assertEquals(topic, topicAndPartition.asTuple._1)
assertEquals(partition.toInt, topicAndPartition.asTuple._2)
}
@Test
def testParseTopicPartitionNameForEmptyName() {
try {
val dir: File = new File("")
val topicAndPartition = Log.parseTopicPartitionName(dir);
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case e: Exception => // its GOOD!
}
}
@Test
def testParseTopicPartitionNameForNull() {
try {
val dir: File = null
val topicAndPartition = Log.parseTopicPartitionName(dir);
fail("KafkaException should have been thrown for dir: " + dir)
} catch {
case e: Exception => // its GOOD!
}
}
@Test
def testParseTopicPartitionNameForMissingSeparator() {
val topic: String = "test_topic"
val partition:String = "1999"
val dir: File = new File(logDir + File.separator + topic + partition)
try {
val topicAndPartition = Log.parseTopicPartitionName(dir);
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case e: Exception => // its GOOD!
}
}
@Test
def testParseTopicPartitionNameForMissingTopic() {
val topic: String = ""
val partition:String = "1999"
val dir: File = new File(logDir + topicPartitionName(topic, partition))
try {
val topicAndPartition = Log.parseTopicPartitionName(dir);
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case e: Exception => // its GOOD!
}
}
@Test
def testParseTopicPartitionNameForMissingPartition() {
val topic: String = "test_topic"
val partition:String = ""
val dir: File = new File(logDir + topicPartitionName(topic, partition))
try {
val topicAndPartition = Log.parseTopicPartitionName(dir);
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case e: Exception => // its GOOD!
}
}
def topicPartitionName(topic: String, partition: String): String = {
File.separator + topic + "-" + partition
}
}

Loading…
Cancel
Save