Browse Source

KAFKA-6857; Leader should reply with undefined offset if undefined leader epoch requested (#4967)

The leader must explicitly check if requested leader epoch is undefined, and return undefined offset so that the follower can fall back to truncating to high watermark. Otherwise, if the leader also is not tracking leader epochs, it may return its LEO, which will the follower to truncate to the incorrect offset.
pull/4969/head
Anna Povzner 7 years ago committed by Jason Gustafson
parent
commit
a5318722c7
  1. 9
      core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
  2. 15
      core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala

9
core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala

@ -96,10 +96,13 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM @@ -96,10 +96,13 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
override def endOffsetFor(requestedEpoch: Int): Long = {
inReadLock(lock) {
val offset =
if (requestedEpoch == latestEpoch) {
if (requestedEpoch == UNDEFINED_EPOCH) {
// this may happen if a bootstrapping follower sends a request with undefined epoch or
// a follower is on the older message format where leader epochs are not recorded
UNDEFINED_EPOCH_OFFSET
} else if (requestedEpoch == latestEpoch) {
leo().messageOffset
}
else {
} else {
val subsequentEpochs = epochs.filter(e => e.epoch > requestedEpoch)
if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch)
UNDEFINED_EPOCH_OFFSET

15
core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala

@ -143,6 +143,21 @@ class LeaderEpochFileCacheTest { @@ -143,6 +143,21 @@ class LeaderEpochFileCacheTest {
assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(0))
}
@Test
def shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested(){
val leo = 73
def leoFinder() = new LogOffsetMetadata(leo)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
//When (say a follower on older message format version) sends request for UNDEFINED_EPOCH
val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
//Then
assertEquals(UNDEFINED_EPOCH_OFFSET, offsetFor)
}
@Test
def shouldReturnUnsupportedIfRequestedEpochLessThanFirstEpoch(){
def leoFinder() = new LogOffsetMetadata(0)

Loading…
Cancel
Save