
KAFKA-10432; LeaderEpochCache is incorrectly recovered for leader epoch 0 (#9219)

The leader epoch cache is incorrectly recovered for epoch 0 because the
assignment is skipped when epoch == 0. This check was likely intended to
prevent negative epochs from being applied, or it assumed that epochs
started at 1.
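
To illustrate the off-by-one, here is a minimal sketch (the guard expressions are copied from the diff below; `EpochGuardSketch` and its local values are hypothetical):

```scala
object EpochGuardSketch extends App {
  // At the start of recovery the cache is empty, so latestEpoch is None
  // and the forall clause is vacuously true.
  val latestEpoch: Option[Int] = None
  val batchEpoch = 0 // partitionLeaderEpoch of the first recovered batch

  // Guard before this patch: a batch with epoch 0 never reaches cache.assign.
  val buggy = batchEpoch > 0 && latestEpoch.forall(batchEpoch > _)
  // Guard after the fix: only negative epochs are rejected.
  val fixed = batchEpoch >= 0 && latestEpoch.forall(batchEpoch > _)

  println(s"buggy guard assigns epoch 0: $buggy") // false
  println(s"fixed guard assigns epoch 0: $fixed") // true
}
```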

A test has been added to LogSegmentTest to show that the LogSegment
recovery path works for the epoch cache. This was a test gap, as none of the
existing recover calls supplied a leader epoch cache to recover.

Reviewers: Jason Gustafson <jason@confluent.io>
Lucas Bradstreet committed via GitHub
commit d2521855b8
Changed files:
  1. core/src/main/scala/kafka/log/LogSegment.scala (2 changes)
  2. core/src/test/scala/unit/kafka/log/LogSegmentTest.scala (56 changes)

core/src/main/scala/kafka/log/LogSegment.scala

@@ -361,7 +361,7 @@ class LogSegment private[log] (val log: FileRecords,
 if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
   leaderEpochCache.foreach { cache =>
-    if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
+    if (batch.partitionLeaderEpoch >= 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
       cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
   }
   updateProducerState(producerStateManager, batch)
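
As a sanity check of the corrected logic, here is a self-contained sketch of the recovery scan; `EpochCacheStub` and `RecoveryScanSketch` are hypothetical stand-ins for `LeaderEpochFileCache`, exposing only the two members the guard touches:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stub: tracks (epoch, startOffset) entries in append order.
class EpochCacheStub {
  private val entries = ArrayBuffer.empty[(Int, Long)]
  def latestEpoch: Option[Int] = entries.lastOption.map(_._1)
  def assign(epoch: Int, startOffset: Long): Unit = entries += (epoch -> startOffset)
  def dump: Seq[(Int, Long)] = entries.toSeq
}

object RecoveryScanSketch extends App {
  val cache = new EpochCacheStub
  // (partitionLeaderEpoch, baseOffset) pairs in the order a recovery scan sees them
  val batches = Seq((0, 104L), (1, 106L), (1, 108L), (2, 110L))
  for ((epoch, baseOffset) <- batches)
    if (epoch >= 0 && cache.latestEpoch.forall(epoch > _)) // the fixed guard
      cache.assign(epoch, baseOffset)
  println(cache.dump) // (0,104), (1,106), (2,110): one entry per epoch
}
```

Note that `latestEpoch.forall(epoch > _)` both accepts the very first batch (forall on None is true) and skips later batches of an epoch already assigned, so each epoch maps to the base offset of its first batch.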

core/src/test/scala/unit/kafka/log/LogSegmentTest.scala

@@ -18,6 +18,9 @@ package kafka.log
 import java.io.File
+import kafka.server.checkpoints.LeaderEpochCheckpoint
+import kafka.server.epoch.EpochEntry
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils.checkEquals
 import org.apache.kafka.common.TopicPartition
@@ -28,6 +31,7 @@ import org.junit.{After, Before, Test}
 import scala.jdk.CollectionConverters._
 import scala.collection._
+import scala.collection.mutable.ArrayBuffer

 class LogSegmentTest {
@@ -316,26 +320,26 @@ class LogSegmentTest {
 // append transactional records from pid1
 segment.append(largestOffset = 101L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
-  shallowOffsetOfMaxTimestamp = 100L, MemoryRecords.withTransactionalRecords(100L, CompressionType.NONE,
+  shallowOffsetOfMaxTimestamp = 100L, records = MemoryRecords.withTransactionalRecords(100L, CompressionType.NONE,
     pid1, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))

 // append transactional records from pid2
 segment.append(largestOffset = 103L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
-  shallowOffsetOfMaxTimestamp = 102L, MemoryRecords.withTransactionalRecords(102L, CompressionType.NONE,
+  shallowOffsetOfMaxTimestamp = 102L, records = MemoryRecords.withTransactionalRecords(102L, CompressionType.NONE,
     pid2, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))

 // append non-transactional records
 segment.append(largestOffset = 105L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
-  shallowOffsetOfMaxTimestamp = 104L, MemoryRecords.withRecords(104L, CompressionType.NONE,
+  shallowOffsetOfMaxTimestamp = 104L, records = MemoryRecords.withRecords(104L, CompressionType.NONE,
     partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))

 // abort the transaction from pid2 (note LSO should be 100L since the txn from pid1 has not completed)
 segment.append(largestOffset = 106L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
-  shallowOffsetOfMaxTimestamp = 106L, endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
+  shallowOffsetOfMaxTimestamp = 106L, records = endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))

 // commit the transaction from pid1
 segment.append(largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
-  shallowOffsetOfMaxTimestamp = 107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
+  shallowOffsetOfMaxTimestamp = 107L, records = endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))

 var stateManager = new ProducerStateManager(topicPartition, logDir)
 segment.recover(stateManager)
@@ -367,6 +371,48 @@ class LogSegmentTest {
   assertEquals(100L, abortedTxn.lastStableOffset)
 }

+/**
+ * Create a segment with some data, then recover the segment.
+ * The epoch cache entries should reflect the segment.
+ */
+@Test
+def testRecoveryRebuildsEpochCache(): Unit = {
+  val seg = createSegment(0)
+
+  val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
+    private var epochs = Seq.empty[EpochEntry]
+
+    override def write(epochs: Seq[EpochEntry]): Unit = {
+      this.epochs = epochs.toVector
+    }
+
+    override def read(): Seq[EpochEntry] = this.epochs
+  }
+
+  val cache = new LeaderEpochFileCache(topicPartition, () => seg.readNextOffset, checkpoint)
+  seg.append(largestOffset = 105L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+    shallowOffsetOfMaxTimestamp = 104L, records = MemoryRecords.withRecords(104L, CompressionType.NONE, 0,
+      new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+  seg.append(largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+    shallowOffsetOfMaxTimestamp = 106L, records = MemoryRecords.withRecords(106L, CompressionType.NONE, 1,
+      new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+  seg.append(largestOffset = 109L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+    shallowOffsetOfMaxTimestamp = 108L, records = MemoryRecords.withRecords(108L, CompressionType.NONE, 1,
+      new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+  seg.append(largestOffset = 111L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+    shallowOffsetOfMaxTimestamp = 110, records = MemoryRecords.withRecords(110L, CompressionType.NONE, 2,
+      new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+  seg.recover(new ProducerStateManager(topicPartition, logDir), Some(cache))
+  assertEquals(ArrayBuffer(EpochEntry(epoch = 0, startOffset = 104L),
+    EpochEntry(epoch = 1, startOffset = 106),
+    EpochEntry(epoch = 2, startOffset = 110)),
+    cache.epochEntries)
+}
+
 private def endTxnRecords(controlRecordType: ControlRecordType,
                           producerId: Long,
                           producerEpoch: Short,
