Browse Source

kafka-2012; Broker should automatically handle corrupt index files; patched by Manikumar Reddy; reviewed by Jun Rao

pull/71/head
Manikumar Reddy 10 years ago committed by Jun Rao
parent
commit
16ecf9806b
  1. 18
      core/src/main/scala/kafka/log/Log.scala
  2. 33
      core/src/test/scala/unit/kafka/log/LogTest.scala

18
core/src/main/scala/kafka/log/Log.scala

@ -169,7 +169,7 @@ class Log(val dir: File, @@ -169,7 +169,7 @@ class Log(val dir: File,
} else if(filename.endsWith(LogFileSuffix)) {
// if its a log file, load the corresponding log segment
val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
val hasIndex = Log.indexFilename(dir, start).exists
val indexFile = Log.indexFilename(dir, start)
val segment = new LogSegment(dir = dir,
startOffset = start,
indexIntervalBytes = config.indexInterval,
@ -177,7 +177,18 @@ class Log(val dir: File, @@ -177,7 +177,18 @@ class Log(val dir: File,
rollJitterMs = config.randomSegmentJitter,
time = time,
fileAlreadyExists = true)
if(!hasIndex) {
if(indexFile.exists()) {
try {
segment.index.sanityCheck()
} catch {
case e: java.lang.IllegalArgumentException =>
warn("Found an corrupted index file, %s, deleting and rebuilding index...".format(indexFile.getAbsolutePath))
indexFile.delete()
segment.recover(config.maxMessageSize)
}
}
else {
error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
segment.recover(config.maxMessageSize)
}
@ -223,9 +234,6 @@ class Log(val dir: File, @@ -223,9 +234,6 @@ class Log(val dir: File,
activeSegment.index.resize(config.maxIndexSize)
}
// sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
for (s <- logSegments)
s.index.sanityCheck()
}
private def updateLogEndOffset(messageOffset: Long) {

33
core/src/test/scala/unit/kafka/log/LogTest.scala

@ -488,6 +488,39 @@ class LogTest extends JUnitSuite { @@ -488,6 +488,39 @@ class LogTest extends JUnitSuite {
log.close()
}
/**
* Test that if we have corrupted an index segment it is rebuilt when the log is re-opened
*/
@Test
def testCorruptIndexRebuild() {
// publish the messages and close the log
val numMessages = 200
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 200: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val config = LogConfig(logProps)
var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
val indexFiles = log.logSegments.map(_.index.file)
log.close()
// corrupt all the index files
for( file <- indexFiles) {
val bw = new BufferedWriter(new FileWriter(file))
bw.write(" ")
bw.close()
}
// reopen the log
log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
for(i <- 0 until numMessages)
assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
log.close()
}
/**
* Test the Log truncate operations
*/

Loading…
Cancel
Save