
KAFKA-7366: Make topic configs segment.bytes and segment.ms take effect immediately (#5728)

Reviewers: Ismael Juma <ismael@juma.me.uk> and Jun Rao <junrao@gmail.com>
Manikumar Reddy O (committed by Jun Rao)
commit 0848b78881
  1. core/src/main/scala/kafka/log/Log.scala (22 lines changed)
  2. core/src/main/scala/kafka/log/LogSegment.scala (14 lines changed)
  3. core/src/test/scala/unit/kafka/log/LogSegmentTest.scala (28 lines changed)
  4. core/src/test/scala/unit/kafka/log/LogTest.scala (2 lines changed)
  5. core/src/test/scala/unit/kafka/log/LogUtils.scala (3 lines changed)
  6. core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala (27 lines changed)
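
At a glance, the change moves the roll thresholds out of LogSegment's constructor and into a RollParams value rebuilt from the current LogConfig on every append, so a dynamically updated segment.bytes or segment.ms now applies to the active segment rather than only to segments created afterwards. A minimal Scala sketch of the pattern (Config and Segment are illustrative stand-ins, not the actual Kafka types):

    // Illustrative only: Config/Segment are hypothetical stand-ins for
    // LogConfig/LogSegment, reduced to the size-based roll check.
    final case class Config(segmentBytes: Int)

    class Segment {
      var size: Int = 0
      // Before the fix, a threshold captured at construction time would be
      // frozen here; after the fix, the caller passes the current threshold.
      def shouldRoll(maxSegmentBytes: Int, incomingBytes: Int): Boolean =
        size > maxSegmentBytes - incomingBytes
    }

    object Demo extends App {
      @volatile var config = Config(segmentBytes = 1000) // live, dynamically updatable
      val active = new Segment
      active.size = 1500
      println(active.shouldRoll(config.segmentBytes, 100)) // true: over the old limit
      config = Config(segmentBytes = 4000)                 // dynamic topic config update
      println(active.shouldRoll(config.segmentBytes, 100)) // false: new limit applies at once
    }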

core/src/main/scala/kafka/log/Log.scala (22 lines changed)

@@ -145,6 +145,26 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i
   }
 }

+/**
+ * A class used to hold params required to decide to rotate a log segment or not.
+ */
+case class RollParams(maxSegmentMs: Long,
+                      maxSegmentBytes: Int,
+                      maxTimestampInMessages: Long,
+                      maxOffsetInMessages: Long,
+                      messagesSize: Int,
+                      now: Long)
+
+object RollParams {
+  def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = {
+    new RollParams(config.segmentMs,
+      config.segmentSize,
+      appendInfo.maxTimestamp,
+      appendInfo.lastOffset,
+      messagesSize, now)
+  }
+}
+
 /**
  * An append-only log for storing messages.
  *
@@ -1493,7 +1513,7 @@ class Log(@volatile var dir: File,
       val maxTimestampInMessages = appendInfo.maxTimestamp
       val maxOffsetInMessages = appendInfo.lastOffset
-      if (segment.shouldRoll(messagesSize, maxTimestampInMessages, maxOffsetInMessages, now)) {
+      if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) {
         debug(s"Rolling new log segment (log_size = ${segment.size}/${config.segmentSize}}, " +
           s"offset_index_size = ${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " +
           s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +

core/src/main/scala/kafka/log/LogSegment.scala (14 lines changed)

@@ -45,8 +45,10 @@ import scala.math._
  * @param log The file records containing log entries
  * @param offsetIndex The offset index
  * @param timeIndex The timestamp index
+ * @param txnIndex The transaction index
  * @param baseOffset A lower bound on the offsets in this segment
  * @param indexIntervalBytes The approximate number of bytes between entries in the index
+ * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
  * @param time The time instance
  */
 @nonthreadsafe
@@ -57,15 +59,13 @@ class LogSegment private[log] (val log: FileRecords,
                                val baseOffset: Long,
                                val indexIntervalBytes: Int,
                                val rollJitterMs: Long,
-                               val maxSegmentMs: Long,
-                               val maxSegmentBytes: Int,
                                val time: Time) extends Logging {

-  def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = {
-    val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs
-    size > maxSegmentBytes - messagesSize ||
+  def shouldRoll(rollParams: RollParams): Boolean = {
+    val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
+    size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
       (size > 0 && reachedRollMs) ||
-      offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(maxOffsetInMessages)
+      offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
   }

   def resizeIndexes(size: Int): Unit = {
@@ -637,8 +637,6 @@ object LogSegment {
       baseOffset,
       indexIntervalBytes = config.indexInterval,
       rollJitterMs = config.randomSegmentJitter,
-      maxSegmentMs = config.segmentMs,
-      maxSegmentBytes = config.segmentSize,
       time)
   }
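
With the thresholds removed from the constructor, a LogSegment no longer pins segment.ms or segment.bytes for its lifetime; every shouldRoll call receives them via RollParams. A small usage sketch against the signatures in this diff (the `segment` and `config` values and the numbers are assumed for illustration):

    // Assumes a `segment: LogSegment` and a current `config: LogConfig` in scope;
    // RecordBatch and Time come from org.apache.kafka.common.
    val params = RollParams(
      maxSegmentMs = config.segmentMs,        // read fresh, not frozen at segment creation
      maxSegmentBytes = config.segmentSize,
      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
      maxOffsetInMessages = 100L,
      messagesSize = 1024,
      now = Time.SYSTEM.milliseconds())
    if (segment.shouldRoll(params)) {
      // roll a new active segment before appending
    }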

core/src/test/scala/unit/kafka/log/LogSegmentTest.scala (28 lines changed)

@@ -38,9 +38,8 @@ class LogSegmentTest {
   /* create a segment with the given base offset */
   def createSegment(offset: Long,
                     indexIntervalBytes: Int = 10,
-                    maxSegmentMs: Int = Int.MaxValue,
                     time: Time = Time.SYSTEM): LogSegment = {
-    val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, maxSegmentMs, time)
+    val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, time)
     segments += seg
     seg
   }
@@ -163,10 +162,10 @@ class LogSegmentTest {
     val maxSegmentMs = 300000
     val time = new MockTime
-    val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    val seg = createSegment(0, time = time)
     seg.close()
-    val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    val reopened = createSegment(0, time = time)

     assertEquals(0, seg.timeIndex.sizeInBytes)
     assertEquals(0, seg.offsetIndex.sizeInBytes)
@@ -176,24 +175,21 @@ class LogSegmentTest {
     assertFalse(reopened.timeIndex.isFull)
     assertFalse(reopened.offsetIndex.isFull)
-    assertFalse(reopened.shouldRoll(messagesSize = 1024,
-      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
-      maxOffsetInMessages = 100L,
-      now = time.milliseconds()))
+    var rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = 100L, messagesSize = 1024, time.milliseconds())
+    assertFalse(reopened.shouldRoll(rollParams))

     // The segment should not be rolled even if maxSegmentMs has been exceeded
     time.sleep(maxSegmentMs + 1)
     assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
-    assertFalse(reopened.shouldRoll(messagesSize = 1024,
-      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
-      maxOffsetInMessages = 100L,
-      now = time.milliseconds()))
+    rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = 100L, messagesSize = 1024, time.milliseconds())
+    assertFalse(reopened.shouldRoll(rollParams))

     // But we should still roll the segment if we cannot fit the next offset
-    assertTrue(reopened.shouldRoll(messagesSize = 1024,
-      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
-      maxOffsetInMessages = Int.MaxValue.toLong + 200,
-      now = time.milliseconds()))
+    rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = Int.MaxValue.toLong + 200L, messagesSize = 1024, time.milliseconds())
+    assertTrue(reopened.shouldRoll(rollParams))
   }

   @Test

core/src/test/scala/unit/kafka/log/LogTest.scala (2 lines changed)

@@ -277,7 +277,7 @@ class LogTest {
       override def addSegment(segment: LogSegment): LogSegment = {
         val wrapper = new LogSegment(segment.log, segment.offsetIndex, segment.timeIndex, segment.txnIndex, segment.baseOffset,
-          segment.indexIntervalBytes, segment.rollJitterMs, segment.maxSegmentMs, segment.maxSegmentBytes, mockTime) {
+          segment.indexIntervalBytes, segment.rollJitterMs, mockTime) {
           override def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long,
                             minOneMessage: Boolean): FetchDataInfo = {

core/src/test/scala/unit/kafka/log/LogUtils.scala (3 lines changed)

@@ -29,13 +29,12 @@ object LogUtils {
   def createSegment(offset: Long,
                     logDir: File,
                     indexIntervalBytes: Int = 10,
-                    maxSegmentMs: Int = Int.MaxValue,
                     time: Time = Time.SYSTEM): LogSegment = {
     val ms = FileRecords.open(Log.logFile(logDir, offset))
     val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
     val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
     val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
-    new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs, Int.MaxValue, time)
+    new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
   }
 }

core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala (27 lines changed)

@@ -59,6 +59,33 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     }
   }

+  @Test
+  def testDynamicTopicConfigChange() {
+    val tp = new TopicPartition("test", 0)
+    val oldSegmentSize = 1000
+    val logProps = new Properties()
+    logProps.put(SegmentBytesProp, oldSegmentSize.toString)
+    createTopic(tp.topic, 1, 1, logProps)
+    TestUtils.retry(10000) {
+      val logOpt = this.servers.head.logManager.getLog(tp)
+      assertTrue(logOpt.isDefined)
+      assertEquals(oldSegmentSize, logOpt.get.config.segmentSize)
+    }
+
+    val log = servers.head.logManager.getLog(tp).get
+    val newSegmentSize = 2000
+    logProps.put(SegmentBytesProp, newSegmentSize.toString)
+    adminZkClient.changeTopicConfig(tp.topic, logProps)
+    TestUtils.retry(10000) {
+      assertEquals(newSegmentSize, log.config.segmentSize)
+    }
+
+    (1 to 50).foreach(i => TestUtils.produceMessage(servers, tp.topic, i.toString))
+    // Verify that the new config is used for all segments
+    assertTrue("Log segment size change not applied", log.logSegments.forall(_.size > 1000))
+  }
+
   private def testQuotaConfigChange(user: String, clientId: String, rootEntityType: String, configEntityName: String) {
     assertTrue("Should contain a ConfigHandler for " + rootEntityType ,
       this.servers.head.dynamicConfigHandlers.contains(rootEntityType))
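
For completeness, this is what the fix buys an operator: a topic-level override changed at runtime now affects the active segment on the next append. A minimal client-side sketch using the Java AdminClient from Scala (the bootstrap address and topic name are assumptions; alterConfigs was the alter API of this era, later superseded by incrementalAlterConfigs):

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
    import org.apache.kafka.common.config.ConfigResource

    object SegmentBytesUpdate extends App {
      val props = new Properties()
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker
      val admin = AdminClient.create(props)
      try {
        val topic = new ConfigResource(ConfigResource.Type.TOPIC, "test") // assumed topic
        // After KAFKA-7366 the active segment honors this on the next append.
        val cfg = new Config(Collections.singletonList(new ConfigEntry("segment.bytes", "2000")))
        admin.alterConfigs(Collections.singletonMap(topic, cfg)).all().get()
      } finally admin.close()
    }

Note that alterConfigs replaces the full set of topic-level overrides rather than merging, so any other overrides on the topic should be included in the same call.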
