Browse Source

KAFKA-5232; Fix Log.parseTopicPartitionName to handle deleted topics with a period in the name

This issue would only be triggered if a broker was restarted while
deletion was still taking place.

Included a few minor improvements to that method and its tests.

Author: Jaikiran Pai <jaikiran.pai@gmail.com>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3050 from jaikiran/KAFKA-5232-trunk
pull/2963/head
Jaikiran Pai 8 years ago committed by Ismael Juma
parent
commit
f56bbb6510
  1. 42
      core/src/main/scala/kafka/log/Log.scala
  2. 11
      core/src/main/scala/kafka/log/LogManager.scala
  3. 2
      core/src/main/scala/kafka/log/LogSegment.scala
  4. 82
      core/src/test/scala/unit/kafka/log/LogTest.scala

42
core/src/main/scala/kafka/log/Log.scala

@ -44,6 +44,7 @@ import org.apache.kafka.common.TopicPartition @@ -44,6 +44,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import java.util.Map.{Entry => JEntry}
import java.lang.{Long => JLong}
import java.util.regex.Pattern
import org.apache.kafka.common.internals.Topic
@ -256,7 +257,7 @@ class Log(@volatile var dir: File, @@ -256,7 +257,7 @@ class Log(@volatile var dir: File,
if (isIndexFile(file)) {
// if it is an index file, make sure it has a corresponding .log file
val offset = offsetFromFilename(filename)
val logFile = logFilename(dir, offset)
val logFile = Log.logFile(dir, offset)
if (!logFile.exists) {
warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
Files.deleteIfExists(file.toPath)
@ -1139,7 +1140,7 @@ class Log(@volatile var dir: File, @@ -1139,7 +1140,7 @@ class Log(@volatile var dir: File,
val start = time.nanoseconds
lock synchronized {
val newOffset = math.max(expectedNextOffset, logEndOffset)
val logFile = logFilename(dir, newOffset)
val logFile = Log.logFile(dir, newOffset)
val offsetIdxFile = offsetIndexFile(dir, newOffset)
val timeIdxFile = timeIndexFile(dir, newOffset)
val txnIdxFile = transactionIndexFile(dir, newOffset)
@ -1494,6 +1495,8 @@ object Log { @@ -1494,6 +1495,8 @@ object Log {
/** a directory that is scheduled to be deleted */
val DeleteDirSuffix = "-delete"
private val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
val UnknownLogStartOffset = -1L
/**
@ -1517,9 +1520,18 @@ object Log { @@ -1517,9 +1520,18 @@ object Log {
* @param dir The directory in which the log will reside
* @param offset The base offset of the log file
*/
def logFilename(dir: File, offset: Long) =
def logFile(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
/**
* Return a directory name to rename the log directory to for async deletion. The name will be in the following
* format: topic-partition.uniqueId-delete where topic, partition and uniqueId are variables.
*/
def logDeleteDirName(logName: String): String = {
val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
s"$logName.$uniqueId$DeleteDirSuffix"
}
/**
* Construct an index file name in the given dir using the given base offset
*
@ -1557,32 +1569,36 @@ object Log { @@ -1557,32 +1569,36 @@ object Log {
* Parse the topic and partition out of the directory name of a log
*/
def parseTopicPartitionName(dir: File): TopicPartition = {
if (dir == null)
throw new KafkaException("dir should not be null")
def exception(dir: File): KafkaException = {
new KafkaException("Found directory " + dir.getCanonicalPath + ", " +
"'" + dir.getName + "' is not in the form of topic-partition or " +
"ongoing-deleting directory(topic-partition.uniqueId-delete)\n" +
"If a directory does not contain Kafka topic data it should not exist in Kafka's log " +
"directory")
new KafkaException(s"Found directory ${dir.getCanonicalPath}, '${dir.getName}' is not in the form of " +
"topic-partition or topic-partition.uniqueId-delete (if marked for deletion).\n" +
"Kafka's log directories (and children) should only contain Kafka topic data.")
}
val dirName = dir.getName
if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
throw exception(dir)
if (dirName.endsWith(DeleteDirSuffix) && !dirName.matches("^(\\S+)-(\\S+)\\.(\\S+)" + DeleteDirSuffix))
if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches)
throw exception(dir)
val name: String =
if (dirName.endsWith(DeleteDirSuffix)) dirName.substring(0, dirName.indexOf('.'))
if (dirName.endsWith(DeleteDirSuffix)) dirName.substring(0, dirName.lastIndexOf('.'))
else dirName
val index = name.lastIndexOf('-')
val topic = name.substring(0, index)
val partition = name.substring(index + 1)
if (topic.length < 1 || partition.length < 1)
val partitionString = name.substring(index + 1)
if (topic.isEmpty || partitionString.isEmpty)
throw exception(dir)
new TopicPartition(topic, partition.toInt)
val partition =
try partitionString.toInt
catch { case _: NumberFormatException => throw exception(dir) }
new TopicPartition(topic, partition)
}
private def isIndexFile(file: File): Boolean = {

11
core/src/main/scala/kafka/log/LogManager.scala

@ -461,20 +461,15 @@ class LogManager(val logDirs: Array[File], @@ -461,20 +461,15 @@ class LogManager(val logDirs: Array[File],
*/
def asyncDelete(topicPartition: TopicPartition) = {
val removedLog: Log = logCreationOrDeletionLock synchronized {
logs.remove(topicPartition)
}
logs.remove(topicPartition)
}
if (removedLog != null) {
//We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
if (cleaner != null) {
cleaner.abortCleaning(topicPartition)
cleaner.updateCheckpoints(removedLog.dir.getParentFile)
}
// renaming the directory to topic-partition.uniqueId-delete
val dirName = new StringBuilder(removedLog.name)
.append(".")
.append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
.append(Log.DeleteDirSuffix)
.toString()
val dirName = Log.logDeleteDirName(removedLog.name)
removedLog.close()
val renamedDir = new File(removedLog.dir.getParent, dirName)
val renameSuccessful = removedLog.dir.renameTo(renamedDir)

2
core/src/main/scala/kafka/log/LogSegment.scala

@ -73,7 +73,7 @@ class LogSegment(val log: FileRecords, @@ -73,7 +73,7 @@ class LogSegment(val log: FileRecords,
def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time,
fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
this(FileRecords.open(Log.logFilename(dir, startOffset), fileAlreadyExists, initFileSize, preallocate),
this(FileRecords.open(Log.logFile(dir, startOffset), fileAlreadyExists, initFileSize, preallocate),
new OffsetIndex(Log.offsetIndexFile(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
new TimeIndex(Log.timeIndexFile(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
new TransactionIndex(startOffset, Log.transactionIndexFile(dir, startOffset)),

82
core/src/test/scala/unit/kafka/log/LogTest.scala

@ -59,7 +59,7 @@ class LogTest { @@ -59,7 +59,7 @@ class LogTest {
def createEmptyLogs(dir: File, offsets: Int*) {
for(offset <- offsets) {
Log.logFilename(dir, offset).createNewFile()
Log.logFile(dir, offset).createNewFile()
Log.offsetIndexFile(dir, offset).createNewFile()
}
}
@ -68,7 +68,7 @@ class LogTest { @@ -68,7 +68,7 @@ class LogTest {
def testOffsetFromFilename() {
val offset = 23423423L
val logFile = Log.logFilename(tmpDir, offset)
val logFile = Log.logFile(tmpDir, offset)
assertEquals(offset, Log.offsetFromFilename(logFile.getName))
val offsetIndexFile = Log.offsetIndexFile(tmpDir, offset)
@ -1634,12 +1634,26 @@ class LogTest { @@ -1634,12 +1634,26 @@ class LogTest {
def testParseTopicPartitionName() {
val topic = "test_topic"
val partition = "143"
val dir = new File(logDir + topicPartitionName(topic, partition))
val dir = new File(logDir, topicPartitionName(topic, partition))
val topicPartition = Log.parseTopicPartitionName(dir)
assertEquals(topic, topicPartition.topic)
assertEquals(partition.toInt, topicPartition.partition)
}
/**
* Tests that log directories with a period in their name that have been marked for deletion
* are parsed correctly by `Log.parseTopicPartitionName` (see KAFKA-5232 for details).
*/
@Test
def testParseTopicPartitionNameWithPeriodForDeletedTopic() {
val topic = "foo.bar-testtopic"
val partition = "42"
val dir = new File(logDir, Log.logDeleteDirName(topicPartitionName(topic, partition)))
val topicPartition = Log.parseTopicPartitionName(dir)
assertEquals("Unexpected topic name parsed", topic, topicPartition.topic)
assertEquals("Unexpected partition number parsed", partition.toInt, topicPartition.partition)
}
@Test
def testParseTopicPartitionNameForEmptyName() {
try {
@ -1647,7 +1661,7 @@ class LogTest { @@ -1647,7 +1661,7 @@ class LogTest {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case _: Exception => // its GOOD!
case _: KafkaException => // its GOOD!
}
}
@ -1658,7 +1672,7 @@ class LogTest { @@ -1658,7 +1672,7 @@ class LogTest {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir)
} catch {
case _: Exception => // its GOOD!
case _: KafkaException => // its GOOD!
}
}
@ -1666,12 +1680,20 @@ class LogTest { @@ -1666,12 +1680,20 @@ class LogTest {
def testParseTopicPartitionNameForMissingSeparator() {
val topic = "test_topic"
val partition = "1999"
val dir = new File(logDir + File.separator + topic + partition)
val dir = new File(logDir, topic + partition)
try {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case _: Exception => // its GOOD!
case _: KafkaException => // expected
}
// also test the "-delete" marker case
val deleteMarkerDir = new File(logDir, Log.logDeleteDirName(topic + partition))
try {
Log.parseTopicPartitionName(deleteMarkerDir)
fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
} catch {
case _: KafkaException => // expected
}
}
@ -1679,13 +1701,22 @@ class LogTest { @@ -1679,13 +1701,22 @@ class LogTest {
def testParseTopicPartitionNameForMissingTopic() {
val topic = ""
val partition = "1999"
val dir = new File(logDir + topicPartitionName(topic, partition))
val dir = new File(logDir, topicPartitionName(topic, partition))
try {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case _: Exception => // its GOOD!
case _: KafkaException => // expected
}
// also test the "-delete" marker case
val deleteMarkerDir = new File(logDir, Log.logDeleteDirName(topicPartitionName(topic, partition)))
try {
Log.parseTopicPartitionName(deleteMarkerDir)
fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
} catch {
case _: KafkaException => // expected
}
}
@Test
@ -1697,7 +1728,36 @@ class LogTest { @@ -1697,7 +1728,36 @@ class LogTest {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case _: Exception => // its GOOD!
case _: KafkaException => // expected
}
// also test the "-delete" marker case
val deleteMarkerDir = new File(logDir, Log.logDeleteDirName(topicPartitionName(topic, partition)))
try {
Log.parseTopicPartitionName(deleteMarkerDir)
fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
} catch {
case _: KafkaException => // expected
}
}
@Test
def testParseTopicPartitionNameForInvalidPartition() {
val topic = "test_topic"
val partition = "1999a"
val dir = new File(logDir, topicPartitionName(topic, partition))
try {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case _: KafkaException => // expected
}
// also test the "-delete" marker case
val deleteMarkerDir = new File(logDir, Log.logDeleteDirName(topic + partition))
try {
Log.parseTopicPartitionName(deleteMarkerDir)
fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
} catch {
case _: KafkaException => // expected
}
}
@ -1720,7 +1780,7 @@ class LogTest { @@ -1720,7 +1780,7 @@ class LogTest {
}
def topicPartitionName(topic: String, partition: String): String =
File.separator + topic + "-" + partition
topic + "-" + partition
@Test
def testDeleteOldSegmentsMethod() {

Loading…
Cancel
Save