Browse Source

ByteBufferMessageSet iterator bug returning incorrect offsets after reading a compressed empty message set KAFKA-111; patched by Jun; reviewed by Neha

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1159466 13f79535-47bb-0310-9956-ffa450edef68
0.7.0
Neha Narkhede 13 years ago
parent
commit
4f56e44100
  1. 19
      core/src/main/scala/kafka/message/ByteBufferMessageSet.scala

19
core/src/main/scala/kafka/message/ByteBufferMessageSet.scala

@ -111,9 +111,9 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer, @@ -111,9 +111,9 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
if(size < 0 || topIter.remaining < size) {
deepValidByteCount = currValidBytes
if (currValidBytes == 0 || size < 0)
throw new InvalidMessageSizeException("invalid message size: %d only received bytes: %d " +
" at %d possible causes (1) a single message larger than the fetch size; (2) log corruption "
.format(size, topIter.remaining, currValidBytes))
throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " +
topIter.remaining + " at " + currValidBytes + "( possible causes (1) a single message larger than " +
"the fetch size; (2) log corruption )")
return allDone()
}
val message = topIter.slice()
@ -126,23 +126,30 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer, @@ -126,23 +126,30 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
logger.debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
innerIter = null
currValidBytes += 4 + size
if(logger.isTraceEnabled)
logger.trace("currValidBytes = " + currValidBytes)
new MessageAndOffset(newMessage, currValidBytes)
case _ =>
if(logger.isDebugEnabled)
logger.debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
innerIter = CompressionUtils.decompress(newMessage).deepIterator
if (!innerIter.hasNext) {
currValidBytes += 4 + lastMessageSize
innerIter = null
}
makeNext()
}
}
override def makeNext(): MessageAndOffset = {
val isInnerDone = innerDone()
if(logger.isDebugEnabled)
logger.debug("makeNext() in deepIterator: innerDone = " + innerDone)
innerDone match {
logger.debug("makeNext() in deepIterator: innerDone = " + isInnerDone)
isInnerDone match {
case true => makeNextOuter
case false => {
val messageAndOffset = innerIter.next
if(!innerIter.hasNext)
if (!innerIter.hasNext)
currValidBytes += 4 + lastMessageSize
new MessageAndOffset(messageAndOffset.message, currValidBytes)
}

Loading…
Cancel
Save