From 311a5d81d8a8b34e18e1ad2fdc491661477de83c Mon Sep 17 00:00:00 2001 From: Edward Jay Kreps Date: Wed, 23 Jan 2013 14:07:19 -0800 Subject: [PATCH] KAFKA-727 broker can still expose uncommitted data to a consumer; reviewed by Neha Narkhede --- core/src/main/scala/kafka/log/Log.scala | 2 +- .../src/main/scala/kafka/log/LogSegment.scala | 5 +- .../scala/other/kafka/StressTestLog.scala | 109 ++++++++++++++++++ 3 files changed, 113 insertions(+), 3 deletions(-) create mode 100644 core/src/test/scala/other/kafka/StressTestLog.scala diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 560be19721d..5cd36e05145 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -369,7 +369,7 @@ private[kafka] class Log(val dir: File, * Read a message set from the log. * startOffset - The logical offset to begin reading at * maxLength - The maximum number of bytes to read - * maxOffset - The maximum logical offset to include in the resulting message set + * maxOffset - The first offset not included in the read */ def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = { trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size)) diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 4417cffd05d..2e40629ff30 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -86,7 +86,8 @@ class LogSegment(val messageSet: FileMessageSet, throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize)) if(maxSize == 0) return MessageSet.Empty - + + val logSize = messageSet.sizeInBytes // this may change, need to save a consistent copy val startPosition = translateOffset(startOffset) // if the start position is already off the end of the log, return MessageSet.Empty @@ -106,7 
+107,7 @@ class LogSegment(val messageSet: FileMessageSet, val mapping = translateOffset(offset) val endPosition = if(mapping == null) - messageSet.sizeInBytes() // the max offset is off the end of the log, use the end of the file + logSize // the max offset is off the end of the log, use the end of the file else mapping.position min(endPosition - startPosition.position, maxSize) diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala new file mode 100644 index 00000000000..78e054877bd --- /dev/null +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package kafka + +import java.util.concurrent.atomic._ +import kafka.common._ +import kafka.message._ +import kafka.log._ +import kafka.utils._ + +/** + * A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it + * from another thread and checks a few basic assertions until the user kills the process. 
+ */
+object StressTestLog {
+  /**
+   * Shared run flag. The worker loops and the reporting loop in `main` keep going while it is true;
+   * it is cleared by the shutdown hook and by any worker that fails with an exception.
+   */
+  val running = new AtomicBoolean(true)
+
+  /**
+   * Creates a log in a temporary directory, starts one writer and one reader thread against it and
+   * prints both threads' current offsets once a second until `running` is cleared.
+   *
+   * @param args command line arguments (not used)
+   */
+  def main(args: Array[String]): Unit = {
+    val dir = TestUtils.tempDir()
+    val log = new Log(dir,
+                      maxLogFileSize = 64*1024*1024,
+                      maxMessageSize = Int.MaxValue,
+                      flushInterval = Int.MaxValue,
+                      rollIntervalMs = Long.MaxValue,
+                      needsRecovery = false,
+                      maxIndexSize = 1024*1024,
+                      time = SystemTime,
+                      brokerId = 0)
+    val writer = new WriterThread(log)
+    writer.start()
+    val reader = new ReaderThread(log)
+    reader.start()
+
+    // On JVM shutdown: stop both workers, wait for them to finish, then delete the temporary log directory.
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      override def run(): Unit = {
+        running.set(false)
+        writer.join()
+        reader.join()
+        Utils.rm(dir)
+      }
+    })
+
+    while(running.get) {
+      println("Reader offset = %d, writer offset = %d".format(reader.offset, writer.offset))
+      Thread.sleep(1000)
+    }
+  }
+
+  /**
+   * A thread that calls `work()` in a loop for as long as `running` is set. If `work()` throws an
+   * Exception the stack trace is printed and `running` is cleared, which also stops the other loops.
+   */
+  abstract class WorkerThread extends Thread {
+    override def run(): Unit = {
+      try {
+        while(running.get)
+          work()
+      } catch {
+        case e: Exception =>
+          e.printStackTrace()
+          running.set(false)
+      }
+      println(getClass.getName + " exiting...")
+    }
+
+    /** One iteration of this worker's job; called repeatedly from `run()`. */
+    def work(): Unit
+  }
+
+  /**
+   * Appends single-message sets to the log, one per iteration. The payload of each message is the
+   * decimal string of the offset the writer expects that message to receive.
+   */
+  class WriterThread(val log: Log) extends WorkerThread {
+    /** Next offset this writer expects to be assigned; volatile because `main` reads it for reporting. */
+    @volatile var offset = 0
+
+    override def work(): Unit = {
+      // NOTE(review): the returned pair is presumably (first, last) offset of the append -- confirm against Log.append
+      val offsets = log.append(TestUtils.singleMessageSet(offset.toString.getBytes))
+      require(offsets._1 == offset && offsets._2 == offset,
+              "Expected the append to be assigned offset %d but got (%d, %d).".format(offset, offsets._1, offsets._2))
+      offset += 1
+      // pause for half a second after every 1000 appends
+      if(offset % 1000 == 0)
+        Thread.sleep(500)
+    }
+  }
+
+  /**
+   * Reads the log one message at a time, asking for at most 1024 bytes starting at `offset` with a
+   * max offset of `offset + 1`, and checks that each non-empty result is exactly the requested message.
+   */
+  class ReaderThread(val log: Log) extends WorkerThread {
+    /** Next offset this reader will request; volatile because `main` reads it for reporting. */
+    @volatile var offset = 0
+
+    override def work(): Unit = {
+      try {
+        log.read(offset, 1024, Some(offset+1)) match {
+          case read: FileMessageSet if read.sizeInBytes > 0 => {
+            val first = read.head
+            require(first.offset == offset, "We should either read nothing or the message we asked for.")
+            // the returned set must be exactly as large as its first entry, i.e. hold nothing beyond it
+            require(MessageSet.entrySize(first.message) == read.sizeInBytes, "Expected %d but got %d.".format(MessageSet.entrySize(first.message), read.sizeInBytes))
+            offset += 1
+          }
+          case _ => // nothing readable at this offset yet; ask again on the next iteration
+        }
+      } catch {
+        case _: OffsetOutOfRangeException => // this is okay
+      }
+    }
+  }
+}
\ No newline at end of file