@@ -21,46 +21,116 @@ import java.io._
 import kafka.message._
 import kafka.log._
 import kafka.utils._
+import collection.mutable
+import joptsimple.OptionParser
 
 object DumpLogSegments {
 
   def main(args: Array[String]) {
-    val print = args.contains("--print")
-    val files = args.filter(_ != "--print")
+    val parser = new OptionParser
+    val printOpt = parser.accepts("print-data-log", "if set, printing the messages content when dumping data logs")
+                         .withOptionalArg
+                         .describedAs("print data log content")
+                         .ofType(classOf[java.lang.Boolean])
+                         .defaultsTo(false)
+    val verifyOpt = parser.accepts("verify-index-only", "if set, just verify the index log without printing its content")
+                         .withOptionalArg
+                         .describedAs("just verify the index log")
+                         .ofType(classOf[java.lang.Boolean])
+                         .defaultsTo(false)
+    val filesOpt = parser.accepts("files", "REQUIRED: The comma separated list of data and index log files to be dumped")
+                         .withRequiredArg
+                         .describedAs("file1, file2, ...")
+                         .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+    if(!options.has(filesOpt)) {
+      System.err.println("Missing required argument \"" + filesOpt + "\"")
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+
+    val print = if(options.has(printOpt)) true else false
+    val verifyOnly = if(options.has(verifyOpt)) true else false
+    val files = options.valueOf(filesOpt).split(",")
+
+    val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Int, Int)]]
+    val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Int, Int)]]
+
     for(arg <- files) {
       val file = new File(arg)
       if(file.getName.endsWith(Log.LogFileSuffix)) {
         println("Dumping " + file)
-        dumpLog(file, print)
+        dumpLog(file, print, nonConsecutivePairsForLogFilesMap)
       } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
         println("Dumping " + file)
-        dumpIndex(file)
+        dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap)
       }
     }
+    misMatchesForIndexFilesMap.foreach {
+      case (fileName, listOfMismatches) => {
+        System.err.println("Mismatches in :" + fileName)
+        listOfMismatches.foreach(m => {
+          System.err.println("  Index position: %d, log position: %d".format(m._1, m._2))
+        })
+      }
+    }
+    nonConsecutivePairsForLogFilesMap.foreach {
+      case (fileName, listOfNonSecutivePairs) => {
+        System.err.println("Non-secutive offsets in :" + fileName)
+        listOfNonSecutivePairs.foreach(m => {
+          System.err.println("  %d is followed by %d".format(m._1, m._2))
+        })
+      }
+    }
   }
 
   /* print out the contents of the index */
-  def dumpIndex(file: File) {
+  private def dumpIndex(file: File, verifyOnly: Boolean, misMatchesForIndexFilesMap: mutable.HashMap[String, List[(Int, Int)]]) {
     val startOffset = file.getName().split("\\.")(0).toLong
+    val logFileName = file.getAbsolutePath.split("\\.")(0) + Log.LogFileSuffix
+    val logFile = new File(logFileName)
+    val messageSet = new FileMessageSet(logFile)
     val index = new OffsetIndex(file = file, baseOffset = startOffset)
     for(i <- 0 until index.entries) {
       val entry = index.entry(i)
+      val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, messageSet.sizeInBytes())
+      val messageAndOffset = partialFileMessageSet.head
+      if(messageAndOffset.offset != entry.offset + index.baseOffset) {
+        var misMatchesSeq = misMatchesForIndexFilesMap.getOrElse(file.getName, List[(Int, Int)]())
+        misMatchesSeq ::=((entry.offset + index.baseOffset, messageAndOffset.offset).asInstanceOf[(Int, Int)])
+        misMatchesForIndexFilesMap.put(file.getName, misMatchesSeq)
+      }
       // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
       if(entry.offset == 0 && i > 0)
         return
-      println("offset: %d position: %d".format(entry.offset + index.baseOffset, entry.position))
+      if (!verifyOnly)
+        println("offset: %d position: %d".format(entry.offset + index.baseOffset, entry.position))
     }
   }
 
   /* print out the contents of the log */
-  def dumpLog(file: File, printContents: Boolean) {
+  private def dumpLog(file: File, printContents: Boolean, nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Int, Int)]]) {
     val startOffset = file.getName().split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
     val messageSet = new FileMessageSet(file)
     var validBytes = 0L
+    var lastOffset = -1l
     for(messageAndOffset <- messageSet) {
       val msg = messageAndOffset.message
+
+      if(lastOffset == -1)
+        lastOffset = messageAndOffset.offset
+      else if (messageAndOffset.offset != lastOffset +1) {
+        var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getName, List[(Int, Int)]())
+        nonConsecutivePairsSeq ::=((lastOffset, messageAndOffset.offset).asInstanceOf[(Int, Int)])
+        nonConsecutivePairsForLogFilesMap.put(file.getName, nonConsecutivePairsSeq)
+      }
+      lastOffset = messageAndOffset.offset
+
       print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
             " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
             " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
@@ -78,5 +148,4 @@ object DumpLogSegments {
     if(trailingBytes > 0)
       println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName))
   }
 }
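
For reference, below is a minimal Scala sketch of how the patched tool could be exercised with the new joptsimple options shown in this diff. The wrapper object name and the file paths are hypothetical placeholders; in a real deployment the class would normally be launched through Kafka's kafka-run-class.sh script rather than invoked from code.

// Hypothetical invocation sketch (not part of the patch): it only exercises the
// new --files / --print-data-log / --verify-index-only argument handling.
object DumpLogSegmentsExample {
  def main(args: Array[String]) {
    // Dump a data log and also print each message's content.
    // The path is an illustrative placeholder.
    DumpLogSegments.main(Array(
      "--files", "/tmp/kafka-logs/my-topic-0/00000000000000000000.log",
      "--print-data-log"))

    // Verify an index file against its data log without printing the entries.
    DumpLogSegments.main(Array(
      "--files", "/tmp/kafka-logs/my-topic-0/00000000000000000000.index",
      "--verify-index-only"))
  }
}

When an .index file is passed, the patched dumpIndex cross-checks each index entry against the message found at that position in the corresponding .log file; any disagreement is collected in misMatchesForIndexFilesMap and reported on stderr after all files have been processed, and non-consecutive offsets found while dumping a data log are reported the same way from nonConsecutivePairsForLogFilesMap.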