@@ -401,7 +401,7 @@ private[log] class Cleaner(val id: Int,
       val retainDeletes = old.largestTimestamp > deleteHorizonMs
       info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
           .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if (retainDeletes) "retaining" else "discarding"))
-      cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion.messageFormatVersion, log.config.maxMessageSize)
+      cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize)
     }

     // trim excess index
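(Editorial note, not part of the patch: the hunk above drops the topic-config message format version from the call site. Previously the wrapper magic for recompressed message sets came from log.config.messageFormatVersion, so compacting a segment whose inner messages still carried an older magic could stamp a mismatched wrapper onto them, which is exactly the KAFKA-4298 corruption. A minimal standalone sketch of the old and new policies; configMagic, innerMagics, and the object name are illustrative, not Kafka API:)

    object WrapperMagicPolicySketch {
      // Old policy: trust the topic configuration, regardless of what the
      // segment actually holds; this can disagree with the inner messages.
      def wrapperMagicFromConfig(configMagic: Byte, innerMagics: Seq[Byte]): Byte =
        configMagic

      // New policy sketched by this patch: derive the wrapper magic from the
      // messages themselves (compare MessageSet.magicAndLargestTimestamp).
      def wrapperMagicFromMessages(innerMagics: Seq[Byte]): Byte = {
        require(innerMagics.nonEmpty)
        innerMagics.head
      }
    }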
@@ -439,7 +439,6 @@ private[log] class Cleaner(val id: Int,
   * @param dest The cleaned log segment
   * @param map The key=>offset mapping
   * @param retainDeletes Should delete tombstones be retained while cleaning this segment
-  * @param messageFormatVersion The message format version to use after compaction
   * @param maxLogMessageSize The maximum message size of the corresponding topic
   */
  private[log] def cleanInto(topicAndPartition: TopicAndPartition,
@@ -447,7 +446,6 @@ private[log] class Cleaner(val id: Int,
                              dest: LogSegment,
                              map: OffsetMap,
                              retainDeletes: Boolean,
-                             messageFormatVersion: Byte,
                              maxLogMessageSize: Int) {
     var position = 0
     while (position < source.log.sizeInBytes) {
@@ -461,41 +459,54 @@ private[log] class Cleaner(val id: Int,
       throttler.maybeThrottle(messages.sizeInBytes)

       // check each message to see if it is to be retained
       var messagesRead = 0
-      for (entry <- messages.shallowIterator) {
-        val size = MessageSet.entrySize(entry.message)
+      for (shallowMessageAndOffset <- messages.shallowIterator) {
+        val shallowMessage = shallowMessageAndOffset.message
+        val shallowOffset = shallowMessageAndOffset.offset
+        val size = MessageSet.entrySize(shallowMessageAndOffset.message)
+
         stats.readMessage(size)
-        if (entry.message.compressionCodec == NoCompressionCodec) {
-          if (shouldRetainMessage(source, map, retainDeletes, entry)) {
-            ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
+        if (shallowMessage.compressionCodec == NoCompressionCodec) {
+          if (shouldRetainMessage(source, map, retainDeletes, shallowMessageAndOffset)) {
+            ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset)
             stats.recopyMessage(size)
-            if (entry.message.timestamp > maxTimestamp) {
-              maxTimestamp = entry.message.timestamp
-              offsetOfMaxTimestamp = entry.offset
+            if (shallowMessage.timestamp > maxTimestamp) {
+              maxTimestamp = shallowMessage.timestamp
+              offsetOfMaxTimestamp = shallowOffset
             }
           }
           messagesRead += 1
         } else {
-          // We use the absolute offset to decide whether to retain the message or not. This is handled by the
-          // deep iterator.
-          val messages = ByteBufferMessageSet.deepIterator(entry)
+          // We use the absolute offset to decide whether to retain the message or not (this is handled by the
+          // deep iterator). Because of KAFKA-4298, we have to allow for the possibility that a previous version
+          // corrupted the log by writing a compressed message set with a wrapper magic value not matching the magic
+          // of the inner messages. This will be fixed as we recopy the messages to the destination segment.
           var writeOriginalMessageSet = true
           val retainedMessages = new mutable.ArrayBuffer[MessageAndOffset]
-          messages.foreach { messageAndOffset =>
+          val shallowMagic = shallowMessage.magic
+
+          for (deepMessageAndOffset <- ByteBufferMessageSet.deepIterator(shallowMessageAndOffset)) {
             messagesRead += 1
-            if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset)) {
-              retainedMessages += messageAndOffset
+            if (shouldRetainMessage(source, map, retainDeletes, deepMessageAndOffset)) {
+              // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
+              // the corrupted entry with correct data.
+              if (shallowMagic != deepMessageAndOffset.message.magic)
+                writeOriginalMessageSet = false
+
+              retainedMessages += deepMessageAndOffset
               // We need the max timestamp and last offset for time index
-              if (messageAndOffset.message.timestamp > maxTimestamp)
-                maxTimestamp = messageAndOffset.message.timestamp
+              if (deepMessageAndOffset.message.timestamp > maxTimestamp)
+                maxTimestamp = deepMessageAndOffset.message.timestamp
+            } else {
+              writeOriginalMessageSet = false
             }
-            else writeOriginalMessageSet = false
           }
+
           offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L
           // There are no messages compacted out and no message format conversion, write the original message set back
           if (writeOriginalMessageSet)
-            ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
+            ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset)
           else
-            compressMessages(writeBuffer, entry.message.compressionCodec, messageFormatVersion, retainedMessages)
+            compressMessages(writeBuffer, shallowMessage.compressionCodec, retainedMessages)
         }
       }
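(Editorial note: the retention logic this hunk introduces distills to one predicate. The original compressed wrapper is recopied verbatim only when every deep message survives cleaning and its magic agrees with the wrapper's; a dropped message or a magic mismatch, the KAFKA-4298 case, forces a rewrite through compressMessages. A standalone sketch, where the hypothetical DeepEntry type stands in for MessageAndOffset plus the result of shouldRetainMessage:)

    object RewriteDecisionSketch {
      // Hypothetical stand-in for a decompressed entry: absolute offset, magic
      // byte, and whether shouldRetainMessage would keep it.
      final case class DeepEntry(offset: Long, magic: Byte, retain: Boolean)

      // Write the original wrapper back unchanged only if every inner message is
      // retained AND its magic matches the wrapper magic.
      def canWriteOriginal(wrapperMagic: Byte, entries: Seq[DeepEntry]): Boolean =
        entries.forall(e => e.retain && e.magic == wrapperMagic)

      def main(args: Array[String]): Unit = {
        val healthy   = Seq(DeepEntry(100, 1, retain = true), DeepEntry(101, 1, retain = true))
        val corrupted = Seq(DeepEntry(100, 0, retain = true)) // magic-0 message under a magic-1 wrapper
        println(canWriteOriginal(1, healthy))   // true: recopy the wrapper verbatim
        println(canWriteOriginal(1, corrupted)) // false: rewrite via compressMessages
      }
    }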
@@ -518,29 +529,34 @@ private[log] class Cleaner(val id: Int,
   private def compressMessages(buffer: ByteBuffer,
                                compressionCodec: CompressionCodec,
-                               messageFormatVersion: Byte,
                                messageAndOffsets: Seq[MessageAndOffset]) {
     require(compressionCodec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec")
     if (messageAndOffsets.nonEmpty) {
       val messages = messageAndOffsets.map(_.message)
       val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages)
+      // ensure that we use the magic from the first message in the set when writing the wrapper
+      // message in order to fix message sets corrupted by KAFKA-4298
+      val magic = magicAndTimestamp.magic
       val firstMessageOffset = messageAndOffsets.head
       val firstAbsoluteOffset = firstMessageOffset.offset
       var offset = -1L
       val timestampType = firstMessageOffset.message.timestampType
       val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
-      messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = messageFormatVersion) { outputStream =>
-        val output = new DataOutputStream(CompressionFactory(compressionCodec, messageFormatVersion, outputStream))
+      messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magic) { outputStream =>
+        val output = new DataOutputStream(CompressionFactory(compressionCodec, magic, outputStream))
         try {
-          for (messageOffset <- messageAndOffsets) {
-            val message = messageOffset.message
-            offset = messageOffset.offset
-            if (messageFormatVersion > Message.MagicValue_V0) {
+          for (messageAndOffset <- messageAndOffsets) {
+            offset = messageAndOffset.offset
+            val innerOffset = if (magic > Message.MagicValue_V0)
               // The offset of the messages are absolute offset, compute the inner offset.
-              val innerOffset = messageOffset.offset - firstAbsoluteOffset
-              output.writeLong(innerOffset)
-            } else
-              output.writeLong(offset)
+              messageAndOffset.offset - firstAbsoluteOffset
+            else
+              offset
+
+            val message = messageAndOffset.message
+            output.writeLong(innerOffset)
             output.writeInt(message.size)
             output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
           }
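(Editorial note: the inner-offset computation above is worth a worked example. For magic values above v0, each inner message stores its offset relative to the first retained message's absolute offset, so offsets compacted out of the middle survive as gaps; magic v0 keeps absolute offsets. A standalone sketch, with Entry as a simplified stand-in for MessageAndOffset:)

    object InnerOffsetSketch {
      final case class Entry(offset: Long) // only the absolute offset matters here

      // Mirrors the innerOffset expression in compressMessages: relative offsets
      // for magic > 0, absolute offsets for magic 0.
      def innerOffsets(entries: Seq[Entry], magic: Byte): Seq[Long] = {
        require(entries.nonEmpty, "compressMessages is only called with retained messages")
        val firstAbsoluteOffset = entries.head.offset
        entries.map(e => if (magic > 0) e.offset - firstAbsoluteOffset else e.offset)
      }

      def main(args: Array[String]): Unit = {
        val retained = Seq(Entry(100L), Entry(101L), Entry(103L)) // offset 102 was compacted away
        println(innerOffsets(retained, magic = 1)) // List(0, 1, 3): the gap is preserved
        println(innerOffsets(retained, magic = 0)) // List(100, 101, 103): absolute offsets
      }
    }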