From d09043637d5ea4094e3ee9808f29d037e8afaba6 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 13 Oct 2016 21:08:28 -0700 Subject: [PATCH] KAFKA-4298; Ensure compressed message sets are not converted when log cleaning Author: Jason Gustafson Reviewers: Ismael Juma , Jun Rao Closes #2019 from hachikuji/KAFKA-4298 --- .../src/main/scala/kafka/log/LogCleaner.scala | 82 +++++++------ .../kafka/message/ByteBufferMessageSet.scala | 28 +++-- .../scala/unit/kafka/log/CleanerTest.scala | 110 +++++++++++++++++- .../kafka/log/LogCleanerIntegrationTest.scala | 56 ++++++++- .../message/ByteBufferMessageSetTest.scala | 52 +++++++++ 5 files changed, 280 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 3d9a20d0243..219957f328b 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -401,7 +401,7 @@ private[log] class Cleaner(val id: Int, val retainDeletes = old.largestTimestamp > deleteHorizonMs info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes." .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding")) - cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion.messageFormatVersion, log.config.maxMessageSize) + cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize) } // trim excess index @@ -439,7 +439,6 @@ private[log] class Cleaner(val id: Int, * @param dest The cleaned log segment * @param map The key=>offset mapping * @param retainDeletes Should delete tombstones be retained while cleaning this segment - * @param messageFormatVersion The message format version to use after compaction * @param maxLogMessageSize The maximum message size of the corresponding topic */ private[log] def cleanInto(topicAndPartition: TopicAndPartition, @@ -447,7 +446,6 @@ private[log] class Cleaner(val id: Int, dest: LogSegment, map: OffsetMap, retainDeletes: Boolean, - messageFormatVersion: Byte, maxLogMessageSize: Int) { var position = 0 while (position < source.log.sizeInBytes) { @@ -461,41 +459,54 @@ private[log] class Cleaner(val id: Int, throttler.maybeThrottle(messages.sizeInBytes) // check each message to see if it is to be retained var messagesRead = 0 - for (entry <- messages.shallowIterator) { - val size = MessageSet.entrySize(entry.message) + for (shallowMessageAndOffset <- messages.shallowIterator) { + val shallowMessage = shallowMessageAndOffset.message + val shallowOffset = shallowMessageAndOffset.offset + val size = MessageSet.entrySize(shallowMessageAndOffset.message) + stats.readMessage(size) - if (entry.message.compressionCodec == NoCompressionCodec) { - if (shouldRetainMessage(source, map, retainDeletes, entry)) { - ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset) + if (shallowMessage.compressionCodec == NoCompressionCodec) { + if (shouldRetainMessage(source, map, retainDeletes, shallowMessageAndOffset)) { + ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset) stats.recopyMessage(size) - if (entry.message.timestamp > maxTimestamp) { - maxTimestamp = entry.message.timestamp - offsetOfMaxTimestamp = entry.offset + if (shallowMessage.timestamp > maxTimestamp) { + maxTimestamp = shallowMessage.timestamp + offsetOfMaxTimestamp = shallowOffset } } messagesRead += 1 } else { - // We use the absolute offset to decide whether to retain the message or not. This is handled by the - // deep iterator. - val messages = ByteBufferMessageSet.deepIterator(entry) + // We use the absolute offset to decide whether to retain the message or not (this is handled by the + // deep iterator). Because of KAFKA-4298, we have to allow for the possibility that a previous version + // corrupted the log by writing a compressed message set with a wrapper magic value not matching the magic + // of the inner messages. This will be fixed as we recopy the messages to the destination segment. + var writeOriginalMessageSet = true val retainedMessages = new mutable.ArrayBuffer[MessageAndOffset] - messages.foreach { messageAndOffset => + val shallowMagic = shallowMessage.magic + + for (deepMessageAndOffset <- ByteBufferMessageSet.deepIterator(shallowMessageAndOffset)) { messagesRead += 1 - if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset)) { - retainedMessages += messageAndOffset + if (shouldRetainMessage(source, map, retainDeletes, deepMessageAndOffset)) { + // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite + // the corrupted entry with correct data. + if (shallowMagic != deepMessageAndOffset.message.magic) + writeOriginalMessageSet = false + + retainedMessages += deepMessageAndOffset // We need the max timestamp and last offset for time index - if (messageAndOffset.message.timestamp > maxTimestamp) - maxTimestamp = messageAndOffset.message.timestamp + if (deepMessageAndOffset.message.timestamp > maxTimestamp) + maxTimestamp = deepMessageAndOffset.message.timestamp + } else { + writeOriginalMessageSet = false } - else writeOriginalMessageSet = false } offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L // There are no messages compacted out and no message format conversion, write the original message set back if (writeOriginalMessageSet) - ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset) + ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset) else - compressMessages(writeBuffer, entry.message.compressionCodec, messageFormatVersion, retainedMessages) + compressMessages(writeBuffer, shallowMessage.compressionCodec, retainedMessages) } } @@ -518,29 +529,34 @@ private[log] class Cleaner(val id: Int, private def compressMessages(buffer: ByteBuffer, compressionCodec: CompressionCodec, - messageFormatVersion: Byte, messageAndOffsets: Seq[MessageAndOffset]) { require(compressionCodec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec") if (messageAndOffsets.nonEmpty) { val messages = messageAndOffsets.map(_.message) val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages) + + // ensure that we use the magic from the first message in the set when writing the wrapper + // message in order to fix message sets corrupted by KAFKA-4298 + val magic = magicAndTimestamp.magic + val firstMessageOffset = messageAndOffsets.head val firstAbsoluteOffset = firstMessageOffset.offset var offset = -1L val timestampType = firstMessageOffset.message.timestampType val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16)) - messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = messageFormatVersion) { outputStream => - val output = new DataOutputStream(CompressionFactory(compressionCodec, messageFormatVersion, outputStream)) + messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magic) { outputStream => + val output = new DataOutputStream(CompressionFactory(compressionCodec, magic, outputStream)) try { - for (messageOffset <- messageAndOffsets) { - val message = messageOffset.message - offset = messageOffset.offset - if (messageFormatVersion > Message.MagicValue_V0) { + for (messageAndOffset <- messageAndOffsets) { + offset = messageAndOffset.offset + val innerOffset = if (magic > Message.MagicValue_V0) // The offset of the messages are absolute offset, compute the inner offset. - val innerOffset = messageOffset.offset - firstAbsoluteOffset - output.writeLong(innerOffset) - } else - output.writeLong(offset) + messageAndOffset.offset - firstAbsoluteOffset + else + offset + + val message = messageAndOffset.message + output.writeLong(innerOffset) output.writeInt(message.size) output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) } diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index aadda86cf0e..1ef91b9a85a 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -78,7 +78,7 @@ object ByteBufferMessageSet { } /** Deep iterator that decompresses the message sets and adjusts timestamp and offset if needed. */ - def deepIterator(wrapperMessageAndOffset: MessageAndOffset): Iterator[MessageAndOffset] = { + def deepIterator(wrapperMessageAndOffset: MessageAndOffset, ensureMatchingMagic: Boolean = false): Iterator[MessageAndOffset] = { import Message._ @@ -138,12 +138,15 @@ object ByteBufferMessageSet { // Override the timestamp if necessary val newMessage = new Message(buffer, wrapperMessageTimestampOpt, wrapperMessageTimestampTypeOpt) - // Inner message and wrapper message must have same magic value - if (newMessage.magic != wrapperMessage.magic) - throw new IllegalStateException(s"Compressed message has magic value ${wrapperMessage.magic} " + + // Due to KAFKA-4298, it is possible for the inner and outer magic values to differ. We ignore + // this and depend on the outer message in order to decide how to compute the respective offsets + // for the inner messages + if (ensureMatchingMagic && newMessage.magic != wrapperMessage.magic) + throw new InvalidMessageException(s"Compressed message has magic value ${wrapperMessage.magic} " + s"but inner message has magic value ${newMessage.magic}") + lastInnerOffset = innerOffset - new MessageAndOffset(newMessage, innerOffset) + MessageAndOffset(newMessage, innerOffset) } override def makeNext(): MessageAndOffset = { @@ -153,7 +156,7 @@ object ByteBufferMessageSet { if (wrapperMessage.magic > MagicValue_V0) { val relativeOffset = offset - lastInnerOffset val absoluteOffset = wrapperMessageOffset + relativeOffset - new MessageAndOffset(message, absoluteOffset) + MessageAndOffset(message, absoluteOffset) } else { nextMessage } @@ -328,10 +331,10 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi override def iterator: Iterator[MessageAndOffset] = internalIterator() /** iterator over compressed messages without decompressing */ - def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true) + def shallowIterator: Iterator[MessageAndOffset] = internalIterator(isShallow = true) /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. **/ - private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = { + private def internalIterator(isShallow: Boolean = false, ensureMatchingMagic: Boolean = false): Iterator[MessageAndOffset] = { new IteratorTemplate[MessageAndOffset] { var topIter = buffer.slice() var innerIter: Iterator[MessageAndOffset] = null @@ -357,14 +360,14 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi topIter.position(topIter.position + size) val newMessage = new Message(message) if(isShallow) { - new MessageAndOffset(newMessage, offset) + MessageAndOffset(newMessage, offset) } else { newMessage.compressionCodec match { case NoCompressionCodec => innerIter = null - new MessageAndOffset(newMessage, offset) + MessageAndOffset(newMessage, offset) case _ => - innerIter = ByteBufferMessageSet.deepIterator(new MessageAndOffset(newMessage, offset)) + innerIter = ByteBufferMessageSet.deepIterator(MessageAndOffset(newMessage, offset), ensureMatchingMagic) if(!innerIter.hasNext) innerIter = null makeNext() @@ -435,7 +438,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi var offsetOfMaxTimestamp = -1L val expectedInnerOffset = new LongRef(0) val validatedMessages = new mutable.ArrayBuffer[Message] - this.internalIterator(isShallow = false).foreach { messageAndOffset => + + this.internalIterator(isShallow = false, ensureMatchingMagic = true).foreach { messageAndOffset => val message = messageAndOffset.message validateMessageKey(message, compactedTopic) diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala index f4458a01bda..536f10d741c 100755 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala @@ -17,7 +17,7 @@ package kafka.log -import java.io.File +import java.io.{DataOutputStream, File} import java.nio._ import java.nio.file.Paths import java.util.Properties @@ -25,6 +25,7 @@ import java.util.Properties import kafka.common._ import kafka.message._ import kafka.utils._ +import org.apache.kafka.common.record.{MemoryRecords, TimestampType} import org.apache.kafka.common.utils.Utils import org.junit.Assert._ import org.junit.{After, Test} @@ -140,7 +141,7 @@ class CleanerTest extends JUnitSuite { @Test def testPartialSegmentClean(): Unit = { // because loadFactor is 0.75, this means we can fit 2 messages in the map - var cleaner = makeCleaner(2) + val cleaner = makeCleaner(2) val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) @@ -610,11 +611,116 @@ class CleanerTest extends JUnitSuite { assertEquals(-1, map.get(key(4))) } + /** + * This test verifies that messages corrupted by KAFKA-4298 are fixed by the cleaner + */ + @Test + def testCleanCorruptMessageSet() { + val codec = SnappyCompressionCodec + + val logProps = new Properties() + logProps.put(LogConfig.CompressionTypeProp, codec.name) + val logConfig = LogConfig(logProps) + + val log = makeLog(config = logConfig) + val cleaner = makeCleaner(10) + + // messages are constructed so that the payload matches the expecting offset to + // make offset validation easier after cleaning + + // one compressed log entry with duplicates + val dupSetKeys = (0 until 2) ++ (0 until 2) + val dupSetOffset = 25 + val dupSet = dupSetKeys zip (dupSetOffset until dupSetOffset + dupSetKeys.size) + + // and one without (should still be fixed by the cleaner) + val noDupSetKeys = 3 until 5 + val noDupSetOffset = 50 + val noDupSet = noDupSetKeys zip (noDupSetOffset until noDupSetOffset + noDupSetKeys.size) + + log.append(invalidCleanedMessage(dupSetOffset, dupSet, codec), assignOffsets = false) + log.append(invalidCleanedMessage(noDupSetOffset, noDupSet, codec), assignOffsets = false) + + log.roll() + + cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset)) + + for (segment <- log.logSegments; shallowMessage <- segment.log.iterator; deepMessage <- ByteBufferMessageSet.deepIterator(shallowMessage)) { + assertEquals(shallowMessage.message.magic, deepMessage.message.magic) + val value = TestUtils.readString(deepMessage.message.payload).toLong + assertEquals(deepMessage.offset, value) + } + } + + /** + * Verify that the client can handle corrupted messages. Located here for now since the client + * does not support writing messages with the old magic. + */ + @Test + def testClientHandlingOfCorruptMessageSet(): Unit = { + import JavaConverters._ + + val keys = 1 until 10 + val offset = 50 + val set = keys zip (offset until offset + keys.size) + + val corruptedMessage = invalidCleanedMessage(offset, set) + val records = MemoryRecords.readableRecords(corruptedMessage.buffer) + + for (logEntry <- records.iterator.asScala) { + val offset = logEntry.offset + val value = TestUtils.readString(logEntry.record.value).toLong + assertEquals(offset, value) + } + } + private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = { for(((key, value), offset) <- keysAndValues.zip(offsetSeq)) yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset } + private def invalidCleanedMessage(initialOffset: Long, + keysAndValues: Iterable[(Int, Int)], + codec: CompressionCodec = SnappyCompressionCodec): ByteBufferMessageSet = { + // this function replicates the old versions of the cleaner which under some circumstances + // would write invalid compressed message sets with the outer magic set to 1 and the inner + // magic set to 0 + + val messages = keysAndValues.map(kv => + new Message(key = kv._1.toString.getBytes, + bytes = kv._2.toString.getBytes, + timestamp = Message.NoTimestamp, + magicValue = Message.MagicValue_V0)) + + val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16)) + var lastOffset = initialOffset + + messageWriter.write( + codec = codec, + timestamp = Message.NoTimestamp, + timestampType = TimestampType.CREATE_TIME, + magicValue = Message.MagicValue_V1) { outputStream => + + val output = new DataOutputStream(CompressionFactory(codec, Message.MagicValue_V1, outputStream)) + try { + for (message <- messages) { + val innerOffset = lastOffset - initialOffset + output.writeLong(innerOffset) + output.writeInt(message.size) + output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) + lastOffset += 1 + } + } finally { + output.close() + } + } + val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead) + ByteBufferMessageSet.writeMessage(buffer, messageWriter, lastOffset - 1) + buffer.rewind() + + new ByteBufferMessageSet(buffer) + } + private def messageWithOffset(key: Int, value: Int, offset: Long) = new ByteBufferMessageSet(NoCompressionCodec, Seq(offset), new Message(key = key.toString.getBytes, diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 9e4951a0368..40030cbd358 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -191,6 +191,43 @@ class LogCleanerIntegrationTest(compressionCodec: String) { checkLogAfterAppendingDups(log, startSize, appends2) } + @Test + def testCleaningNestedMessagesWithMultipleVersions(): Unit = { + val maxMessageSize = 192 + cleaner = makeCleaner(parts = 3, maxMessageSize = maxMessageSize) + + val log = cleaner.logs.get(topics(0)) + val props = logConfigProperties(maxMessageSize = maxMessageSize) + props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version) + log.config = new LogConfig(props) + + // with compression enabled, these messages will be written as a single message containing + // all of the individual messages + var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0) + appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V0) + + props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version) + log.config = new LogConfig(props) + + var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1) + appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1) + appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1) + + val appends = appendsV0 ++ appendsV1 + + val startSize = log.size + cleaner.startup() + + val firstDirty = log.activeSegment.baseOffset + assertTrue(firstDirty > appendsV0.size) // ensure we clean data from V0 and V1 + + checkLastCleaned("log", 0, firstDirty) + val compactedSize = log.logSegments.map(_.size).sum + assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize) + + checkLogAfterAppendingDups(log, startSize, appends) + } + private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: Long) { // wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than // LogConfig.MinCleanableDirtyRatioProp @@ -230,7 +267,24 @@ class LogCleanerIntegrationTest(compressionCodec: String) { (key, payload) } } - + + private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec, + startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String)] = { + val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield { + val payload = counter.toString + counter += 1 + (key, payload) + } + + val messages = kvs.map { case (key, payload) => + new Message(payload.toString.getBytes, key.toString.getBytes, Message.NoTimestamp, magicValue) + } + + val messageSet = new ByteBufferMessageSet(compressionCodec = codec, messages: _*) + log.append(messageSet, assignOffsets = true) + kvs + } + @After def tearDown(): Unit = { cleaner.shutdown() diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala index 39eb84c8fd4..18a023c5137 100644 --- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala @@ -17,6 +17,7 @@ package kafka.message +import java.io.DataOutputStream import java.nio._ import kafka.common.LongRef @@ -364,6 +365,19 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { checkOffsets(compressedMessagesWithOffset, offset) } + @Test(expected = classOf[InvalidMessageException]) + def testInvalidInnerMagicVersion(): Unit = { + val offset = 1234567 + val messages = messageSetWithInvalidInnerMagic(SnappyCompressionCodec, offset) + messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset), + now = System.currentTimeMillis(), + sourceCodec = SnappyCompressionCodec, + targetCodec = SnappyCompressionCodec, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 5000L).validatedMessages + } + + @Test def testOffsetAssignmentAfterMessageFormatConversion() { // Check up conversion @@ -460,4 +474,42 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { new Message("beautiful".getBytes, timestamp = timestamp, magicValue = Message.MagicValue_V1)) } } + + private def messageSetWithInvalidInnerMagic(codec: CompressionCodec, + initialOffset: Long): ByteBufferMessageSet = { + val messages = (0 until 20).map(id => + new Message(key = id.toString.getBytes, + bytes = id.toString.getBytes, + timestamp = Message.NoTimestamp, + magicValue = Message.MagicValue_V0)) + + val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16)) + var lastOffset = initialOffset + + messageWriter.write( + codec = codec, + timestamp = System.currentTimeMillis(), + timestampType = TimestampType.CREATE_TIME, + magicValue = Message.MagicValue_V1) { outputStream => + + val output = new DataOutputStream(CompressionFactory(codec, Message.MagicValue_V1, outputStream)) + try { + for (message <- messages) { + val innerOffset = lastOffset - initialOffset + output.writeLong(innerOffset) + output.writeInt(message.size) + output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) + lastOffset += 1 + } + } finally { + output.close() + } + } + val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead) + ByteBufferMessageSet.writeMessage(buffer, messageWriter, lastOffset - 1) + buffer.rewind() + + new ByteBufferMessageSet(buffer) + } + }