diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index 45b3df9970e..a164b4b9673 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -206,7 +206,7 @@ class FileMessageSet private[kafka](@volatile var file: File, /** * Convert this message set to use the specified message format. */ - def toMessageFormat(toMagicValue: Byte): ByteBufferMessageSet = { + def toMessageFormat(toMagicValue: Byte): MessageSet = { val offsets = new ArrayBuffer[Long] val newMessages = new ArrayBuffer[Message] this.foreach { messageAndOffset => @@ -224,11 +224,16 @@ class FileMessageSet private[kafka](@volatile var file: File, } } - // We use the offset seq to assign offsets so the offset of the messages does not change. - new ByteBufferMessageSet( - compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec), - offsetSeq = offsets, - newMessages: _*) + if (sizeInBytes > 0 && newMessages.size == 0) { + // This indicates that the message is too large. We just return all the bytes in the file message set. + this + } else { + // We use the offset seq to assign offsets so the offset of the messages does not change. + new ByteBufferMessageSet( + compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec), + offsetSeq = offsets, + newMessages: _*) + } } /** @@ -245,10 +250,11 @@ class FileMessageSet private[kafka](@volatile var file: File, def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = { new IteratorTemplate[MessageAndOffset] { var location = start - val sizeOffsetBuffer = ByteBuffer.allocate(12) + val sizeOffsetLength = 12 + val sizeOffsetBuffer = ByteBuffer.allocate(sizeOffsetLength) override def makeNext(): MessageAndOffset = { - if(location >= end) + if(location + sizeOffsetLength >= end) return allDone() // read the size of the item @@ -260,20 +266,20 @@ class FileMessageSet private[kafka](@volatile var file: File, sizeOffsetBuffer.rewind() val offset = sizeOffsetBuffer.getLong() val size = sizeOffsetBuffer.getInt() - if(size < Message.MinMessageOverhead) + if(size < Message.MinMessageOverhead || location + sizeOffsetLength + size > end) return allDone() if(size > maxMessageSize) throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize)) // read the item itself val buffer = ByteBuffer.allocate(size) - channel.read(buffer, location + 12) + channel.read(buffer, location + sizeOffsetLength) if(buffer.hasRemaining) return allDone() buffer.rewind() // increment the location and return the item - location += size + 12 + location += size + sizeOffsetLength new MessageAndOffset(new Message(buffer), offset) } } diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala index a3e5b2d4f01..534443ce320 100644 --- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala @@ -133,11 +133,13 @@ class FileMessageSetTest extends BaseMessageSetTestCases { def testIteratorWithLimits() { val message = messageSet.toList(1) val start = messageSet.searchFor(1, 0).position - val size = message.message.size + val size = message.message.size + 12 val slice = messageSet.read(start, size) assertEquals(List(message), slice.toList) + val slice2 = messageSet.read(start, size - 1) + assertEquals(List(), slice2.toList) } - + /** * Test the truncateTo method lops off messages and appropriately updates the size */ @@ -202,6 +204,17 @@ class FileMessageSetTest extends BaseMessageSetTestCases { assertEquals(oldposition, tempReopen.length) } + @Test + def testFormatConversionWithPartialMessage() { + val message = messageSet.toList(1) + val start = messageSet.searchFor(1, 0).position + val size = message.message.size + 12 + val slice = messageSet.read(start, size - 1) + val messageV0 = slice.toMessageFormat(Message.MagicValue_V0) + assertEquals("No message should be there", 0, messageV0.size) + assertEquals(s"There should be ${size - 1} bytes", size - 1, messageV0.sizeInBytes) + } + @Test def testMessageFormatConversion() {