diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 4fae2f0d339..024506cd005 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -84,7 +84,7 @@ class Log(val dir: File, /* Calculate the offset of the next message */ @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt) - val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name) + val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(dir) info("Completed load of log %s with log end offset %d".format(name, logEndOffset)) @@ -832,9 +832,25 @@ object Log { /** * Parse the topic and partition out of the directory name of a log */ - def parseTopicPartitionName(name: String): TopicAndPartition = { + def parseTopicPartitionName(dir: File): TopicAndPartition = { + val name: String = dir.getName + if (name == null || name.isEmpty || !name.contains('-')) { + throwException(dir) + } val index = name.lastIndexOf('-') - TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt) + val topic: String = name.substring(0, index) + val partition: String = name.substring(index + 1) + if (topic.length < 1 || partition.length < 1) { + throwException(dir) + } + TopicAndPartition(topic, partition.toInt) + } + + def throwException(dir: File) { + throw new KafkaException("Found directory " + dir.getCanonicalPath + ", " + + "'" + dir.getName + "' is not in the form of topic-partition\n" + + "If a directory does not contain Kafka topic data it should not exist in Kafka's log " + + "directory") } } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 4d2924d04bc..4ebaae00ca4 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -134,7 +134,7 @@ class LogManager(val logDirs: Array[File], Utils.runnable { debug("Loading log '" + logDir.getName + "'") - val topicPartition = Log.parseTopicPartitionName(logDir.getName) + val topicPartition = Log.parseTopicPartitionName(logDir) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index d670ba76acd..c2dd8eb69da 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -688,4 +688,79 @@ class LogTest extends JUnitSuite { assertEquals(recoveryPoint, log.logEndOffset) cleanShutdownFile.delete() } + + @Test + def testParseTopicPartitionName() { + val topic: String = "test_topic" + val partition:String = "143" + val dir: File = new File(logDir + topicPartitionName(topic, partition)) + val topicAndPartition = Log.parseTopicPartitionName(dir); + assertEquals(topic, topicAndPartition.asTuple._1) + assertEquals(partition.toInt, topicAndPartition.asTuple._2) + } + + @Test + def testParseTopicPartitionNameForEmptyName() { + try { + val dir: File = new File("") + val topicAndPartition = Log.parseTopicPartitionName(dir); + fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) + } catch { + case e: Exception => // its GOOD! + } + } + + @Test + def testParseTopicPartitionNameForNull() { + try { + val dir: File = null + val topicAndPartition = Log.parseTopicPartitionName(dir); + fail("KafkaException should have been thrown for dir: " + dir) + } catch { + case e: Exception => // its GOOD! + } + } + + @Test + def testParseTopicPartitionNameForMissingSeparator() { + val topic: String = "test_topic" + val partition:String = "1999" + val dir: File = new File(logDir + File.separator + topic + partition) + try { + val topicAndPartition = Log.parseTopicPartitionName(dir); + fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) + } catch { + case e: Exception => // its GOOD! + } + } + + @Test + def testParseTopicPartitionNameForMissingTopic() { + val topic: String = "" + val partition:String = "1999" + val dir: File = new File(logDir + topicPartitionName(topic, partition)) + try { + val topicAndPartition = Log.parseTopicPartitionName(dir); + fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) + } catch { + case e: Exception => // its GOOD! + } + } + + @Test + def testParseTopicPartitionNameForMissingPartition() { + val topic: String = "test_topic" + val partition:String = "" + val dir: File = new File(logDir + topicPartitionName(topic, partition)) + try { + val topicAndPartition = Log.parseTopicPartitionName(dir); + fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) + } catch { + case e: Exception => // its GOOD! + } + } + + def topicPartitionName(topic: String, partition: String): String = { + File.separator + topic + "-" + partition + } }