
KAFKA-8745: DumpLogSegments doesn't show keys when the message value is null (#7152)

Make sure to show the message key, even when the message value is null.

This changes the output of one of the tools. Is the output of the tool considered a public API? Does this need a discussion or a KIP?

Testing: Ran the tool on a compacted topic. Previously, the tool did not show any message keys for tombstone messages (messages where the value is null). Now, the tool shows message keys.
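For illustration, the manual test above can be reproduced with the distribution's dump-log script, e.g. (paths are placeholders):

    bin/kafka-dump-log.sh --print-data-log --files /var/kafka-logs/my-topic-0/00000000000000000000.log

Assuming the "|" record-indent prefix and eliding unrelated fields as "...", a tombstone at offset 12 previously printed as

    | offset: 12 ... headerKeys: []

and with this change also shows its key:

    | offset: 12 ... headerKeys: [] key: message key 12

The key/payload field names and the "message key $offset" data match the assertions in the updated test below.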

Reviewers: Mickael Maison <mimaison@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>
James Cheng authored 5 years ago, committed by Guozhang Wang · commit e23a7182d5
Changed files:
  1. core/src/main/scala/kafka/tools/DumpLogSegments.scala (12 lines changed)
  2. core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala (61 lines changed)

core/src/main/scala/kafka/tools/DumpLogSegments.scala

@@ -218,14 +218,14 @@ object DumpLogSegments {
   private class DecoderMessageParser[K, V](keyDecoder: Decoder[K], valueDecoder: Decoder[V]) extends MessageParser[K, V] {
     override def parse(record: Record): (Option[K], Option[V]) = {
+      val key = if (record.hasKey)
+        Some(keyDecoder.fromBytes(Utils.readBytes(record.key)))
+      else
+        None
+
       if (!record.hasValue) {
-        (None, None)
+        (key, None)
       } else {
-        val key = if (record.hasKey)
-          Some(keyDecoder.fromBytes(Utils.readBytes(record.key)))
-        else
-          None
         val payload = Some(valueDecoder.fromBytes(Utils.readBytes(record.value)))
         (key, payload)
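The fix hoists key decoding above the value check so both branches share it; the old code only decoded the key inside the non-null-value branch, which dropped keys for tombstones. A minimal REPL-pasteable sketch of the same control flow, using a hypothetical FakeRecord stand-in rather than Kafka's Record/Decoder types:

    // Hypothetical stand-in for Kafka's Record; illustration only.
    case class FakeRecord(key: Option[Array[Byte]], value: Option[Array[Byte]])

    def parse(record: FakeRecord): (Option[String], Option[String]) = {
      // Decode the key first, independent of the value branch.
      val key = record.key.map(bytes => new String(bytes, "UTF-8"))
      record.value match {
        case None        => (key, None)                             // tombstone: key preserved
        case Some(bytes) => (key, Some(new String(bytes, "UTF-8"))) // regular record
      }
    }

    // After the fix, a tombstone keeps its key:
    assert(parse(FakeRecord(Some("k".getBytes("UTF-8")), None)) == (Some("k"), None))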

core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala

@@ -33,6 +33,8 @@ import org.scalatest.Assertions.fail
 import scala.collection.mutable.ArrayBuffer
 
+case class BatchInfo(records: Seq[SimpleRecord], hasKeys: Boolean, hasValues: Boolean)
+
 class DumpLogSegmentsTest {
   val tmpDir = TestUtils.tempDir()
@@ -43,7 +45,7 @@ class DumpLogSegmentsTest {
   val timeIndexFilePath = s"$logDir/$segmentName.timeindex"
   val time = new MockTime(0, 0)
-  val batches = new ArrayBuffer[Seq[SimpleRecord]]
+  val batches = new ArrayBuffer[BatchInfo]
   var log: Log = _
 
   @Before
@@ -56,19 +58,19 @@ class DumpLogSegmentsTest {
       logDirFailureChannel = new LogDirFailureChannel(10))
 
     val now = System.currentTimeMillis()
-    val firstBatchRecords = (0 until 10).map { i => new SimpleRecord(now + i * 2, s"hello there $i".getBytes)}
-    batches += firstBatchRecords
-    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0, firstBatchRecords: _*),
-      leaderEpoch = 0)
-    val secondBatchRecords = (10 until 30).map { i => new SimpleRecord(now + i * 3, s"hello there again $i".getBytes)}
-    batches += secondBatchRecords
-    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0, secondBatchRecords: _*),
-      leaderEpoch = 0)
-    val thirdBatchRecords = (30 until 50).map { i => new SimpleRecord(now + i * 5, s"hello there one more time $i".getBytes)}
-    batches += thirdBatchRecords
-    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0, thirdBatchRecords: _*),
-      leaderEpoch = 0)
+    val firstBatchRecords = (0 until 10).map { i => new SimpleRecord(now + i * 2, s"message key $i".getBytes, s"message value $i".getBytes)}
+    batches += BatchInfo(firstBatchRecords, true, true)
+    val secondBatchRecords = (10 until 30).map { i => new SimpleRecord(now + i * 3, s"message key $i".getBytes, null)}
+    batches += BatchInfo(secondBatchRecords, true, false)
+    val thirdBatchRecords = (30 until 50).map { i => new SimpleRecord(now + i * 5, null, s"message value $i".getBytes)}
+    batches += BatchInfo(thirdBatchRecords, false, true)
+    val fourthBatchRecords = (50 until 60).map { i => new SimpleRecord(now + i * 7, null)}
+    batches += BatchInfo(fourthBatchRecords, false, false)
+    batches.foreach { batchInfo =>
+      log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0, batchInfo.records: _*),
+        leaderEpoch = 0)
+    }
 
     // Flush, but don't close so that the indexes are not trimmed and contain some zero entries
     log.flush()
   }
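Note that the four batches now cover every key/value combination: both present, key only (null values, i.e. tombstones), value only, and neither. The verification below checks each shape of output line, including the KAFKA-8745 tombstone case.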
@@ -82,7 +84,7 @@ class DumpLogSegmentsTest {
 
   @Test
   def testPrintDataLog(): Unit = {
-    def verifyRecordsInOutput(args: Array[String]): Unit = {
+    def verifyRecordsInOutput(checkKeysAndValues: Boolean, args: Array[String]): Unit = {
       def isBatch(index: Int): Boolean = {
         var i = 0
         batches.zipWithIndex.foreach { case (batch, batchIndex) =>
@@ -91,7 +93,7 @@ class DumpLogSegmentsTest {
           i += 1
 
-          batch.indices.foreach { recordIndex =>
+          batch.records.indices.foreach { recordIndex =>
             if (i == index)
               return false
             i += 1
@@ -103,16 +105,27 @@ class DumpLogSegmentsTest {
       val output = runDumpLogSegments(args)
       val lines = output.split("\n")
       assertTrue(s"Data not printed: $output", lines.length > 2)
-      val totalRecords = batches.map(_.size).sum
+      val totalRecords = batches.map(_.records.size).sum
       var offset = 0
+      val batchIterator = batches.iterator
+      var batch : BatchInfo = null;
       (0 until totalRecords + batches.size).foreach { index =>
         val line = lines(lines.length - totalRecords - batches.size + index)
         // The base offset of the batch is the offset of the first record in the batch, so we
         // only increment the offset if it's not a batch
-        if (isBatch(index))
+        if (isBatch(index)) {
           assertTrue(s"Not a valid batch-level message record: $line", line.startsWith(s"baseOffset: $offset lastOffset: "))
-        else {
+          batch = batchIterator.next
+        } else {
           assertTrue(s"Not a valid message record: $line", line.startsWith(s"${DumpLogSegments.RecordIndent} offset: $offset"))
+          if (checkKeysAndValues) {
+            var suffix = "headerKeys: []"
+            if (batch.hasKeys)
+              suffix += s" key: message key $offset"
+            if (batch.hasValues)
+              suffix += s" payload: message value $offset"
+            assertTrue(s"Message record missing key or value: $line", line.endsWith(suffix))
+          }
           offset += 1
         }
       }
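As a concrete example of the suffix check: a record at offset 12 falls in the second batch (hasKeys = true, hasValues = false), so its line must end with "headerKeys: [] key: message key 12" and carry no payload field, which is exactly the tombstone output the fix introduces.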
@@ -124,15 +137,15 @@ class DumpLogSegmentsTest {
     }
 
     // Verify that records are printed with --print-data-log even if --deep-iteration is not specified
-    verifyRecordsInOutput(Array("--print-data-log", "--files", logFilePath))
+    verifyRecordsInOutput(true, Array("--print-data-log", "--files", logFilePath))
 
     // Verify that records are printed with --print-data-log if --deep-iteration is also specified
-    verifyRecordsInOutput(Array("--print-data-log", "--deep-iteration", "--files", logFilePath))
+    verifyRecordsInOutput(true, Array("--print-data-log", "--deep-iteration", "--files", logFilePath))
 
     // Verify that records are printed with --value-decoder even if --print-data-log is not specified
-    verifyRecordsInOutput(Array("--value-decoder-class", "kafka.serializer.StringDecoder", "--files", logFilePath))
+    verifyRecordsInOutput(true, Array("--value-decoder-class", "kafka.serializer.StringDecoder", "--files", logFilePath))
 
     // Verify that records are printed with --key-decoder even if --print-data-log is not specified
-    verifyRecordsInOutput(Array("--key-decoder-class", "kafka.serializer.StringDecoder", "--files", logFilePath))
+    verifyRecordsInOutput(true, Array("--key-decoder-class", "kafka.serializer.StringDecoder", "--files", logFilePath))
 
     // Verify that records are printed with --deep-iteration even if --print-data-log is not specified
-    verifyRecordsInOutput(Array("--deep-iteration", "--files", logFilePath))
+    verifyRecordsInOutput(false, Array("--deep-iteration", "--files", logFilePath))
 
     // Verify that records are not printed by default
     verifyNoRecordsInOutput(Array("--files", logFilePath))
