Browse Source

KAFKA-8542; Cache transaction first offset metadata on follower (#6943)

Followers should cache the log offset metadata for the start offset of each transaction in order to be able to compute the last stable offset without an offset index lookup. This is needed for follower fetching in KIP-392.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
pull/6985/merge
Jason Gustafson 5 years ago committed by GitHub
parent
commit
53b837f9c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 56
      core/src/main/scala/kafka/log/Log.scala
  2. 2
      core/src/main/scala/kafka/log/LogSegment.scala
  3. 27
      core/src/main/scala/kafka/log/ProducerStateManager.scala
  4. 52
      core/src/test/scala/unit/kafka/log/LogTest.scala
  5. 52
      core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala

56
core/src/main/scala/kafka/log/Log.scala

@ -782,7 +782,10 @@ class Log(@volatile var dir: File, @@ -782,7 +782,10 @@ class Log(@volatile var dir: File,
val completedTxns = ListBuffer.empty[CompletedTxn]
records.batches.asScala.foreach { batch =>
if (batch.hasProducerId) {
val maybeCompletedTxn = updateProducers(batch, loadedProducers, isFromClient = false)
val maybeCompletedTxn = updateProducers(batch,
loadedProducers,
firstOffsetMetadata = None,
isFromClient = false)
maybeCompletedTxn.foreach(completedTxns += _)
}
}
@ -1013,9 +1016,19 @@ class Log(@volatile var dir: File, @@ -1013,9 +1016,19 @@ class Log(@volatile var dir: File,
s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
}
// maybe roll the log if this segment is full
val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
val logOffsetMetadata = LogOffsetMetadata(
messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
segmentBaseOffset = segment.baseOffset,
relativePositionInSegment = segment.size)
// now that we have valid records, offsets assigned, and timestamps updated, we need to
// validate the idempotent/transactional state of the producers and collect some metadata
val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)
val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
logOffsetMetadata, validRecords, isFromClient)
maybeDuplicate.foreach { duplicate =>
appendInfo.firstOffset = Some(duplicate.firstOffset)
appendInfo.lastOffset = duplicate.lastOffset
@ -1024,14 +1037,6 @@ class Log(@volatile var dir: File, @@ -1024,14 +1037,6 @@ class Log(@volatile var dir: File,
return appendInfo
}
// maybe roll the log if this segment is full
val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
val logOffsetMetadata = LogOffsetMetadata(
messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
segmentBaseOffset = segment.baseOffset,
relativePositionInSegment = segment.size)
segment.append(largestOffset = appendInfo.lastOffset,
largestTimestamp = appendInfo.maxTimestamp,
shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
@ -1046,8 +1051,7 @@ class Log(@volatile var dir: File, @@ -1046,8 +1051,7 @@ class Log(@volatile var dir: File,
updateLogEndOffset(appendInfo.lastOffset + 1)
// update the producer state
for ((_, producerAppendInfo) <- updatedProducers) {
producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
for (producerAppendInfo <- updatedProducers.values) {
producerStateManager.update(producerAppendInfo)
}
@ -1139,11 +1143,16 @@ class Log(@volatile var dir: File, @@ -1139,11 +1143,16 @@ class Log(@volatile var dir: File,
}
}
private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean):
private def analyzeAndValidateProducerState(appendOffsetMetadata: LogOffsetMetadata,
records: MemoryRecords,
isFromClient: Boolean):
(mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
val completedTxns = ListBuffer.empty[CompletedTxn]
for (batch <- records.batches.asScala if batch.hasProducerId) {
var relativePositionInSegment = appendOffsetMetadata.relativePositionInSegment
for (batch <- records.batches.asScala) {
if (batch.hasProducerId) {
val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
// if this is a client produce request, there will be up to 5 batches which could have been duplicated.
@ -1154,9 +1163,23 @@ class Log(@volatile var dir: File, @@ -1154,9 +1163,23 @@ class Log(@volatile var dir: File,
}
}
val maybeCompletedTxn = updateProducers(batch, updatedProducers, isFromClient = isFromClient)
// We cache offset metadata for the start of each transaction. This allows us to
// compute the last stable offset without relying on additional index lookups.
val firstOffsetMetadata = if (batch.isTransactional)
Some(LogOffsetMetadata(batch.baseOffset, appendOffsetMetadata.segmentBaseOffset, relativePositionInSegment))
else
None
val maybeCompletedTxn = updateProducers(batch,
updatedProducers,
firstOffsetMetadata = firstOffsetMetadata,
isFromClient = isFromClient)
maybeCompletedTxn.foreach(completedTxns += _)
}
relativePositionInSegment += batch.sizeInBytes
}
(updatedProducers, completedTxns.toList, None)
}
@ -1249,10 +1272,11 @@ class Log(@volatile var dir: File, @@ -1249,10 +1272,11 @@ class Log(@volatile var dir: File,
private def updateProducers(batch: RecordBatch,
producers: mutable.Map[Long, ProducerAppendInfo],
firstOffsetMetadata: Option[LogOffsetMetadata],
isFromClient: Boolean): Option[CompletedTxn] = {
val producerId = batch.producerId
val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, isFromClient))
appendInfo.append(batch)
appendInfo.append(batch, firstOffsetMetadata)
}
/**

2
core/src/main/scala/kafka/log/LogSegment.scala

@ -245,7 +245,7 @@ class LogSegment private[log] (val log: FileRecords, @@ -245,7 +245,7 @@ class LogSegment private[log] (val log: FileRecords,
if (batch.hasProducerId) {
val producerId = batch.producerId
val appendInfo = producerStateManager.prepareUpdate(producerId, isFromClient = false)
val maybeCompletedTxn = appendInfo.append(batch)
val maybeCompletedTxn = appendInfo.append(batch, firstOffsetMetadataOpt = None)
producerStateManager.update(appendInfo)
maybeCompletedTxn.foreach { completedTxn =>
val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)

27
core/src/main/scala/kafka/log/ProducerStateManager.scala

@ -263,7 +263,8 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition, @@ -263,7 +263,8 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue)
}
def append(batch: RecordBatch): Option[CompletedTxn] = {
def append(batch: RecordBatch,
firstOffsetMetadataOpt: Option[LogOffsetMetadata]): Option[CompletedTxn] = {
if (batch.isControlBatch) {
val recordIterator = batch.iterator
if (recordIterator.hasNext) {
@ -276,8 +277,9 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition, @@ -276,8 +277,9 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
None
}
} else {
append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.baseOffset, batch.lastOffset,
batch.isTransactional)
val firstOffsetMetadata = firstOffsetMetadataOpt.getOrElse(LogOffsetMetadata(batch.baseOffset))
append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp,
firstOffsetMetadata, batch.lastOffset, batch.isTransactional)
None
}
}
@ -286,9 +288,10 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition, @@ -286,9 +288,10 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
firstSeq: Int,
lastSeq: Int,
lastTimestamp: Long,
firstOffset: Long,
firstOffsetMetadata: LogOffsetMetadata,
lastOffset: Long,
isTransactional: Boolean): Unit = {
val firstOffset = firstOffsetMetadata.messageOffset
maybeValidateAppend(epoch, firstSeq, firstOffset)
updatedEntry.addBatch(epoch, lastSeq, lastOffset, (lastOffset - firstOffset).toInt, lastTimestamp)
@ -296,12 +299,12 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition, @@ -296,12 +299,12 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
case Some(_) if !isTransactional =>
// Received a non-transactional message while a transaction is active
throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId at " +
s"offset $firstOffset in partition $topicPartition")
s"offset $firstOffsetMetadata in partition $topicPartition")
case None if isTransactional =>
// Began a new transaction
updatedEntry.currentTxnFirstOffset = Some(firstOffset)
transactions += new TxnMetadata(producerId, firstOffset)
transactions += TxnMetadata(producerId, firstOffsetMetadata)
case _ => // nothing to do
}
@ -336,18 +339,6 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition, @@ -336,18 +339,6 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
def startedTransactions: List[TxnMetadata] = transactions.toList
def maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata: LogOffsetMetadata): Unit = {
// we will cache the log offset metadata if it corresponds to the starting offset of
// the last transaction that was started. This is optimized for leader appends where it
// is only possible to have one transaction started for each log append, and the log
// offset metadata will always match in that case since no data from other producers
// is mixed into the append
transactions.headOption.foreach { txn =>
if (txn.firstOffset.messageOffset == logOffsetMetadata.messageOffset)
txn.firstOffset = logOffsetMetadata
}
}
override def toString: String = {
"ProducerAppendInfo(" +
s"producerId=$producerId, " +

52
core/src/test/scala/unit/kafka/log/LogTest.scala

@ -28,7 +28,7 @@ import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException @@ -28,7 +28,7 @@ import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException
import kafka.log.Log.DeleteDirSuffix
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
import kafka.utils._
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors._
@ -3308,7 +3308,21 @@ class LogTest { @@ -3308,7 +3308,21 @@ class LogTest {
appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90
val abortedTransactions = allAbortedTransactions(log)
assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
val expectedTransactions = List(
new AbortedTxn(pid1, 0L, 29L, 8L),
new AbortedTxn(pid2, 8L, 74L, 36L)
)
assertEquals(expectedTransactions, abortedTransactions)
// Verify caching of the segment position of the first unstable offset
log.highWatermark = 30L
assertCachedFirstUnstableOffset(log, expectedOffset = 8L)
log.highWatermark = 75L
assertCachedFirstUnstableOffset(log, expectedOffset = 36L)
log.highWatermark = log.logEndOffset
assertEquals(None, log.firstUnstableOffset)
}
@Test
@ -3509,7 +3523,39 @@ class LogTest { @@ -3509,7 +3523,39 @@ class LogTest {
appendAsFollower(log, MemoryRecords.readableRecords(buffer))
val abortedTransactions = allAbortedTransactions(log)
assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
val expectedTransactions = List(
new AbortedTxn(pid1, 0L, 29L, 8L),
new AbortedTxn(pid2, 8L, 74L, 36L)
)
assertEquals(expectedTransactions, abortedTransactions)
// Verify caching of the segment position of the first unstable offset
log.highWatermark = 30L
assertCachedFirstUnstableOffset(log, expectedOffset = 8L)
log.highWatermark = 75L
assertCachedFirstUnstableOffset(log, expectedOffset = 36L)
log.highWatermark = log.logEndOffset
assertEquals(None, log.firstUnstableOffset)
}
private def assertCachedFirstUnstableOffset(log: Log, expectedOffset: Long): Unit = {
assertTrue(log.firstUnstableOffset.isDefined)
val firstUnstableOffset = log.firstUnstableOffset.get
assertEquals(expectedOffset, firstUnstableOffset.messageOffset)
assertFalse(firstUnstableOffset.messageOffsetOnly)
assertValidLogOffsetMetadata(log, firstUnstableOffset)
}
private def assertValidLogOffsetMetadata(log: Log, offsetMetadata: LogOffsetMetadata): Unit = {
val readInfo = log.read(startOffset = offsetMetadata.messageOffset,
maxLength = 2048,
maxOffset = None,
minOneMessage = false,
includeAbortedTxns = false)
assertEquals(offsetMetadata, readInfo.fetchOffsetMetadata)
}
@Test(expected = classOf[TransactionCoordinatorFencedException])

52
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala

@ -137,7 +137,8 @@ class ProducerStateManagerTest { @@ -137,7 +137,8 @@ class ProducerStateManagerTest {
val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = false)
// Sequence number wrap around
appendInfo.append(epoch, Int.MaxValue-10, 9, time.milliseconds(), 2000L, 2020L, isTransactional = false)
appendInfo.append(epoch, Int.MaxValue - 10, 9, time.milliseconds(),
LogOffsetMetadata(2000L), 2020L, isTransactional = false)
assertEquals(None, stateManager.lastEntry(producerId))
stateManager.update(appendInfo)
assertTrue(stateManager.lastEntry(producerId).isDefined)
@ -209,14 +210,14 @@ class ProducerStateManagerTest { @@ -209,14 +210,14 @@ class ProducerStateManagerTest {
val offset = 992342L
val seq = 0
val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset, isTransactional = true)
val logOffsetMetadata = LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
val firstOffsetMetadata = LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
relativePositionInSegment = 234224)
producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(),
firstOffsetMetadata, offset, isTransactional = true)
stateManager.update(producerAppendInfo)
assertEquals(Some(logOffsetMetadata), stateManager.firstUnstableOffset)
assertEquals(Some(firstOffsetMetadata), stateManager.firstUnstableOffset)
}
@Test
@ -232,10 +233,10 @@ class ProducerStateManagerTest { @@ -232,10 +233,10 @@ class ProducerStateManagerTest {
ProducerStateEntry.empty(producerId),
ValidationType.Full
)
producerAppendInfo.append(producerEpoch, 0, 0, time.milliseconds(), startOffset, startOffset, isTransactional = true)
val logOffsetMetadata = LogOffsetMetadata(messageOffset = startOffset, segmentBaseOffset = segmentBaseOffset,
val firstOffsetMetadata = LogOffsetMetadata(messageOffset = startOffset, segmentBaseOffset = segmentBaseOffset,
relativePositionInSegment = 50 * relativeOffset)
producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
producerAppendInfo.append(producerEpoch, 0, 0, time.milliseconds(),
firstOffsetMetadata, startOffset, isTransactional = true)
stateManager.update(producerAppendInfo)
}
@ -273,36 +274,20 @@ class ProducerStateManagerTest { @@ -273,36 +274,20 @@ class ProducerStateManagerTest {
assertEquals(None, stateManager.firstUnstableOffset)
}
@Test
def testNonMatchingTxnFirstOffsetMetadataNotCached(): Unit = {
val producerEpoch = 0.toShort
val offset = 992342L
val seq = 0
val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset, isTransactional = true)
// use some other offset to simulate a follower append where the log offset metadata won't typically
// match any of the transaction first offsets
val logOffsetMetadata = LogOffsetMetadata(messageOffset = offset - 23429, segmentBaseOffset = 990000L,
relativePositionInSegment = 234224)
producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
stateManager.update(producerAppendInfo)
assertEquals(Some(LogOffsetMetadata(offset)), stateManager.firstUnstableOffset)
}
@Test
def testPrepareUpdateDoesNotMutate(): Unit = {
val producerEpoch = 0.toShort
val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
appendInfo.append(producerEpoch, 0, 5, time.milliseconds(), 15L, 20L, isTransactional = false)
appendInfo.append(producerEpoch, 0, 5, time.milliseconds(),
LogOffsetMetadata(15L), 20L, isTransactional = false)
assertEquals(None, stateManager.lastEntry(producerId))
stateManager.update(appendInfo)
assertTrue(stateManager.lastEntry(producerId).isDefined)
val nextAppendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
nextAppendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 26L, 30L, isTransactional = false)
nextAppendInfo.append(producerEpoch, 6, 10, time.milliseconds(),
LogOffsetMetadata(26L), 30L, isTransactional = false)
assertTrue(stateManager.lastEntry(producerId).isDefined)
var lastEntry = stateManager.lastEntry(producerId).get
@ -325,7 +310,8 @@ class ProducerStateManagerTest { @@ -325,7 +310,8 @@ class ProducerStateManagerTest {
append(stateManager, producerId, producerEpoch, 0, offset)
val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 16L, 20L, isTransactional = true)
appendInfo.append(producerEpoch, 1, 5, time.milliseconds(),
LogOffsetMetadata(16L), 20L, isTransactional = true)
var lastEntry = appendInfo.toEntry
assertEquals(producerEpoch, lastEntry.producerEpoch)
assertEquals(1, lastEntry.firstSeq)
@ -335,7 +321,8 @@ class ProducerStateManagerTest { @@ -335,7 +321,8 @@ class ProducerStateManagerTest {
assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
appendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 26L, 30L, isTransactional = true)
appendInfo.append(producerEpoch, 6, 10, time.milliseconds(),
LogOffsetMetadata(26L), 30L, isTransactional = true)
lastEntry = appendInfo.toEntry
assertEquals(producerEpoch, lastEntry.producerEpoch)
assertEquals(1, lastEntry.firstSeq)
@ -892,7 +879,8 @@ class ProducerStateManagerTest { @@ -892,7 +879,8 @@ class ProducerStateManagerTest {
isTransactional: Boolean = false,
isFromClient : Boolean = true): Unit = {
val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
producerAppendInfo.append(producerEpoch, seq, seq, timestamp, offset, offset, isTransactional)
producerAppendInfo.append(producerEpoch, seq, seq, timestamp,
LogOffsetMetadata(offset), offset, isTransactional)
stateManager.update(producerAppendInfo)
stateManager.updateMapEndOffset(offset + 1)
}
@ -904,7 +892,7 @@ class ProducerStateManagerTest { @@ -904,7 +892,7 @@ class ProducerStateManagerTest {
batch: RecordBatch,
isFromClient : Boolean): Unit = {
val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
producerAppendInfo.append(batch)
producerAppendInfo.append(batch, firstOffsetMetadataOpt = None)
stateManager.update(producerAppendInfo)
stateManager.updateMapEndOffset(offset + 1)
}

Loading…
Cancel
Save