
Change MessageSet.sizeInBytes to Int; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-556

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1401760 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Jun Rao, 12 years ago
commit 23bce10e77
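
Why Int is wide enough here: an in-memory message set is backed by a java.nio.ByteBuffer, whose limit() is already an Int, and a file-backed set is bounded by the segment size, which this patch also makes an Int (maxLogFileSize). Returning Int therefore loses nothing and removes the .intValue()/.toInt/.asInstanceOf[Int] noise at every call site. A minimal sketch of the buffer-backed case (hypothetical class name, not the patched Kafka code):

    import java.nio.ByteBuffer

    // Sketch: a buffer-backed message set can never exceed Int.MaxValue bytes,
    // because ByteBuffer.limit() itself returns an Int.
    class BufferBackedSet(val buffer: ByteBuffer) {
      def sizeInBytes: Int = buffer.limit() // no .intValue() needed
    }

    object BufferBackedSet {
      def main(args: Array[String]): Unit = {
        val set = new BufferBackedSet(ByteBuffer.allocate(1024))
        println(set.sizeInBytes) // 1024
      }
    }
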
15 changed files:

  8  core/src/main/scala/kafka/api/FetchResponse.scala
  2  core/src/main/scala/kafka/api/ProducerRequest.scala
  4  core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
  2  core/src/main/scala/kafka/javaapi/message/MessageSet.scala
 18  core/src/main/scala/kafka/log/FileMessageSet.scala
  6  core/src/main/scala/kafka/log/Log.scala
  2  core/src/main/scala/kafka/log/LogSegment.scala
 14  core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
  4  core/src/main/scala/kafka/message/MessageSet.scala
 10  core/src/main/scala/kafka/server/MessageSetSend.scala
  6  core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
  6  core/src/test/scala/unit/kafka/log/LogManagerTest.scala
  8  core/src/test/scala/unit/kafka/log/LogTest.scala
  4  core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
  4  core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala

core/src/main/scala/kafka/api/FetchResponse.scala (8 changed lines)

@@ -47,7 +47,7 @@ object FetchResponsePartitionData {
 case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError,
                                       initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
-  val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes.intValue()
+  val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes

   def this(messages: MessageSet) = this(ErrorMapping.NoError, 0L, -1L, messages)
@@ -58,14 +58,14 @@ case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError,
 class PartitionDataSend(val partitionId: Int,
                         val partitionData: FetchResponsePartitionData) extends Send {
   private val messageSize = partitionData.messages.sizeInBytes
-  private var messagesSentSize = 0L
+  private var messagesSentSize = 0
   private val buffer = ByteBuffer.allocate( 4 /** partitionId **/ + FetchResponsePartitionData.headerSize)
   buffer.putInt(partitionId)
   buffer.putShort(partitionData.error)
   buffer.putLong(partitionData.initialOffset)
   buffer.putLong(partitionData.hw)
-  buffer.putInt(partitionData.messages.sizeInBytes.intValue())
+  buffer.putInt(partitionData.messages.sizeInBytes)
   buffer.rewind()

   override def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
@@ -75,7 +75,7 @@ class PartitionDataSend(val partitionId: Int,
     if(buffer.hasRemaining)
       written += channel.write(buffer)
     if(!buffer.hasRemaining && messagesSentSize < messageSize) {
-      val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt
+      val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)
       messagesSentSize += bytesSent
       written += bytesSent
     }
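
With messagesSentSize now an Int, the progress arithmetic in PartitionDataSend is pure Int math. A hedged sketch of the same send-progress pattern (a simplified stand-in, not Kafka's actual Send trait):

    import java.nio.channels.GatheringByteChannel

    // Sketch: track how much of a fixed-size payload has been sent.
    // Since totalSize is an Int, the remaining-bytes computation needs
    // no .toInt narrowing and cannot silently truncate.
    abstract class ProgressSketch(totalSize: Int) {
      private var sentSize = 0 // was 0L before the patch

      // write up to maxSize bytes starting at offset, return bytes written
      def writeChunk(channel: GatheringByteChannel, offset: Int, maxSize: Int): Int

      def complete: Boolean = sentSize >= totalSize

      def writeTo(channel: GatheringByteChannel): Int = {
        val bytesSent = writeChunk(channel, sentSize, totalSize - sentSize)
        sentSize += bytesSent
        bytesSent
      }
    }
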

core/src/main/scala/kafka/api/ProducerRequest.scala (2 changed lines)

@@ -113,7 +113,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
         foldedPartitions +
         4 + /* partition id */
         4 + /* byte-length of serialized messages */
-        currPartition._2.sizeInBytes.toInt
+        currPartition._2.sizeInBytes
       })
     }
   })
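
The size fold above is now pure Int arithmetic as well. A small sketch of the per-partition accounting (hypothetical helper mirroring the constants in the hunk):

    // Each partition entry costs 4 bytes (partition id) + 4 bytes
    // (length prefix for the serialized messages) + the message bytes.
    object RequestSizeSketch {
      def partitionEntrySize(messageSetSizeInBytes: Int): Int =
        4 + /* partition id */
        4 + /* byte-length of serialized messages */
        messageSetSizeInBytes

      def main(args: Array[String]): Unit =
        println(Seq(100, 250).map(partitionEntrySize).sum) // 366
    }
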

core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (4 changed lines)

@@ -32,7 +32,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message
     this(NoCompressionCodec, messages)
   }

-  def validBytes: Long = underlying.validBytes
+  def validBytes: Int = underlying.validBytes

   override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] {
     val underlyingIterator = underlying.iterator
@@ -49,7 +49,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message
   override def toString: String = underlying.toString

-  def sizeInBytes: Long = underlying.sizeInBytes
+  def sizeInBytes: Int = underlying.sizeInBytes

   override def equals(other: Any): Boolean = {
     other match {

core/src/main/scala/kafka/javaapi/message/MessageSet.scala (2 changed lines)

@@ -38,7 +38,7 @@ abstract class MessageSet extends java.lang.Iterable[MessageAndOffset] {
   /**
    * Gives the total size of this message set in bytes
    */
-  def sizeInBytes: Long
+  def sizeInBytes: Int

   /**
    * Validate the checksum of all the messages in the set. Throws an InvalidMessageException if the checksum doesn't

core/src/main/scala/kafka/log/FileMessageSet.scala (18 changed lines)

@@ -36,12 +36,12 @@ import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 @nonthreadsafe
 class FileMessageSet private[kafka](val file: File,
                                     private[log] val channel: FileChannel,
-                                    private[log] val start: Long = 0L,
-                                    private[log] val limit: Long = Long.MaxValue,
+                                    private[log] val start: Int = 0,
+                                    private[log] val limit: Int = Int.MaxValue,
                                     initChannelPositionToEnd: Boolean = true) extends MessageSet with Logging {

   /* the size of the message set in bytes */
-  private val _size = new AtomicLong(scala.math.min(channel.size(), limit) - start)
+  private val _size = new AtomicInteger(scala.math.min(channel.size().toInt, limit) - start)

   if (initChannelPositionToEnd) {
     /* set the file position to the last byte in the file */
@@ -51,7 +51,7 @@ class FileMessageSet private[kafka](val file: File,
   /**
    * Create a file message set with no limit or offset
    */
-  def this(file: File, channel: FileChannel) = this(file, channel, 0, Long.MaxValue)
+  def this(file: File, channel: FileChannel) = this(file, channel, 0, Int.MaxValue)

   /**
    * Create a file message set with no limit or offset
@@ -61,7 +61,7 @@ class FileMessageSet private[kafka](val file: File,
   /**
    * Return a message set which is a view into this set starting from the given position and with the given size limit.
    */
-  def read(position: Long, size: Long): FileMessageSet = {
+  def read(position: Int, size: Int): FileMessageSet = {
     new FileMessageSet(file,
                        channel,
                        this.start + position,
@@ -96,8 +96,8 @@ class FileMessageSet private[kafka](val file: File,
   /**
    * Write some of this set to the given channel, return the amount written
    */
-  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Long): Long =
-    channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel)
+  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int =
+    channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt

   /**
    * Get an iterator over the messages in the set. We only do shallow iteration here.
@@ -136,7 +136,7 @@ class FileMessageSet private[kafka](val file: File,
   /**
    * The number of bytes taken up by this file set
    */
-  def sizeInBytes(): Long = _size.get()
+  def sizeInBytes(): Int = _size.get()

   /**
    * Append this message to the message set
@@ -175,7 +175,7 @@ class FileMessageSet private[kafka](val file: File,
    * Truncate this file message set to the given size. Note that this API does no checking that the
    * given size falls on a valid byte offset.
    */
-  def truncateTo(targetSize: Long) = {
+  def truncateTo(targetSize: Int) = {
     if(targetSize > sizeInBytes())
       throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) +
                                " size of this log segment is only %d bytes".format(sizeInBytes()))

core/src/main/scala/kafka/log/Log.scala (6 changed lines)

@@ -99,7 +99,7 @@ object Log {
  */
 @threadsafe
 private[kafka] class Log(val dir: File,
-                         val maxLogFileSize: Long,
+                         val maxLogFileSize: Int,
                          val maxMessageSize: Int,
                          val flushInterval: Int = Int.MaxValue,
                          val rollIntervalMs: Long = Long.MaxValue,
@@ -337,14 +337,14 @@ private[kafka] class Log(val dir: File,
   */
  private def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet = {
    val messageSetValidBytes = messages.validBytes
-   if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
+   if(messageSetValidBytes < 0)
      throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
    if(messageSetValidBytes == messages.sizeInBytes) {
      messages
    } else {
      // trim invalid bytes
      val validByteBuffer = messages.buffer.duplicate()
-     validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
+     validByteBuffer.limit(messageSetValidBytes)
      new ByteBufferMessageSet(validByteBuffer)
    }
  }
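
Because validBytes is now an Int, the old messageSetValidBytes > Int.MaxValue branch is unrepresentable and drops out; only the negative (corrupt request) check survives, and the .asInstanceOf[Int] cast before ByteBuffer.limit disappears. A sketch of the trim step itself (hypothetical helper mirroring the hunk):

    import java.nio.ByteBuffer

    object TrimSketch {
      // duplicate() shares the underlying bytes; limit(validBytes) masks any
      // partial trailing message without copying.
      def trim(buffer: ByteBuffer, validBytes: Int): ByteBuffer = {
        require(validBytes >= 0, "Illegal length of message set " + validBytes)
        val validByteBuffer = buffer.duplicate()
        validByteBuffer.limit(validBytes) // an Int goes straight in, no cast
        validByteBuffer
      }

      def main(args: Array[String]): Unit =
        println(trim(ByteBuffer.allocate(10), 6).limit()) // 6
    }
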

core/src/main/scala/kafka/log/LogSegment.scala (2 changed lines)

@@ -59,7 +59,7 @@ class LogSegment(val messageSet: FileMessageSet,
       // append the messages
       messageSet.append(messages)
       updateFirstAppendTime()
-      this.bytesSinceLastIndexEntry += messages.sizeInBytes.toInt
+      this.bytesSinceLastIndexEntry += messages.sizeInBytes
     }
   }

core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (14 changed lines)

@@ -93,9 +93,7 @@ object ByteBufferMessageSet {
  *
  */
 class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet with Logging {
-  private var shallowValidByteCount = -1L
-  if(sizeInBytes > Int.MaxValue)
-    throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
+  private var shallowValidByteCount = -1

   def this(compressionCodec: CompressionCodec, messages: Message*) {
     this(ByteBufferMessageSet.create(new AtomicLong(0), compressionCodec, messages:_*))
@@ -109,7 +107,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message
     this(NoCompressionCodec, new AtomicLong(0), messages: _*)
   }

-  private def shallowValidBytes: Long = {
+  private def shallowValidBytes: Int = {
     if(shallowValidByteCount < 0) {
       var bytes = 0
       val iter = this.internalIterator(true)
@@ -123,10 +121,10 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message
   }

   /** Write the messages in this set to the given channel */
-  def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long = {
+  def writeTo(channel: GatheringByteChannel, offset: Long, size: Int): Int = {
     // Ignore offset and size from input. We just want to write the whole buffer to the channel.
     buffer.mark()
-    var written = 0L
+    var written = 0
     while(written < sizeInBytes)
       written += channel.write(buffer)
     buffer.reset()
@@ -223,12 +221,12 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message
   /**
    * The total number of bytes in this message set, including any partial trailing messages
    */
-  def sizeInBytes: Long = buffer.limit
+  def sizeInBytes: Int = buffer.limit

   /**
    * The total number of bytes in this message set not including any partial, trailing messages
    */
-  def validBytes: Long = shallowValidBytes
+  def validBytes: Int = shallowValidBytes

   /**
    * Two message sets are equal if their respective byte buffers are equal
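
The writeTo loop above keeps writing until the whole buffer has gone out, counting in an Int. A runnable sketch of that mark/write/reset pattern (WritableByteChannel stands in for the GatheringByteChannel the real method takes; assumes the buffer's position starts at 0):

    import java.io.ByteArrayOutputStream
    import java.nio.ByteBuffer
    import java.nio.channels.{Channels, WritableByteChannel}

    object WriteSketch {
      def writeFully(buffer: ByteBuffer, channel: WritableByteChannel): Int = {
        buffer.mark()                    // remember where we started
        var written = 0                  // was 0L before the patch
        while (written < buffer.limit()) // sizeInBytes == buffer.limit
          written += channel.write(buffer)
        buffer.reset()                   // leave the buffer as we found it
        written
      }

      def main(args: Array[String]): Unit = {
        val out = new ByteArrayOutputStream()
        val n = writeFully(ByteBuffer.wrap("hello".getBytes), Channels.newChannel(out))
        println(n) // 5
      }
    }
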

core/src/main/scala/kafka/message/MessageSet.scala (4 changed lines)

@@ -69,7 +69,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
   /** Write the messages in this set to the given channel starting at the given offset byte.
    * Less than the complete amount may be written, but no more than maxSize can be. The number
    * of bytes written is returned */
-  def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Long): Long
+  def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int

   /**
    * Provides an iterator over the message/offset pairs in this set
@@ -79,7 +79,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
   /**
    * Gives the total size of this message set in bytes
    */
-  def sizeInBytes: Long
+  def sizeInBytes: Int

   /**
    * Validate the checksum of all the messages in the set. Throws an InvalidMessageException if the checksum doesn't

core/src/main/scala/kafka/server/MessageSetSend.scala (10 changed lines)

@@ -31,10 +31,10 @@ import kafka.common.ErrorMapping
 @nonthreadsafe
 private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Short) extends Send {

-  private var sent: Long = 0
-  private var size: Long = messages.sizeInBytes
+  private var sent: Int = 0
+  private val size: Int = messages.sizeInBytes
   private val header = ByteBuffer.allocate(6)
-  header.putInt(size.asInstanceOf[Int] + 2)
+  header.putInt(size + 2)
   header.putShort(errorCode)
   header.rewind()
@@ -51,7 +51,7 @@ private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Sh
     written += channel.write(header)
     if(!header.hasRemaining) {
       val fileBytesSent = messages.writeTo(channel, sent, size - sent)
-      written += fileBytesSent.asInstanceOf[Int]
+      written += fileBytesSent
       sent += fileBytesSent
     }
@@ -66,6 +66,6 @@ private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Sh
     written
   }

-  def sendSize: Int = size.asInstanceOf[Int] + header.capacity
+  def sendSize: Int = size + header.capacity
 }
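
The 6-byte header built in MessageSetSend is a 4-byte length field (message bytes plus the 2-byte error code) followed by the error code itself; with size already an Int, putInt takes it with no cast, and sendSize is plain addition. A sketch of the header layout (hypothetical helper):

    import java.nio.ByteBuffer

    object HeaderSketch {
      // 4 bytes: payload length = message bytes + 2-byte error code
      // 2 bytes: the error code itself
      def header(size: Int, errorCode: Short): ByteBuffer = {
        val h = ByteBuffer.allocate(6)
        h.putInt(size + 2) // no .asInstanceOf[Int]
        h.putShort(errorCode)
        h.rewind()
        h
      }

      def main(args: Array[String]): Unit =
        println(header(100, 0).getInt()) // 102
    }
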

core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala (6 changed lines)

@@ -58,17 +58,17 @@ trait BaseMessageSetTestCases extends JUnitSuite {
   @Test
   def testSizeInBytes() {
     assertEquals("Empty message set should have 0 bytes.",
-                 0L,
+                 0,
                  createMessageSet(Array[Message]()).sizeInBytes)
     assertEquals("Predicted size should equal actual size.",
-                 kafka.message.MessageSet.messageSetSize(messages).toLong,
+                 kafka.message.MessageSet.messageSetSize(messages),
                  createMessageSet(messages).sizeInBytes)
   }

   @Test
   def testSizeInBytesWithCompression () {
     assertEquals("Empty message set should have 0 bytes.",
-                 0L, // overhead of the GZIP output stream
+                 0, // overhead of the GZIP output stream
                  createMessageSet(Array[Message](), DefaultCompressionCodec).sizeInBytes)
   }
 }

core/src/test/scala/unit/kafka/log/LogManagerTest.scala (6 changed lines)

@@ -92,7 +92,7 @@ class LogManagerTest extends JUnit3Suite {
     time.currentMs += maxLogAge + 3000
     logManager.cleanupLogs()
     assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments)
-    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset+1, 1024).sizeInBytes)
+    assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes)
     try {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")
@@ -111,7 +111,7 @@ class LogManagerTest extends JUnit3Suite {
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
     config = new KafkaConfig(props) {
-      override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
+      override val logFileSize = (10 * (setSize - 1)) // each segment will be 10 messages
       override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
       override val logRetentionHours = retentionHours
       override val flushInterval = 100
@@ -138,7 +138,7 @@ class LogManagerTest extends JUnit3Suite {
     // this cleanup shouldn't find any expired segments but should delete some to reduce size
     logManager.cleanupLogs()
     assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
-    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset + 1, 1024).sizeInBytes)
+    assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes)
     try {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")

core/src/test/scala/unit/kafka/log/LogTest.scala (8 changed lines)

@@ -93,7 +93,7 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
-    val logFileSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be 10 messages
+    val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages

     // create a log
     val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
@@ -132,7 +132,7 @@ class LogTest extends JUnitSuite {
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
     val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
-    assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
+    assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes)
     try {
       log.read(0, 1024)
       fail("Expected exception on invalid read.")
@@ -250,7 +250,7 @@ class LogTest extends JUnitSuite {
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1
-    val log = new Log(logDir, 100, maxMessageSize.toInt, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val log = new Log(logDir, 100, maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)

     // should be able to append the small message
     log.append(first)
@@ -297,7 +297,7 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
-    val logFileSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be 10 messages
+    val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages

     // create a log
     val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)

core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala (4 changed lines)

@@ -46,10 +46,10 @@ trait BaseMessageSetTestCases extends JUnitSuite {
   @Test
   def testSizeInBytes() {
     assertEquals("Empty message set should have 0 bytes.",
-                 0L,
+                 0,
                  createMessageSet(Array[Message]()).sizeInBytes)
     assertEquals("Predicted size should equal actual size.",
-                 MessageSet.messageSetSize(messages).toLong,
+                 MessageSet.messageSetSize(messages),
                  createMessageSet(messages).sizeInBytes)
   }

core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (4 changed lines)

@@ -32,7 +32,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
   def testValidBytes() {
     {
       val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
-      val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
+      val buffer = ByteBuffer.allocate(messages.sizeInBytes + 2)
       buffer.put(messages.buffer)
       buffer.putShort(4)
       val messagesPlus = new ByteBufferMessageSet(buffer)
@@ -50,7 +50,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
   def testValidBytesWithCompression() {
     {
       val messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
-      val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
+      val buffer = ByteBuffer.allocate(messages.sizeInBytes + 2)
       buffer.put(messages.buffer)
       buffer.putShort(4)
       val messagesPlus = new ByteBufferMessageSet(buffer)
