Browse Source

KAFKA-727 broker can still expose uncommitted data to a consumer; reviewed by Neha Narkhede

0.8.0-beta1-candidate1
Edward Jay Kreps 12 years ago committed by Neha Narkhede
parent
commit
311a5d81d8
  1. 2
      core/src/main/scala/kafka/log/Log.scala
  2. 5
      core/src/main/scala/kafka/log/LogSegment.scala
  3. 109
      core/src/test/scala/other/kafka/StressTestLog.scala

2
core/src/main/scala/kafka/log/Log.scala

@@ -369,7 +369,7 @@ private[kafka] class Log(val dir: File,
* Read a message set from the log.
* startOffset - The logical offset to begin reading at
* maxLength - The maximum number of bytes to read
- * maxOffset - The maximum logical offset to include in the resulting message set
+ * maxOffset - The first offset not included in the read
*/
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = {
trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))

5
core/src/main/scala/kafka/log/LogSegment.scala

@@ -86,7 +86,8 @@ class LogSegment(val messageSet: FileMessageSet,
throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
if(maxSize == 0)
return MessageSet.Empty
val logSize = messageSet.sizeInBytes // this may change, need to save a consistent copy
val startPosition = translateOffset(startOffset)
// if the start position is already off the end of the log, return MessageSet.Empty
@@ -106,7 +107,7 @@ class LogSegment(val messageSet: FileMessageSet,
val mapping = translateOffset(offset)
val endPosition =
if(mapping == null)
-          messageSet.sizeInBytes() // the max offset is off the end of the log, use the end of the file
+          logSize // the max offset is off the end of the log, use the end of the file
else
mapping.position
min(endPosition - startPosition.position, maxSize)

109
core/src/test/scala/other/kafka/StressTestLog.scala

@@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka
import java.util.concurrent.atomic._
import kafka.common._
import kafka.message._
import kafka.log._
import kafka.utils._
/**
* A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it
* from another thread and checks a few basic assertions until the user kills the process.
*/
object StressTestLog {
  /** Shared run flag polled by both worker threads; cleared by the shutdown
    * hook on ctrl-c, or by whichever worker hits an exception first. */
  val running = new AtomicBoolean(true)

  def main(args: Array[String]) {
    val dir = TestUtils.tempDir()
    val log = new Log(dir,
                      maxLogFileSize = 64*1024*1024,
                      maxMessageSize = Int.MaxValue,
                      flushInterval = Int.MaxValue,    // never flush on a message-count trigger
                      rollIntervalMs = Long.MaxValue,  // never roll segments on time
                      needsRecovery = false,
                      maxIndexSize = 1024*1024,
                      time = SystemTime,
                      brokerId = 0)
    val writer = new WriterThread(log)
    writer.start()
    val reader = new ReaderThread(log)
    reader.start()

    // On shutdown: signal the workers, wait for them to finish their current
    // iteration, then remove the temporary log directory.
    Runtime.getRuntime().addShutdownHook(new Thread() {
      override def run() = {
        running.set(false)
        writer.join()
        reader.join()
        Utils.rm(dir)
      }
    })

    // Print progress once a second until the test is stopped or a worker fails.
    while(running.get) {
      println("Reader offset = %d, writer offset = %d".format(reader.offset, writer.offset))
      Thread.sleep(1000)
    }
  }

  /**
   * Base class for the stress-test workers: repeatedly invokes work() while
   * the shared `running` flag is set. Any exception clears the flag so the
   * sibling thread (and the main progress loop) also shut down.
   */
  abstract class WorkerThread extends Thread {
    override def run() {
      try {
        // NOTE: the original had an unused local `var offset = 0` here; the
        // real offsets live on the subclasses' @volatile fields. Removed.
        while(running.get)
          work()
      } catch {
        case e: Exception =>
          e.printStackTrace()
          running.set(false)  // bring the whole test down on the first failure
      }
      println(getClass.getName + " exiting...")
    }

    /** One unit of work; called in a tight loop for the life of the test. */
    def work()
  }

  /**
   * Appends one message per iteration whose payload is the stringified offset
   * we expect the log to assign, and asserts the log agrees.
   */
  class WriterThread(val log: Log) extends WorkerThread {
    @volatile var offset = 0  // next offset we expect the log to hand out
    override def work() {
      val offsets = log.append(TestUtils.singleMessageSet(offset.toString.getBytes))
      // A single-message append must land exactly at the predicted offset
      // (append is presumed to return (firstOffset, lastOffset) — TODO confirm).
      require(offsets._1 == offset && offsets._2 == offset)
      offset += 1
      if(offset % 1000 == 0)
        Thread.sleep(500)  // throttle the writer so the reader can catch up
    }
  }

  /**
   * Reads one message per iteration at the offset it expects next and checks
   * that a non-empty read returns exactly that message and nothing more —
   * i.e. the log never exposes data past the requested max offset.
   */
  class ReaderThread(val log: Log) extends WorkerThread {
    @volatile var offset = 0  // next offset we expect to read
    override def work() {
      try {
        // maxOffset = offset+1 means "return at most the single message at `offset`"
        log.read(offset, 1024, Some(offset+1)) match {
          case read: FileMessageSet if read.sizeInBytes > 0 => {
            val first = read.head
            require(first.offset == offset, "We should either read nothing or the message we asked for.")
            require(MessageSet.entrySize(first.message) == read.sizeInBytes, "Expected %d but got %d.".format(MessageSet.entrySize(first.message), read.sizeInBytes))
            offset += 1
          }
          case _ => // nothing (or a non-file message set) read yet; retry at the same offset
        }
      } catch {
        case _: OffsetOutOfRangeException => // this is okay: the writer may not have reached us yet
      }
    }
  }
}
Loading…
Cancel
Save