From f54f481873b9fe8f5675ef5a1faf823aa590697a Mon Sep 17 00:00:00 2001
From: Jun Rao
Date: Mon, 5 Nov 2012 23:37:24 +0000
Subject: [PATCH] extend DumpLogSegments to verify consistency btw data and
 index; patched by Yang Ye; reviewed by Jun Rao; KAFKA-577

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1406009 13f79535-47bb-0310-9956-ffa450edef68
---
 .../scala/kafka/tools/DumpLogSegments.scala   | 85 +++++++++++++++++--
 1 file changed, 77 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 44be65c6a8f..64759a07288 100644
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -21,46 +21,116 @@ import java.io._
 import kafka.message._
 import kafka.log._
 import kafka.utils._
+import collection.mutable
+import joptsimple.OptionParser
+
 
 object DumpLogSegments {
 
   def main(args: Array[String]) {
 
-    val print = args.contains("--print")
-    val files = args.filter(_ != "--print")
+    val parser = new OptionParser
+    val printOpt = parser.accepts("print-data-log", "if set, print the message contents when dumping data logs")
+                         .withOptionalArg
+                         .describedAs("print data log content")
+                         .ofType(classOf[java.lang.Boolean])
+                         .defaultsTo(false)
+
+    val verifyOpt = parser.accepts("verify-index-only", "if set, just verify the index log without printing its content")
+                          .withOptionalArg
+                          .describedAs("just verify the index log")
+                          .ofType(classOf[java.lang.Boolean])
+                          .defaultsTo(false)
+
+    val filesOpt = parser.accepts("files", "REQUIRED: The comma-separated list of data and index log files to be dumped")
+                         .withRequiredArg
+                         .describedAs("file1, file2, ...")
+                         .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+    if(!options.has(filesOpt)) {
+      System.err.println("Missing required argument \"" + filesOpt + "\"")
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+
+    val print = options.has(printOpt)
+    val verifyOnly = options.has(verifyOpt)
+    val files = options.valueOf(filesOpt).split(",")
+
+    val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
+    val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
 
     for(arg <- files) {
       val file = new File(arg)
       if(file.getName.endsWith(Log.LogFileSuffix)) {
         println("Dumping " + file)
-        dumpLog(file, print)
+        dumpLog(file, print, nonConsecutivePairsForLogFilesMap)
       } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
         println("Dumping " + file)
-        dumpIndex(file)
+        dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap)
+      }
+    }
+    misMatchesForIndexFilesMap.foreach {
+      case (fileName, listOfMismatches) => {
+        System.err.println("Mismatches in: " + fileName)
+        listOfMismatches.foreach(m => {
+          System.err.println("  Index offset: %d, log offset: %d".format(m._1, m._2))
+        })
+      }
+    }
+    nonConsecutivePairsForLogFilesMap.foreach {
+      case (fileName, listOfNonConsecutivePairs) => {
+        System.err.println("Non-consecutive offsets in: " + fileName)
+        listOfNonConsecutivePairs.foreach(m => {
+          System.err.println("  %d is followed by %d".format(m._1, m._2))
+        })
       }
     }
   }
 
   /* print out the contents of the index */
-  def dumpIndex(file: File) {
+  private def dumpIndex(file: File, verifyOnly: Boolean, misMatchesForIndexFilesMap: mutable.HashMap[String, List[(Long, Long)]]) {
     val startOffset = file.getName().split("\\.")(0).toLong
+    val logFileName = file.getAbsolutePath.split("\\.")(0) + Log.LogFileSuffix
+    val logFile = new File(logFileName)
+    val messageSet = new FileMessageSet(logFile)
     val index = new OffsetIndex(file = file, baseOffset = startOffset)
     for(i <- 0 until index.entries) {
       val entry = index.entry(i)
+      val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, messageSet.sizeInBytes())
+      val messageAndOffset = partialFileMessageSet.head
+      if(messageAndOffset.offset != entry.offset + index.baseOffset) {
+        var misMatchesSeq = misMatchesForIndexFilesMap.getOrElse(file.getName, List[(Long, Long)]())
+        misMatchesSeq ::= ((entry.offset + index.baseOffset, messageAndOffset.offset))
+        misMatchesForIndexFilesMap.put(file.getName, misMatchesSeq)
+      }
       // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
       if(entry.offset == 0 && i > 0) return
-      println("offset: %d position: %d".format(entry.offset + index.baseOffset, entry.position))
+      if(!verifyOnly)
+        println("offset: %d position: %d".format(entry.offset + index.baseOffset, entry.position))
     }
   }
 
   /* print out the contents of the log */
-  def dumpLog(file: File, printContents: Boolean) {
+  private def dumpLog(file: File, printContents: Boolean, nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Long, Long)]]) {
     val startOffset = file.getName().split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
     val messageSet = new FileMessageSet(file)
     var validBytes = 0L
+    var lastOffset = -1L
     for(messageAndOffset <- messageSet) {
       val msg = messageAndOffset.message
+
+      if(lastOffset == -1)
+        lastOffset = messageAndOffset.offset
+      else if(messageAndOffset.offset != lastOffset + 1) {
+        var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getName, List[(Long, Long)]())
+        nonConsecutivePairsSeq ::= ((lastOffset, messageAndOffset.offset))
+        nonConsecutivePairsForLogFilesMap.put(file.getName, nonConsecutivePairsSeq)
+      }
+      lastOffset = messageAndOffset.offset
+
       print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
             " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
             " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
@@ -78,5 +148,4 @@ object DumpLogSegments {
     if(trailingBytes > 0)
       println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName))
   }
-
 }
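
Usage sketch (illustrative, not part of the patch): the tool is run via the
kafka.tools.DumpLogSegments class defined above; the bin/kafka-run-class.sh
wrapper and the segment file paths below are assumptions for the example,
not taken from this change.

    # dump a data log segment and print the message contents
    bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
        --files /tmp/kafka-logs/my-topic-0/00000000000000000000.log \
        --print-data-log

    # verify an index segment against its data log without printing each entry
    bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
        --files /tmp/kafka-logs/my-topic-0/00000000000000000000.index \
        --verify-index-only

Index entries whose offset does not match the corresponding message in the
data log are reported on stderr under "Mismatches in: <file>", and
non-consecutive offsets within a data log are reported under
"Non-consecutive offsets in: <file>".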