Browse Source

The FetcherRunnable busy waits on empty fetch requests; KAFKA-117; patched by nehanarkhede; reviewed by junrao

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1160952 13f79535-47bb-0310-9956-ffa450edef68
0.7.0
Neha Narkhede 13 years ago
parent
commit
bf30ae996f
  1. 2
      core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
  2. 21
      core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
  3. 20
      core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala

2
core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala

@ -59,7 +59,7 @@ private[consumer] class PartitionTopicInfo(val topic: String, @@ -59,7 +59,7 @@ private[consumer] class PartitionTopicInfo(val topic: String,
* @return the number of valid bytes
*/
def enqueue(messages: ByteBufferMessageSet, fetchOffset: Long): Long = {
val size = messages.shallowValidBytes
val size = messages.validBytes
if(size > 0) {
// update fetched offset to the compressed data chunk size, not the decompressed message set size
if(logger.isTraceEnabled)

21
core/src/main/scala/kafka/message/ByteBufferMessageSet.scala

@ -40,7 +40,6 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer, @@ -40,7 +40,6 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
private val logger = Logger.getLogger(getClass())
private var validByteCount = -1L
private var shallowValidByteCount = -1L
private var deepValidByteCount = -1L
def this(compressionCodec: CompressionCodec, messages: Message*) {
this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
@ -58,9 +57,9 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer, @@ -58,9 +57,9 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
def serialized(): ByteBuffer = buffer
def validBytes: Long = deepValidBytes
def shallowValidBytes: Long = {
def validBytes: Long = shallowValidBytes
private def shallowValidBytes: Long = {
if(shallowValidByteCount < 0) {
val iter = deepIterator
while(iter.hasNext) {
@ -68,18 +67,10 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer, @@ -68,18 +67,10 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
shallowValidByteCount = messageAndOffset.offset
}
}
shallowValidByteCount - initialOffset
if(shallowValidByteCount < initialOffset) 0
else (shallowValidByteCount - initialOffset)
}
def deepValidBytes: Long = {
if (deepValidByteCount < 0) {
val iter = deepIterator
while (iter.hasNext)
iter.next
}
deepValidByteCount
}
/** Write the messages in this set to the given channel */
def writeTo(channel: WritableByteChannel, offset: Long, size: Long): Long =
channel.write(buffer.duplicate)
@ -98,7 +89,6 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer, @@ -98,7 +89,6 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
def makeNextOuter: MessageAndOffset = {
if (topIter.remaining < 4) {
deepValidByteCount = currValidBytes
return allDone()
}
val size = topIter.getInt()
@ -109,7 +99,6 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer, @@ -109,7 +99,6 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
logger.trace("size of data = " + size)
}
if(size < 0 || topIter.remaining < size) {
deepValidByteCount = currValidBytes
if (currValidBytes == 0 || size < 0)
throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " +
topIter.remaining + " at " + currValidBytes + "( possible causes (1) a single message larger than " +

20
core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala

@ -29,12 +29,20 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { @@ -29,12 +29,20 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
@Test
def testValidBytes() {
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
buffer.put(messages.serialized)
buffer.putShort(4)
val messagesPlus = new ByteBufferMessageSet(buffer)
assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
{
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
buffer.put(messages.serialized)
buffer.putShort(4)
val messagesPlus = new ByteBufferMessageSet(buffer)
assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
}
// test valid bytes on empty ByteBufferMessageSet
{
assertEquals("Valid bytes on an empty ByteBufferMessageSet should return 0", 0,
MessageSet.Empty.asInstanceOf[ByteBufferMessageSet].validBytes)
}
}
@Test

Loading…
Cancel
Save