From 900496eedabd7acfdeaaf077239228f4d94312e8 Mon Sep 17 00:00:00 2001 From: Neha Narkhede Date: Fri, 19 Aug 2011 00:47:34 +0000 Subject: [PATCH] CompressionUtils introduces a GZIP header while compressing empty message sets KAFKA-109; patched by Neha; reviewed by Jun git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1159459 13f79535-47bb-0310-9956-ffa450edef68 --- .../message/ByteBufferMessageSet.scala | 20 ++------------- .../kafka/message/ByteBufferMessageSet.scala | 17 +------------ .../main/scala/kafka/message/MessageSet.scala | 25 ++++++++++++++++++- .../message/BaseMessageSetTestCases.scala | 2 +- 4 files changed, 28 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala index 673a4bd3170..9724dc98b40 100644 --- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala @@ -31,24 +31,8 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer, def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError) def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) { - this(compressionCodec match { - case NoCompressionCodec => - val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) - val messageIterator = messages.iterator - while(messageIterator.hasNext) { - val message = messageIterator.next - message.serializeTo(buffer) - } - buffer.rewind - buffer - case _ => - import scala.collection.JavaConversions._ - val message = CompressionUtils.compress(asBuffer(messages), compressionCodec) - val buffer = ByteBuffer.allocate(message.serializedSize) - message.serializeTo(buffer) - buffer.rewind - buffer - }, 0L, ErrorMapping.NoError) + this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*), + 0L, ErrorMapping.NoError) } def this(messages: 
java.util.List[Message]) { diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 09d9b059108..85bbd120633 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -43,22 +43,7 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer, private var deepValidByteCount = -1L def this(compressionCodec: CompressionCodec, messages: Message*) { - this( - compressionCodec match { - case NoCompressionCodec => - val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) - for (message <- messages) { - message.serializeTo(buffer) - } - buffer.rewind - buffer - case _ => - val message = CompressionUtils.compress(messages, compressionCodec) - val buffer = ByteBuffer.allocate(message.serializedSize) - message.serializeTo(buffer) - buffer.rewind - buffer - }, 0L, ErrorMapping.NoError) + this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError) } def this(messages: Message*) { diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala index f4374dc4a19..807de4af158 100644 --- a/core/src/main/scala/kafka/message/MessageSet.scala +++ b/core/src/main/scala/kafka/message/MessageSet.scala @@ -51,7 +51,30 @@ object MessageSet { * The size of a size-delimited entry in a message set */ def entrySize(message: Message): Int = LogOverhead + message.size - + + def createByteBuffer(compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = + compressionCodec match { + case NoCompressionCodec => + val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) + for (message <- messages) { + message.serializeTo(buffer) + } + buffer.rewind + buffer + case _ => + messages.size match { + case 0 => + val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) + buffer.rewind + buffer + case _ => + 
val message = CompressionUtils.compress(messages, compressionCodec) + val buffer = ByteBuffer.allocate(message.serializedSize) + message.serializeTo(buffer) + buffer.rewind + buffer + } + } } /** diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala index 1ee152463c3..d90a9f549a3 100644 --- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala @@ -69,7 +69,7 @@ trait BaseMessageSetTestCases extends JUnitSuite { @Test def testSizeInBytesWithCompression () { assertEquals("Empty message set should have 0 bytes.", - 30L, // overhead of the GZIP output stream + 0L, // empty message sets are no longer compressed, so no GZIP stream overhead createMessageSet(Array[Message](), DefaultCompressionCodec).sizeInBytes) } }