Browse Source

KAFKA-3442; Fix FileMessageSet iterator.

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #1112 from becketqin/KAFKA-3442
pull/1112/merge
Jiangjie Qin 9 years ago committed by Jun Rao
parent
commit
7af67ce22a
  1. 28
      core/src/main/scala/kafka/log/FileMessageSet.scala
  2. 17
      core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala

28
core/src/main/scala/kafka/log/FileMessageSet.scala

@ -206,7 +206,7 @@ class FileMessageSet private[kafka](@volatile var file: File, @@ -206,7 +206,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
/**
* Convert this message set to use the specified message format.
*/
def toMessageFormat(toMagicValue: Byte): ByteBufferMessageSet = {
def toMessageFormat(toMagicValue: Byte): MessageSet = {
val offsets = new ArrayBuffer[Long]
val newMessages = new ArrayBuffer[Message]
this.foreach { messageAndOffset =>
@ -224,11 +224,16 @@ class FileMessageSet private[kafka](@volatile var file: File, @@ -224,11 +224,16 @@ class FileMessageSet private[kafka](@volatile var file: File,
}
}
// We use the offset seq to assign offsets so the offset of the messages does not change.
new ByteBufferMessageSet(
compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec),
offsetSeq = offsets,
newMessages: _*)
if (sizeInBytes > 0 && newMessages.size == 0) {
// This indicates that the message is too large. We just return all the bytes in the file message set.
this
} else {
// We use the offset seq to assign offsets so the offset of the messages does not change.
new ByteBufferMessageSet(
compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec),
offsetSeq = offsets,
newMessages: _*)
}
}
/**
@ -245,10 +250,11 @@ class FileMessageSet private[kafka](@volatile var file: File, @@ -245,10 +250,11 @@ class FileMessageSet private[kafka](@volatile var file: File,
def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {
new IteratorTemplate[MessageAndOffset] {
var location = start
val sizeOffsetBuffer = ByteBuffer.allocate(12)
val sizeOffsetLength = 12
val sizeOffsetBuffer = ByteBuffer.allocate(sizeOffsetLength)
override def makeNext(): MessageAndOffset = {
if(location >= end)
if(location + sizeOffsetLength >= end)
return allDone()
// read the size of the item
@ -260,20 +266,20 @@ class FileMessageSet private[kafka](@volatile var file: File, @@ -260,20 +266,20 @@ class FileMessageSet private[kafka](@volatile var file: File,
sizeOffsetBuffer.rewind()
val offset = sizeOffsetBuffer.getLong()
val size = sizeOffsetBuffer.getInt()
if(size < Message.MinMessageOverhead)
if(size < Message.MinMessageOverhead || location + sizeOffsetLength + size > end)
return allDone()
if(size > maxMessageSize)
throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
// read the item itself
val buffer = ByteBuffer.allocate(size)
channel.read(buffer, location + 12)
channel.read(buffer, location + sizeOffsetLength)
if(buffer.hasRemaining)
return allDone()
buffer.rewind()
// increment the location and return the item
location += size + 12
location += size + sizeOffsetLength
new MessageAndOffset(new Message(buffer), offset)
}
}

17
core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala

@ -133,11 +133,13 @@ class FileMessageSetTest extends BaseMessageSetTestCases { @@ -133,11 +133,13 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
def testIteratorWithLimits() {
val message = messageSet.toList(1)
val start = messageSet.searchFor(1, 0).position
val size = message.message.size
val size = message.message.size + 12
val slice = messageSet.read(start, size)
assertEquals(List(message), slice.toList)
val slice2 = messageSet.read(start, size - 1)
assertEquals(List(), slice2.toList)
}
/**
* Test the truncateTo method lops off messages and appropriately updates the size
*/
@ -202,6 +204,17 @@ class FileMessageSetTest extends BaseMessageSetTestCases { @@ -202,6 +204,17 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
assertEquals(oldposition, tempReopen.length)
}
@Test
def testFormatConversionWithPartialMessage() {
val message = messageSet.toList(1)
val start = messageSet.searchFor(1, 0).position
val size = message.message.size + 12
val slice = messageSet.read(start, size - 1)
val messageV0 = slice.toMessageFormat(Message.MagicValue_V0)
assertEquals("No message should be there", 0, messageV0.size)
assertEquals(s"There should be ${size - 1} bytes", size - 1, messageV0.sizeInBytes)
}
@Test
def testMessageFormatConversion() {

Loading…
Cancel
Save