Browse Source

KAFKA-109: CompressionUtils introduces a GZIP header while compressing empty message sets; patched by Neha; reviewed by Jun

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1159459 13f79535-47bb-0310-9956-ffa450edef68
0.7.0
Neha Narkhede 13 years ago
parent
commit
900496eeda
  1. 20
      core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
  2. 17
      core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
  3. 25
      core/src/main/scala/kafka/message/MessageSet.scala
  4. 2
      core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala

20
core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala

@@ -31,24 +31,8 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError)
def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
this(compressionCodec match {
case NoCompressionCodec =>
val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
val messageIterator = messages.iterator
while(messageIterator.hasNext) {
val message = messageIterator.next
message.serializeTo(buffer)
}
buffer.rewind
buffer
case _ =>
import scala.collection.JavaConversions._
val message = CompressionUtils.compress(asBuffer(messages), compressionCodec)
val buffer = ByteBuffer.allocate(message.serializedSize)
message.serializeTo(buffer)
buffer.rewind
buffer
}, 0L, ErrorMapping.NoError)
this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*),
0L, ErrorMapping.NoError)
}
def this(messages: java.util.List[Message]) {

17
core/src/main/scala/kafka/message/ByteBufferMessageSet.scala

@@ -43,22 +43,7 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
private var deepValidByteCount = -1L
def this(compressionCodec: CompressionCodec, messages: Message*) {
this(
compressionCodec match {
case NoCompressionCodec =>
val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
for (message <- messages) {
message.serializeTo(buffer)
}
buffer.rewind
buffer
case _ =>
val message = CompressionUtils.compress(messages, compressionCodec)
val buffer = ByteBuffer.allocate(message.serializedSize)
message.serializeTo(buffer)
buffer.rewind
buffer
}, 0L, ErrorMapping.NoError)
this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
}
def this(messages: Message*) {

25
core/src/main/scala/kafka/message/MessageSet.scala

@@ -51,7 +51,30 @@ object MessageSet {
* The size of a size-delimited entry in a message set
*/
def entrySize(message: Message): Int = LogOverhead + message.size
/**
 * Builds the backing ByteBuffer for a message set, applying the requested codec.
 *
 * @param compressionCodec codec to apply; NoCompressionCodec serializes messages verbatim
 * @param messages the messages to place in the buffer (may be empty)
 * @return a rewound ByteBuffer containing the (possibly compressed) message set
 */
def createByteBuffer(compressionCodec: CompressionCodec, messages: Message*): ByteBuffer =
compressionCodec match {
case NoCompressionCodec =>
// Uncompressed path: size the buffer exactly and serialize each message in order.
val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
for (message <- messages) {
message.serializeTo(buffer)
}
buffer.rewind
buffer
case _ =>
messages.size match {
case 0 =>
// KAFKA-109: never run the codec over an empty message set — GZIP (and
// similar codecs) would emit a stream header even for zero messages,
// producing a non-empty buffer. Return an empty (rewound) buffer instead.
// NOTE(review): presumably messageSetSize(empty) == 0 — confirm in MessageSet.
val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
buffer.rewind
buffer
case _ =>
// Compress all messages into a single wrapper Message, then serialize it.
val message = CompressionUtils.compress(messages, compressionCodec)
val buffer = ByteBuffer.allocate(message.serializedSize)
message.serializeTo(buffer)
buffer.rewind
buffer
}
}
}
/**

2
core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala

@@ -69,7 +69,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
@Test
def testSizeInBytesWithCompression () {
assertEquals("Empty message set should have 0 bytes.",
30L, // overhead of the GZIP output stream
0L, // overhead of the GZIP output stream
createMessageSet(Array[Message](), DefaultCompressionCodec).sizeInBytes)
}
}

Loading…
Cancel
Save