diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index 37680dc801a..c7ba4f94142 100644 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -84,10 +84,10 @@ object Message { class Message(val buffer: ByteBuffer) { import kafka.message.Message._ - - - private def this(checksum: Long, bytes: Array[Byte], compressionCodec: CompressionCodec) = { - this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + bytes.length)) + + + private def this(checksum: Long, bytes: Array[Byte], offset: Int, size: Int, compressionCodec: CompressionCodec) = { + this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + size)) buffer.put(CurrentMagicValue) var attributes:Byte = 0 if (compressionCodec.codec > 0) { @@ -95,18 +95,22 @@ class Message(val buffer: ByteBuffer) { } buffer.put(attributes) Utils.putUnsignedInt(buffer, checksum) - buffer.put(bytes) + buffer.put(bytes, offset, size) buffer.rewind() } - def this(checksum:Long, bytes:Array[Byte]) = this(checksum, bytes, NoCompressionCodec) - - def this(bytes: Array[Byte], compressionCodec: CompressionCodec) = { + def this(checksum:Long, bytes:Array[Byte]) = this(checksum, bytes, 0, bytes.length, NoCompressionCodec) + + def this(bytes: Array[Byte], offset: Int, size: Int, compressionCodec: CompressionCodec) = { //Note: we're not crc-ing the attributes header, so we're susceptible to bit-flipping there - this(Utils.crc32(bytes), bytes, compressionCodec) + this(Utils.crc32(bytes, offset, size), bytes, offset, size, compressionCodec) } - def this(bytes: Array[Byte]) = this(bytes, NoCompressionCodec) + def this(bytes: Array[Byte], compressionCodec: CompressionCodec) = this(bytes, 0, bytes.length, compressionCodec) + + def this(bytes: Array[Byte], offset: Int, size: Int) = this(bytes, offset, size, NoCompressionCodec) + + def this(bytes: Array[Byte]) = this(bytes, 0, bytes.length, NoCompressionCodec) def size: Int = buffer.limit