
KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678)

This patch ensures that when a broker becomes leader, the leader epoch cache is updated with the latest epoch and the log end offset as its start offset. This guarantees that the leader will be able to provide the right truncation point even if the follower has data from leader epochs which the leader itself does not have. This situation can occur when there are back-to-back leader elections.
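
To make the scenario concrete, here is a minimal, self-contained sketch. It is a simplified model, not the actual Kafka classes: the `endOffsetFor` helper below only mimics the cache lookup, and the broker layout and offsets are invented for the example.

```scala
// Hypothetical, simplified model of the back-to-back election scenario (not Kafka's real classes).
object TruncationPointSketch extends App {
  case class EpochEntry(epoch: Int, startOffset: Long)

  // End offset for a requested epoch: the start offset of the first cached entry with a larger
  // epoch. None means the leader cannot answer and the follower falls back to its high watermark.
  def endOffsetFor(cache: Seq[EpochEntry], requestedEpoch: Int): Option[Long] =
    cache.sortBy(_.epoch).find(_.epoch > requestedEpoch).map(_.startOffset)

  // Broker A led epoch 1 (offsets 0-9), missed epoch 2 entirely, and is re-elected as leader for
  // epoch 3 while its log end offset is 10. Broker B wrote offsets 10-14 during epoch 2 and, as a
  // follower, now asks A where epoch 2 ends.
  val withoutPatch = Seq(EpochEntry(1, 0))                    // epoch 3 never cached
  val withPatch    = Seq(EpochEntry(1, 0), EpochEntry(3, 10)) // this patch assigns (3, LEO = 10)

  println(endOffsetFor(withoutPatch, requestedEpoch = 2)) // None: no usable truncation point
  println(endOffsetFor(withPatch, requestedEpoch = 2))    // Some(10): B truncates to offset 10
}
```

Without the cached (3, 10) entry, broker B would have to fall back to high watermark truncation and could retain divergent records from epoch 2.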

Additionally, we have made the following changes:

1. The leader epoch cache now enforces monotonically increasing epochs and start offsets among its entries. Whenever a new entry is appended which violates this requirement, we remove the conflicting entries from the cache.
2. Previously we returned an unknown epoch and offset if the queried epoch came before the first entry in the cache. Now we return the requested epoch with the start offset of the earliest cached entry as its end offset. For example, if the earliest entry in the cache is (epoch=5, startOffset=10), then a query for epoch 4 will return (epoch=4, endOffset=10). This ensures that followers (and consumers in KIP-320) can always determine the correct starting point of the active log range on the leader (both rules are sketched below).
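
The two rules above can be illustrated with a small, self-contained sketch. This is a simplified model only; the real LeaderEpochFileCache in the diff below additionally checkpoints its entries to disk and reads the log end offset through a callback.

```scala
// Hypothetical, simplified cache illustrating the two rules above (not the real LeaderEpochFileCache).
object EpochCacheRulesSketch extends App {
  case class EpochEntry(epoch: Int, startOffset: Long)

  final class SimpleEpochCache {
    private var epochs = Vector.empty[EpochEntry]
    def entries: Vector[EpochEntry] = epochs

    // Rule 1: keep epochs and start offsets monotonically increasing by removing any cached
    // entry that conflicts with the new one before appending it.
    def assign(epoch: Int, startOffset: Long): Unit =
      epochs = epochs.filter(e => e.epoch < epoch && e.startOffset < startOffset) :+
        EpochEntry(epoch, startOffset)

    // Rule 2: a query below the first cached epoch returns the requested epoch with the first
    // entry's start offset as its end offset; a query for the latest epoch returns the log end offset.
    def endOffsetFor(requestedEpoch: Int, logEndOffset: Long): (Int, Long) = {
      val (later, earlier) = epochs.partition(_.epoch > requestedEpoch)
      if (later.isEmpty) (requestedEpoch, logEndOffset)
      else if (earlier.isEmpty) (requestedEpoch, later.head.startOffset)
      else (earlier.last.epoch, later.head.startOffset)
    }
  }

  val cache = new SimpleEpochCache
  cache.assign(epoch = 5, startOffset = 10)
  println(cache.endOffsetFor(requestedEpoch = 4, logEndOffset = 20)) // (4,10), as in the example above

  cache.assign(epoch = 7, startOffset = 20)
  cache.assign(epoch = 6, startOffset = 15) // conflicts with (7,20), which is removed
  println(cache.entries)                    // Vector(EpochEntry(5,10), EpochEntry(6,15))
}
```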

Reviewers: Jun Rao <junrao@gmail.com>
Jason Gustafson committed (via GitHub)
commit f2dd6aa269
  1. core/src/main/scala/kafka/cluster/Partition.scala (11)
  2. core/src/main/scala/kafka/cluster/Replica.scala (4)
  3. core/src/main/scala/kafka/log/Log.scala (24)
  4. core/src/main/scala/kafka/log/LogSegment.scala (4)
  5. core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala (181)
  6. core/src/test/scala/unit/kafka/cluster/PartitionTest.scala (33)
  7. core/src/test/scala/unit/kafka/log/LogTest.scala (41)
  8. core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (4)
  9. core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala (28)
  10. core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala (22)
  11. core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (6)
  12. core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala (1)
  13. core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala (24)
  14. core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala (463)
  15. core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala (49)
  16. core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala (2)
  17. core/src/test/scala/unit/kafka/utils/TestUtils.scala (5)

core/src/main/scala/kafka/cluster/Partition.scala (11)

@@ -301,8 +301,17 @@ class Partition(val topic: String,
leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
zkVersion = partitionStateInfo.basePartitionState.zkVersion
val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
// In the case of successive leader elections in a short time period, a follower may have
// entries in its log from a later epoch than any entry in the new leader's log. In order
// to ensure that these followers can truncate to the right offset, we must cache the new
// leader epoch and the start offset since it should be larger than any epoch that a follower
// would try to query.
leaderReplica.epochs.foreach { epochCache =>
epochCache.assign(leaderEpoch, leaderEpochStartOffset)
}
val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
val curTimeMs = time.milliseconds
// initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.

core/src/main/scala/kafka/cluster/Replica.scala (4)

@@ -18,7 +18,7 @@
package kafka.cluster
import kafka.log.Log
import kafka.server.epoch.LeaderEpochCache
import kafka.server.epoch.LeaderEpochFileCache
import kafka.utils.Logging
import kafka.server.{LogOffsetMetadata, LogReadResult}
import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -55,7 +55,7 @@ class Replica(val brokerId: Int,
def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs
val epochs: Option[LeaderEpochCache] = log.map(_.leaderEpochCache)
val epochs: Option[LeaderEpochFileCache] = log.map(_.leaderEpochCache)
info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue")
log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))

core/src/main/scala/kafka/log/Log.scala (24)

@@ -32,15 +32,15 @@ import kafka.common.{LogSegmentOffsetOverflowException, LongRef, OffsetsOutOfOrd
import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
import kafka.server.epoch.LeaderEpochFileCache
import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
import kafka.utils._
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.{CorruptRecordException, InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -229,7 +229,7 @@ class Log(@volatile var dir: File,
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
@volatile private var _leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache()
@volatile private var _leaderEpochCache: LeaderEpochFileCache = initializeLeaderEpochCache()
locally {
val startMs = time.milliseconds
@@ -239,12 +239,12 @@ class Log(@volatile var dir: File,
/* Calculate the offset of the next message */
nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
_leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset)
_leaderEpochCache.truncateFromEnd(nextOffsetMetadata.messageOffset)
logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
_leaderEpochCache.clearAndFlushEarliest(logStartOffset)
_leaderEpochCache.truncateFromStart(logStartOffset)
// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
@@ -296,11 +296,11 @@ class Log(@volatile var dir: File,
def leaderEpochCache = _leaderEpochCache
private def initializeLeaderEpochCache(): LeaderEpochCache = {
private def initializeLeaderEpochCache(): LeaderEpochFileCache = {
// create the log directory if it doesn't exist
Files.createDirectories(dir.toPath)
new LeaderEpochFileCache(topicPartition, () => logEndOffsetMetadata,
new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel))
val checkpointFile = new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel)
new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
}
/**
@@ -422,7 +422,7 @@ class Log(@volatile var dir: File,
* @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow
*/
private def recoverSegment(segment: LogSegment,
leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized {
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = lock synchronized {
val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
rebuildProducerState(segment.baseOffset, reloadFromCleanShutdown = false, producerStateManager)
val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache)
@@ -941,7 +941,7 @@ class Log(@volatile var dir: File,
if (newLogStartOffset > logStartOffset) {
info(s"Incrementing log start offset to $newLogStartOffset")
logStartOffset = newLogStartOffset
_leaderEpochCache.clearAndFlushEarliest(logStartOffset)
_leaderEpochCache.truncateFromStart(logStartOffset)
producerStateManager.truncateHead(logStartOffset)
updateFirstUnstableOffset()
}
@@ -1650,7 +1650,7 @@ class Log(@volatile var dir: File,
updateLogEndOffset(targetOffset)
this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
this.logStartOffset = math.min(targetOffset, this.logStartOffset)
_leaderEpochCache.clearAndFlushLatest(targetOffset)
_leaderEpochCache.truncateFromEnd(targetOffset)
loadProducerState(targetOffset, reloadFromCleanShutdown = false)
}
true

core/src/main/scala/kafka/log/LogSegment.scala (4)

@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
import kafka.common.LogSegmentOffsetOverflowException
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.server.epoch.LeaderEpochCache
import kafka.server.epoch.LeaderEpochFileCache
import kafka.server.{FetchDataInfo, LogOffsetMetadata}
import kafka.utils._
import org.apache.kafka.common.errors.CorruptRecordException
@@ -330,7 +330,7 @@ class LogSegment private[log] (val log: FileRecords,
* @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow
*/
@nonthreadsafe
def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
offsetIndex.reset()
timeIndex.reset()
txnIndex.reset()

core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala (181)

@@ -18,23 +18,13 @@ package kafka.server.epoch
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.server.LogOffsetMetadata
import kafka.server.checkpoints.LeaderEpochCheckpoint
import org.apache.kafka.common.requests.EpochEndOffset._
import kafka.utils.CoreUtils._
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import scala.collection.mutable.ListBuffer
trait LeaderEpochCache {
def assign(leaderEpoch: Int, offset: Long)
def latestEpoch: Int
def endOffsetFor(epoch: Int): (Int, Long)
def clearAndFlushLatest(offset: Long)
def clearAndFlushEarliest(offset: Long)
def clearAndFlush()
def clear()
}
import scala.collection.mutable.ListBuffer
/**
* Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
@@ -42,39 +42,65 @@ trait LeaderEpochCache {
* Leader Epoch = epoch assigned to each leader by the controller.
* Offset = offset of the first message in each epoch.
*
* @param leo a function that determines the log end offset
* @param topicPartition the associated topic partition
* @param checkpoint the checkpoint file
* @param logEndOffset function to fetch the current log end offset
*/
class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetMetadata, checkpoint: LeaderEpochCheckpoint) extends LeaderEpochCache with Logging {
class LeaderEpochFileCache(topicPartition: TopicPartition,
logEndOffset: () => Long,
checkpoint: LeaderEpochCheckpoint) extends Logging {
this.logIdent = s"[LeaderEpochCache $topicPartition] "
private val lock = new ReentrantReadWriteLock()
private var epochs: ListBuffer[EpochEntry] = inWriteLock(lock) { ListBuffer(checkpoint.read(): _*) }
/**
* Assigns the supplied Leader Epoch to the supplied Offset
* Once the epoch is assigned it cannot be reassigned
*
* @param epoch
* @param offset
*/
override def assign(epoch: Int, offset: Long): Unit = {
def assign(epoch: Int, startOffset: Long): Unit = {
inWriteLock(lock) {
if (epoch >= 0 && epoch > latestEpoch && offset >= latestOffset) {
info(s"Updated PartitionLeaderEpoch. ${epochChangeMsg(epoch, offset)}. Cache now contains ${epochs.size} entries.")
epochs += EpochEntry(epoch, offset)
flush()
val updateNeeded = if (epochs.isEmpty) {
true
} else {
validateAndMaybeWarn(epoch, offset)
val lastEntry = epochs.last
lastEntry.epoch != epoch || startOffset < lastEntry.startOffset
}
if (updateNeeded) {
truncateAndAppend(EpochEntry(epoch, startOffset))
flush()
}
}
}
/**
* Remove any entries which violate monotonicity following the insertion of an assigned epoch.
*/
private def truncateAndAppend(entryToAppend: EpochEntry): Unit = {
validateAndMaybeWarn(entryToAppend)
val (retainedEpochs, removedEpochs) = epochs.partition { entry =>
entry.epoch < entryToAppend.epoch && entry.startOffset < entryToAppend.startOffset
}
epochs = retainedEpochs :+ entryToAppend
if (removedEpochs.isEmpty) {
debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.")
} else {
warn(s"New epoch entry $entryToAppend caused truncation of conflicting entries $removedEpochs. " +
s"Cache now contains ${epochs.size} entries.")
}
}
/**
* Returns the current Leader Epoch. This is the latest epoch
* which has messages assigned to it.
*
* @return
*/
override def latestEpoch(): Int = {
def latestEpoch: Int = {
inReadLock(lock) {
if (epochs.isEmpty) UNDEFINED_EPOCH else epochs.last.epoch
}
@@ -93,45 +109,59 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
* so that the follower falls back to High Water Mark.
*
* @param requestedEpoch requested leader epoch
* @return leader epoch and offset
* @return found leader epoch and end offset
*/
override def endOffsetFor(requestedEpoch: Int): (Int, Long) = {
def endOffsetFor(requestedEpoch: Int): (Int, Long) = {
inReadLock(lock) {
val epochAndOffset =
if (requestedEpoch == UNDEFINED_EPOCH) {
// this may happen if a bootstrapping follower sends a request with undefined epoch or
// This may happen if a bootstrapping follower sends a request with undefined epoch or
// a follower is on the older message format where leader epochs are not recorded
(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
} else if (requestedEpoch == latestEpoch) {
(requestedEpoch, leo().messageOffset)
// For the leader, the latest epoch is always the current leader epoch that is still being written to.
// Followers should not have any reason to query for the end offset of the current epoch, but a consumer
// might if it is verifying its committed offset following a group rebalance. In this case, we return
// the current log end offset which makes the truncation check work as expected.
(requestedEpoch, logEndOffset())
} else {
val (subsequentEpochs, previousEpochs) = epochs.partition { e => e.epoch > requestedEpoch}
if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch)
// no epochs recorded or requested epoch < the first epoch cached
if (subsequentEpochs.isEmpty) {
// The requested epoch is larger than any known epoch. This case should never be hit because
// the latest cached epoch is always the largest.
(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
else {
// we must get at least one element in previous epochs list, because if we are here,
// it means that requestedEpoch >= epochs.head.epoch -- so at least the first epoch is
} else if (previousEpochs.isEmpty) {
// The requested epoch is smaller than any known epoch, so we return the start offset of the first
// known epoch which is larger than it. This may be inaccurate as there could have been
// epochs in between, but the point is that the data has already been removed from the log
// and we want to ensure that the follower can replicate correctly beginning from the leader's
// start offset.
(requestedEpoch, subsequentEpochs.head.startOffset)
} else {
// We have at least one previous epoch and one subsequent epoch. The result is the first
// prior epoch and the starting offset of the first subsequent epoch.
(previousEpochs.last.epoch, subsequentEpochs.head.startOffset)
}
}
debug(s"Processed offset for epoch request for partition ${topicPartition} epoch:$requestedEpoch and returning epoch ${epochAndOffset._1} and offset ${epochAndOffset._2} from epoch list of size ${epochs.size}")
debug(s"Processed end offset request for epoch $requestedEpoch and returning epoch ${epochAndOffset._1} " +
s"with end offset ${epochAndOffset._2} from epoch cache of size ${epochs.size}")
epochAndOffset
}
}
/**
* Removes all epoch entries from the store with start offsets greater than or equal to the passed offset.
*
* @param offset
*/
override def clearAndFlushLatest(offset: Long): Unit = {
def truncateFromEnd(endOffset: Long): Unit = {
inWriteLock(lock) {
val before = epochs
if (offset >= 0 && offset <= latestOffset()) {
epochs = epochs.filter(entry => entry.startOffset < offset)
if (endOffset >= 0 && latestEntry.exists(_.startOffset >= endOffset)) {
val (subsequentEntries, previousEntries) = epochs.partition(_.startOffset >= endOffset)
epochs = previousEntries
flush()
info(s"Cleared latest ${before.toSet.filterNot(epochs.toSet)} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition")
debug(s"Cleared entries $subsequentEntries from epoch cache after " +
s"truncating to end offset $endOffset, leaving ${epochs.size} entries in the cache.")
}
}
}
@@ -142,20 +172,21 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
*
* This method is exclusive: so clearEarliest(6) will retain an entry at offset 6.
*
* @param offset the offset to clear up to
* @param startOffset the offset to clear up to
*/
override def clearAndFlushEarliest(offset: Long): Unit = {
def truncateFromStart(startOffset: Long): Unit = {
inWriteLock(lock) {
val before = epochs
if (offset >= 0 && earliestOffset() < offset) {
val earliest = epochs.filter(entry => entry.startOffset < offset)
if (earliest.nonEmpty) {
epochs = epochs --= earliest
//If the offset is less than the earliest offset remaining, add previous epoch back, but with an updated offset
if (offset < earliestOffset() || epochs.isEmpty)
new EpochEntry(earliest.last.epoch, offset) +=: epochs
if (epochs.nonEmpty) {
val (subsequentEntries, previousEntries) = epochs.partition(_.startOffset > startOffset)
previousEntries.lastOption.foreach { firstBeforeStartOffset =>
val updatedFirstEntry = EpochEntry(firstBeforeStartOffset.epoch, startOffset)
epochs = updatedFirstEntry +: subsequentEntries
flush()
info(s"Cleared earliest ${before.toSet.filterNot(epochs.toSet).size} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition")
debug(s"Cleared entries $previousEntries and rewrote first entry $updatedFirstEntry after " +
s"truncating to start offset $startOffset, leaving ${epochs.size} in the cache.")
}
}
}
@@ -164,47 +195,55 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
/**
* Delete all entries.
*/
override def clearAndFlush() = {
def clearAndFlush() = {
inWriteLock(lock) {
epochs.clear()
flush()
}
}
override def clear() = {
def clear() = {
inWriteLock(lock) {
epochs.clear()
}
}
def epochEntries(): ListBuffer[EpochEntry] = {
// Visible for testing
def epochEntries: ListBuffer[EpochEntry] = {
epochs
}
private def earliestOffset(): Long = {
if (epochs.isEmpty) -1 else epochs.head.startOffset
}
private def latestOffset(): Long = {
if (epochs.isEmpty) -1 else epochs.last.startOffset
}
private def latestEntry: Option[EpochEntry] = epochs.lastOption
private def flush(): Unit = {
checkpoint.write(epochs)
}
def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Current: {epoch:$latestEpoch, offset:$latestOffset} for Partition: $topicPartition"
private def validateAndMaybeWarn(entry: EpochEntry) = {
if (entry.epoch < 0) {
throw new IllegalArgumentException(s"Received invalid partition leader epoch entry $entry")
} else {
// If the latest append violates the monotonicity of epochs or starting offsets, our choices
// are either to raise an error, ignore the append, or allow the append and truncate the
// conflicting entries from the cache. Raising an error risks killing the fetcher threads in
// pathological cases (i.e. cases we are not yet aware of). We instead take the final approach
// and assume that the latest append is always accurate.
def validateAndMaybeWarn(epoch: Int, offset: Long) = {
assert(epoch >= 0, s"Received a PartitionLeaderEpoch assignment for an epoch < 0. This should not happen. ${epochChangeMsg(epoch, offset)}")
if (epoch < latestEpoch())
warn(s"Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. " +
s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}")
else if (offset < latestOffset())
warn(s"Received a PartitionLeaderEpoch assignment for an offset < latest offset for the most recent, stored PartitionLeaderEpoch. " +
s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}")
latestEntry.foreach { latest =>
if (entry.epoch < latest.epoch)
warn(s"Received leader epoch assignment $entry which has an epoch less than the epoch " +
s"of the latest entry $latest. This implies messages have arrived out of order.")
else if (entry.startOffset < latest.startOffset)
warn(s"Received leader epoch assignment $entry which has a starting offset which is less than " +
s"the starting offset of the latest entry $latest. This implies messages have arrived out of order.")
}
}
}
}
// Mapping of epoch to the first offset of the subsequent epoch
case class EpochEntry(epoch: Int, startOffset: Long)
case class EpochEntry(epoch: Int, startOffset: Long) {
override def toString: String = {
s"EpochEntry(epoch=$epoch, startOffset=$startOffset)"
}
}

core/src/test/scala/unit/kafka/cluster/PartitionTest.scala (33)

@@ -97,6 +97,39 @@ class PartitionTest {
replicaManager.shutdown(checkpointHW = false)
}
@Test
def testMakeLeaderUpdatesEpochCache(): Unit = {
val controllerEpoch = 3
val leader = brokerId
val follower = brokerId + 1
val controllerId = brokerId + 3
val replicas = List[Integer](leader, follower).asJava
val isr = List[Integer](leader, follower).asJava
val leaderEpoch = 8
val log = logManager.getOrCreateLog(topicPartition, logConfig)
log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)
), leaderEpoch = 0)
log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 5,
new SimpleRecord("k3".getBytes, "v3".getBytes),
new SimpleRecord("k4".getBytes, "v4".getBytes)
), leaderEpoch = 5)
assertEquals(4, log.logEndOffset)
val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
assertTrue("Expected makeLeader to succeed",
partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch,
isr, 1, replicas, true), 0))
assertEquals(Some(4), partition.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset))
val epochEndOffset = partition.lastOffsetForLeaderEpoch(leaderEpoch)
assertEquals(4, epochEndOffset.endOffset)
assertEquals(leaderEpoch, epochEndOffset.leaderEpoch)
}
@Test
// Verify that partition.removeFutureLocalReplica() and partition.maybeReplaceCurrentWithFutureReplica() can run concurrently
def testMaybeReplaceCurrentWithFutureReplica(): Unit = {

core/src/test/scala/unit/kafka/log/LogTest.scala (41)

@@ -25,7 +25,7 @@ import java.util.Properties
import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.Log.DeleteDirSuffix
import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
import kafka.utils._
import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -287,7 +287,7 @@ class LogTest {
}
override def recover(producerStateManager: ProducerStateManager,
leaderEpochCache: Option[LeaderEpochCache]): Int = {
leaderEpochCache: Option[LeaderEpochFileCache]): Int = {
recoveredSegments += this
super.recover(producerStateManager, leaderEpochCache)
}
@@ -2589,8 +2589,8 @@ class LogTest {
log.onHighWatermarkIncremented(log.logEndOffset)
log.deleteOldSegments()
assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
assertEquals("Epoch entry should be the latest epoch and the leo.", EpochEntry(1, 100), epochCache(log).epochEntries().head)
assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size)
assertEquals("Epoch entry should be the latest epoch and the leo.", EpochEntry(1, 100), epochCache(log).epochEntries.head)
// append some messages to create some segments
for (_ <- 0 until 100)
@@ -2599,7 +2599,7 @@ class LogTest {
log.delete()
assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
assertEquals("The number of deleted segments should be zero.", 0, log.deleteOldSegments())
assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries.size)
}
@Test
@@ -2612,12 +2612,12 @@ class LogTest {
log.appendAsLeader(createRecords, leaderEpoch = 0)
assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size)
log.close()
log.delete()
assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries.size)
}
@Test
@@ -2816,7 +2816,7 @@ class LogTest {
for (i <- records.indices)
log.appendAsFollower(recordsForEpoch(i))
assertEquals(42, log.leaderEpochCache.asInstanceOf[LeaderEpochFileCache].latestEpoch())
assertEquals(42, log.leaderEpochCache.latestEpoch)
}
@Test
@@ -2871,19 +2871,24 @@ class LogTest {
@Test
def shouldTruncateLeaderEpochFileWhenTruncatingLog() {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords.sizeInBytes)
def createRecords(startOffset: Long, epoch: Int): MemoryRecords = {
TestUtils.records(Seq(new SimpleRecord("value".getBytes)),
baseOffset = startOffset, partitionLeaderEpoch = epoch)
}
val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords(0, 0).sizeInBytes)
val log = createLog(logDir, logConfig)
val cache = epochCache(log)
//Given 2 segments, 10 messages per segment
for (epoch <- 1 to 20)
log.appendAsLeader(createRecords, leaderEpoch = 0)
def append(epoch: Int, startOffset: Long, count: Int): Unit = {
for (i <- 0 until count)
log.appendAsFollower(createRecords(startOffset + i, epoch))
}
//Simulate some leader changes at specific offsets
cache.assign(0, 0)
cache.assign(1, 10)
cache.assign(2, 16)
//Given 2 segments, 10 messages per segment
append(epoch = 0, startOffset = 0, count = 10)
append(epoch = 1, startOffset = 10, count = 6)
append(epoch = 2, startOffset = 16, count = 4)
assertEquals(2, log.numberOfSegments)
assertEquals(20, log.logEndOffset)
@@ -2935,7 +2940,7 @@ class LogTest {
assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
// deliberately remove some of the epoch entries
leaderEpochCache.clearAndFlushLatest(2)
leaderEpochCache.truncateFromEnd(2)
assertNotEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
log.close()

core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (4)

@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.cluster.{Partition, Replica}
import kafka.log.Log
import kafka.server.epoch.LeaderEpochCache
import kafka.server.epoch.LeaderEpochFileCache
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
@@ -253,7 +253,7 @@ class IsrExpirationTest {
private def logMock: Log = {
val log = EasyMock.createMock(classOf[kafka.log.Log])
val cache = EasyMock.createNiceMock(classOf[LeaderEpochCache])
val cache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache])
EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
EasyMock.expect(log.onHighWatermarkIncremented(0L))

core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala (28)

@@ -21,7 +21,7 @@ import kafka.api.Request
import kafka.cluster.{BrokerEndPoint, Partition, Replica}
import kafka.log.LogManager
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.epoch.LeaderEpochCache
import kafka.server.epoch.LeaderEpochFileCache
import kafka.utils.{DelayedItem, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{KafkaStorageException, ReplicaNotAvailableException}
@@ -46,7 +46,7 @@ class ReplicaAlterLogDirsThreadTest {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
//Setup all dependencies
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
val replica = createNiceMock(classOf[Replica])
val futureReplica = createNiceMock(classOf[Replica])
val partition = createMock(classOf[Partition])
@@ -87,7 +87,7 @@ class ReplicaAlterLogDirsThreadTest {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
//Setup all dependencies
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
val replica = createNiceMock(classOf[Replica])
val partition = createMock(classOf[Partition])
val replicaManager = createMock(classOf[ReplicaManager])
@@ -133,9 +133,9 @@ class ReplicaAlterLogDirsThreadTest {
// Setup all the dependencies
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochsT1p0 = createMock(classOf[LeaderEpochCache])
val leaderEpochsT1p1 = createMock(classOf[LeaderEpochCache])
val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
val leaderEpochsT1p0 = createMock(classOf[LeaderEpochFileCache])
val leaderEpochsT1p1 = createMock(classOf[LeaderEpochFileCache])
val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
val logManager = createMock(classOf[LogManager])
val replicaT1p0 = createNiceMock(classOf[Replica])
val replicaT1p1 = createNiceMock(classOf[Replica])
@@ -195,8 +195,8 @@ class ReplicaAlterLogDirsThreadTest {
// Setup all the dependencies
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochs = createMock(classOf[LeaderEpochCache])
val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
val logManager = createMock(classOf[LogManager])
val replica = createNiceMock(classOf[Replica])
// one future replica mock because our mocking methods return same values for both future replicas
@@ -265,8 +265,8 @@ class ReplicaAlterLogDirsThreadTest {
val logManager = createMock(classOf[LogManager])
val replica = createNiceMock(classOf[Replica])
val futureReplica = createNiceMock(classOf[Replica])
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
val partition = createMock(classOf[Partition])
val replicaManager = createMock(classOf[ReplicaManager])
val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture()
@@ -319,8 +319,8 @@ class ReplicaAlterLogDirsThreadTest {
// Setup all the dependencies
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
val quotaManager = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
val logManager = createMock(classOf[kafka.log.LogManager])
val replica = createNiceMock(classOf[Replica])
val futureReplica = createNiceMock(classOf[Replica])
@@ -401,8 +401,8 @@ class ReplicaAlterLogDirsThreadTest {
//Setup all dependencies
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
val logManager = createMock(classOf[LogManager])
val replica = createNiceMock(classOf[Replica])
val futureReplica = createNiceMock(classOf[Replica])

core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala (22)

@@ -20,7 +20,7 @@ import kafka.cluster.{BrokerEndPoint, Replica}
import kafka.log.LogManager
import kafka.cluster.Partition
import kafka.server.QuotaFactory.UnboundedQuota
import kafka.server.epoch.LeaderEpochCache
import kafka.server.epoch.LeaderEpochFileCache
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.utils.TestUtils
import org.apache.kafka.clients.ClientResponse
@@ -154,7 +154,7 @@ class ReplicaFetcherThreadTest {
//Setup all dependencies
val quota = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
val logManager = createMock(classOf[LogManager])
val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
val replica = createNiceMock(classOf[Replica])
@@ -278,7 +278,7 @@ class ReplicaFetcherThreadTest {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
//Setup all dependencies
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
val logManager = createMock(classOf[LogManager])
val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
val replica = createNiceMock(classOf[Replica])
@@ -339,7 +339,7 @@ class ReplicaFetcherThreadTest {
// Setup all the dependencies
val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
val quota = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochs = createMock(classOf[LeaderEpochCache])
val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
val logManager = createMock(classOf[LogManager])
val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
val replica = createNiceMock(classOf[Replica])
@@ -388,7 +388,7 @@ class ReplicaFetcherThreadTest {
// Setup all the dependencies
val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
val quota = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochs = createMock(classOf[LeaderEpochCache])
val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
val logManager = createMock(classOf[LogManager])
val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
val replica = createNiceMock(classOf[Replica])
@@ -442,7 +442,7 @@ class ReplicaFetcherThreadTest {
// Setup all dependencies
val quota = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
val logManager = createMock(classOf[LogManager])
val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
val replica = createNiceMock(classOf[Replica])
@@ -513,7 +513,7 @@ class ReplicaFetcherThreadTest {
// Setup all dependencies
val quota = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
val logManager = createMock(classOf[LogManager])
val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
val replica = createNiceMock(classOf[Replica])
@@ -574,7 +574,7 @@ class ReplicaFetcherThreadTest {
// Setup all the dependencies
val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
val quota = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
val logManager = createMock(classOf[LogManager])
val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
val replica = createNiceMock(classOf[Replica])
@@ -619,7 +619,7 @@ class ReplicaFetcherThreadTest {
// Setup all the dependencies
val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
val quota = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
val logManager = createMock(classOf[kafka.log.LogManager])
val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
val replica = createNiceMock(classOf[Replica])
@@ -677,7 +677,7 @@ class ReplicaFetcherThreadTest {
//Setup all stubs
val quota = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
val logManager = createNiceMock(classOf[LogManager])
val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
val replica = createNiceMock(classOf[Replica])
@@ -728,7 +728,7 @@ class ReplicaFetcherThreadTest {
//Setup all stubs
val quota = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
val logManager = createNiceMock(classOf[LogManager])
val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
val replica = createNiceMock(classOf[Replica])

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (6)

@@ -26,7 +26,7 @@ import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
import kafka.utils.{MockScheduler, MockTime, TestUtils}
import TestUtils.createBroker
import kafka.cluster.BrokerEndPoint
import kafka.server.epoch.LeaderEpochCache
import kafka.server.epoch.LeaderEpochFileCache
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.utils.timer.MockTimer
import kafka.zk.KafkaZkClient
@@ -641,7 +641,7 @@ class ReplicaManagerTest {
val mockScheduler = new MockScheduler(time)
val mockBrokerTopicStats = new BrokerTopicStats
val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochCache])
val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochFileCache])
EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader)
EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
.andReturn((leaderEpochFromLeader, localLogOffset))
@@ -661,7 +661,7 @@ class ReplicaManagerTest {
new File(new File(config.logDirs.head), s"$topic-$topicPartition"), 30000),
logDirFailureChannel = mockLogDirFailureChannel) {
override def leaderEpochCache: LeaderEpochCache = mockLeaderEpochCache
override def leaderEpochCache: LeaderEpochFileCache = mockLeaderEpochCache
override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset)
}

core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala (1)

@@ -24,7 +24,6 @@ import org.junit.Assert._
import org.junit.Test
import org.scalatest.junit.JUnitSuite
class LeaderEpochCheckpointFileTest extends JUnitSuite with Logging{
@Test

core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala (24)

@@ -90,23 +90,23 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
assertEquals(0, latestRecord(follower).partitionLeaderEpoch())
//Both leader and follower should have recorded Epoch 0 at Offset 0
assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries())
assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries())
assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries)
assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries)
//Bounce the follower
bounce(follower)
awaitISR(tp)
//Nothing happens yet as we haven't sent any new messages.
assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries())
assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries())
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries)
assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries)
//Send a message
producer.send(new ProducerRecord(topic, 0, null, msg)).get
//Epoch1 should now propagate to the follower with the written message
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries())
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries())
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries)
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries)
//The new message should have epoch 1 stamped
assertEquals(1, latestRecord(leader).partitionLeaderEpoch())
@@ -117,8 +117,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
awaitISR(tp)
//Epochs 2 should be added to the leader, but not on the follower (yet), as there has been no replication.
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries())
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries())
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries)
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries)
//Send a message
producer.send(new ProducerRecord(topic, 0, null, msg)).get
@@ -128,8 +128,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
assertEquals(2, latestRecord(follower).partitionLeaderEpoch())
//The leader epoch files should now match on leader and follower
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries())
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(follower).epochEntries())
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries)
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(follower).epochEntries)
}
@Test
@@ -377,8 +377,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
private def log(leader: KafkaServer, follower: KafkaServer): Unit = {
info(s"Bounce complete for follower ${follower.config.brokerId}")
info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries())
info(s"Follower: leo${follower.config.brokerId}: " + getLog(follower, 0).logEndOffset + " cache: " + epochCache(follower).epochEntries())
info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries)
info(s"Follower: leo${follower.config.brokerId}: " + getLog(follower, 0).logEndOffset + " cache: " + epochCache(follower).epochEntries)
}
private def waitForLogsToMatch(b1: KafkaServer, b2: KafkaServer, partition: Int = 0): Unit = {

core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala (463)

@@ -16,6 +16,7 @@
*/
package kafka.server.epoch
import java.io.File
import kafka.server.LogOffsetMetadata
@@ -24,7 +25,7 @@ import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFIN
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.junit.Assert._
import org.junit.{Before, Test}
import org.junit.Test
import scala.collection.mutable.ListBuffer
@@ -33,54 +34,44 @@ import scala.collection.mutable.ListBuffer
*/
class LeaderEpochFileCacheTest {
val tp = new TopicPartition("TestTopic", 5)
var checkpoint: LeaderEpochCheckpoint = _
private var logEndOffset = 0L
private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs: Seq[EpochEntry] = Seq()
override def write(epochs: Seq[EpochEntry]): Unit = this.epochs = epochs
override def read(): Seq[EpochEntry] = this.epochs
}
private val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint)
@Test
def shouldAddEpochAndMessageOffsetToCache() = {
var leo = 0
def leoFinder() = new LogOffsetMetadata(leo)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
//When
cache.assign(epoch = 2, offset = 10)
leo = 11
cache.assign(epoch = 2, startOffset = 10)
logEndOffset = 11
//Then
assertEquals(2, cache.latestEpoch())
assertEquals(EpochEntry(2, 10), cache.epochEntries()(0))
assertEquals((2, leo), cache.endOffsetFor(2)) //should match leo
assertEquals(2, cache.latestEpoch)
assertEquals(EpochEntry(2, 10), cache.epochEntries(0))
assertEquals((2, logEndOffset), cache.endOffsetFor(2)) //should match logEndOffset
}
@Test
def shouldReturnLogEndOffsetIfLatestEpochRequested() = {
var leo = 0
def leoFinder() = new LogOffsetMetadata(leo)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
//When just one epoch
cache.assign(epoch = 2, offset = 11)
cache.assign(epoch = 2, offset = 12)
leo = 14
cache.assign(epoch = 2, startOffset = 11)
cache.assign(epoch = 2, startOffset = 12)
logEndOffset = 14
//Then
assertEquals((2, leo), cache.endOffsetFor(2))
assertEquals((2, logEndOffset), cache.endOffsetFor(2))
}
@Test
def shouldReturnUndefinedOffsetIfUndefinedEpochRequested() = {
def leoFinder() = new LogOffsetMetadata(0)
val expectedEpochEndOffset = (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
//Given cache with some data on leader
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
// assign couple of epochs
cache.assign(epoch = 2, offset = 11)
cache.assign(epoch = 3, offset = 12)
cache.assign(epoch = 2, startOffset = 11)
cache.assign(epoch = 3, startOffset = 12)
//When (say a bootstraping follower) sends request for UNDEFINED_EPOCH
val epochAndOffsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
@@ -92,68 +83,51 @@ class LeaderEpochFileCacheTest {
@Test
def shouldNotOverwriteLogEndOffsetForALeaderEpochOnceItHasBeenAssigned() = {
var leo = 0
def leoFinder() = new LogOffsetMetadata(leo)
//Given
leo = 9
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
logEndOffset = 9
cache.assign(2, leo)
cache.assign(2, logEndOffset)
//When called again later
cache.assign(2, 10)
//Then the offset should NOT have been updated
assertEquals(leo, cache.epochEntries()(0).startOffset)
assertEquals(logEndOffset, cache.epochEntries(0).startOffset)
assertEquals(ListBuffer(EpochEntry(2, 9)), cache.epochEntries)
}
@Test
def shouldAllowLeaderEpochToChangeEvenIfOffsetDoesNot() = {
def leoFinder() = new LogOffsetMetadata(0)
def shouldEnforceMonotonicallyIncreasingStartOffsets() = {
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(2, 9)
//When update epoch new epoch but same offset
cache.assign(3, 9)
//Then epoch should have been updated
assertEquals(ListBuffer(EpochEntry(2, 9), EpochEntry(3, 9)), cache.epochEntries())
assertEquals(ListBuffer(EpochEntry(3, 9)), cache.epochEntries)
}
@Test
def shouldNotOverwriteOffsetForALeaderEpochOnceItHasBeenAssigned() = {
//Given
val cache = new LeaderEpochFileCache(tp, () => new LogOffsetMetadata(0), checkpoint)
cache.assign(2, 6)
//When called again later with a greater offset
cache.assign(2, 10)
//Then later update should have been ignored
assertEquals(6, cache.epochEntries()(0).startOffset)
assertEquals(6, cache.epochEntries(0).startOffset)
}
@Test
def shouldReturnUnsupportedIfNoEpochRecorded(){
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
//Then
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(0))
}
@Test
def shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested(){
val leo = 73
def leoFinder() = new LogOffsetMetadata(leo)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
logEndOffset = 73
//When (say a follower on older message format version) sends request for UNDEFINED_EPOCH
val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
@@ -164,39 +138,41 @@ class LeaderEpochFileCacheTest {
}
@Test
def shouldReturnUnsupportedIfRequestedEpochLessThanFirstEpoch(){
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 5, offset = 11)
cache.assign(epoch = 6, offset = 12)
cache.assign(epoch = 7, offset = 13)
def shouldReturnFirstEpochIfRequestedEpochLessThanFirstEpoch(){
cache.assign(epoch = 5, startOffset = 11)
cache.assign(epoch = 6, startOffset = 12)
cache.assign(epoch = 7, startOffset = 13)
//When
val epochAndOffset = cache.endOffsetFor(5 - 1)
val epochAndOffset = cache.endOffsetFor(4)
//Then
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), epochAndOffset)
assertEquals((4, 11), epochAndOffset)
}
@Test
def shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch() = {
var leo = 0
def leoFinder() = new LogOffsetMetadata(leo)
def shouldTruncateIfMatchingEpochButEarlierStartingOffset(): Unit = {
cache.assign(epoch = 5, startOffset = 11)
cache.assign(epoch = 6, startOffset = 12)
cache.assign(epoch = 7, startOffset = 13)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
// epoch 7 starts at an earlier offset
cache.assign(epoch = 7, startOffset = 12)
assertEquals((5, 12), cache.endOffsetFor(5))
assertEquals((5, 12), cache.endOffsetFor(6))
}
@Test
def shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch() = {
//When several epochs
cache.assign(epoch = 1, offset = 11)
cache.assign(epoch = 1, offset = 12)
cache.assign(epoch = 2, offset = 13)
cache.assign(epoch = 2, offset = 14)
cache.assign(epoch = 3, offset = 15)
cache.assign(epoch = 3, offset = 16)
leo = 17
cache.assign(epoch = 1, startOffset = 11)
cache.assign(epoch = 1, startOffset = 12)
cache.assign(epoch = 2, startOffset = 13)
cache.assign(epoch = 2, startOffset = 14)
cache.assign(epoch = 3, startOffset = 15)
cache.assign(epoch = 3, startOffset = 16)
logEndOffset = 17
//Then get the start offset of the next epoch
assertEquals((2, 15), cache.endOffsetFor(2))
@@ -204,15 +180,10 @@ class LeaderEpochFileCacheTest {
@Test
def shouldReturnNextAvailableEpochIfThereIsNoExactEpochForTheOneRequested(){
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
//When
cache.assign(epoch = 0, offset = 10)
cache.assign(epoch = 2, offset = 13)
cache.assign(epoch = 4, offset = 17)
cache.assign(epoch = 0, startOffset = 10)
cache.assign(epoch = 2, startOffset = 13)
cache.assign(epoch = 4, startOffset = 17)
//Then
assertEquals((0, 13), cache.endOffsetFor(requestedEpoch = 1))
@@ -222,14 +193,9 @@ class LeaderEpochFileCacheTest {
@Test
def shouldNotUpdateEpochAndStartOffsetIfItDidNotChange() = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
//When
cache.assign(epoch = 2, offset = 6)
cache.assign(epoch = 2, offset = 7)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 2, startOffset = 7)
//Then
assertEquals(1, cache.epochEntries.size)
@@ -238,14 +204,10 @@ class LeaderEpochFileCacheTest {
@Test
def shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked(): Unit = {
val leo = 100
def leoFinder() = new LogOffsetMetadata(leo)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
logEndOffset = 100
//When
cache.assign(epoch = 2, offset = 100)
cache.assign(epoch = 2, startOffset = 100)
//Then
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(3))
@@ -253,35 +215,28 @@ class LeaderEpochFileCacheTest {
@Test
def shouldSupportEpochsThatDoNotStartFromZero(): Unit = {
var leo = 0
def leoFinder() = new LogOffsetMetadata(leo)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
//When
cache.assign(epoch = 2, offset = 6)
leo = 7
cache.assign(epoch = 2, startOffset = 6)
logEndOffset = 7
//Then
assertEquals((2, leo), cache.endOffsetFor(2))
assertEquals((2, logEndOffset), cache.endOffsetFor(2))
assertEquals(1, cache.epochEntries.size)
assertEquals(EpochEntry(2, 6), cache.epochEntries()(0))
assertEquals(EpochEntry(2, 6), cache.epochEntries(0))
}
@Test
def shouldPersistEpochsBetweenInstances(){
def leoFinder() = new LogOffsetMetadata(0)
val checkpointPath = TestUtils.tempFile().getAbsolutePath
checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath))
val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath))
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 2, offset = 6)
val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint)
cache.assign(epoch = 2, startOffset = 6)
//When
val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath))
val cache2 = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint2)
val cache2 = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint2)
//Then
assertEquals(1, cache2.epochEntries.size)
@@ -289,81 +244,68 @@ class LeaderEpochFileCacheTest {
}
@Test
def shouldNotLetEpochGoBackwardsEvenIfMessageEpochsDo(): Unit = {
var leo = 0
def leoFinder() = new LogOffsetMetadata(leo)
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
def shouldEnforceMonotonicallyIncreasingEpochs(): Unit = {
//Given
cache.assign(epoch = 1, offset = 5); leo = 6
cache.assign(epoch = 2, offset = 6); leo = 7
cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6
cache.assign(epoch = 2, startOffset = 6); logEndOffset = 7
//When we update an epoch in the past with an earlier offset
cache.assign(epoch = 1, offset = 7); leo = 8
//When we update an epoch in the past with a different offset, the log has already reached
//an inconsistent state. Our options are either to raise an error, ignore the new append,
//or truncate the cached epochs to the point of conflict. We take this latter approach in
//order to guarantee that epochs and offsets in the cache increase monotonically, which makes
//the search logic simpler to reason about.
cache.assign(epoch = 1, startOffset = 7); logEndOffset = 8
//Then epoch should not be changed
assertEquals(2, cache.latestEpoch())
//Then later epochs will be removed
assertEquals(1, cache.latestEpoch)
//Then end offset for epoch 1 shouldn't have changed
assertEquals((1, 6), cache.endOffsetFor(1))
//Then end offset for epoch 2 has to be the offset of the epoch 1 message (I can't think of a better option)
assertEquals((2, 8), cache.endOffsetFor(2))
//Then end offset for epoch 1 will have changed
assertEquals((1, 8), cache.endOffsetFor(1))
//Epoch history shouldn't have changed
assertEquals(EpochEntry(1, 5), cache.epochEntries()(0))
assertEquals(EpochEntry(2, 6), cache.epochEntries()(1))
//Then end offset for epoch 2 is now undefined
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(2))
assertEquals(EpochEntry(1, 7), cache.epochEntries(0))
}
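The comment above is the heart of the change: when an assignment conflicts with cached history, the cache truncates back to the point of conflict rather than raising an error or ignoring the append. A simplified, immutable sketch consistent with the surrounding tests (assign and EpochEntry are local stand-ins, and the exact retention predicate is an assumption inferred from the asserted results):

object MonotonicAssignSketch {
  final case class EpochEntry(epoch: Int, startOffset: Long)

  // Returns the new entry list after an assignment, keeping both epochs and start offsets
  // strictly increasing by dropping cached entries that conflict with the new one.
  def assign(entries: Vector[EpochEntry], epoch: Int, startOffset: Long): Vector[EpochEntry] = {
    val needsUpdate = entries.lastOption.forall(last => last.epoch != epoch || startOffset < last.startOffset)
    if (!needsUpdate) entries   // same epoch, non-decreasing offset: keep the original start offset
    else {
      val retained = entries.takeWhile(e => e.epoch < epoch && e.startOffset < startOffset)
      retained :+ EpochEntry(epoch, startOffset)
    }
  }

  def main(args: Array[String]): Unit = {
    var entries = Vector.empty[EpochEntry]
    entries = assign(entries, epoch = 1, startOffset = 5)
    entries = assign(entries, epoch = 2, startOffset = 6)
    entries = assign(entries, epoch = 1, startOffset = 7)    // conflicts: the later epoch 2 entry is removed
    println(entries)                                         // Vector(EpochEntry(1,7)), as asserted above
  }
}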
@Test
def shouldNotLetOffsetsGoBackwardsEvenIfEpochsProgress() = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
def shouldEnforceOffsetsIncreaseMonotonically() = {
//When epoch goes forward but offset goes backwards
cache.assign(epoch = 2, offset = 6)
cache.assign(epoch = 3, offset = 5)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 5)
//Then latter assign should be ignored
assertEquals(EpochEntry(2, 6), cache.epochEntries.toList(0))
//The last assignment wins and the conflicting one is removed from the log
assertEquals(EpochEntry(3, 5), cache.epochEntries.toList(0))
}
@Test
def shouldIncreaseAndTrackEpochsAsLeadersChangeManyTimes(): Unit = {
var leo = 0
def leoFinder() = new LogOffsetMetadata(leo)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 0, offset = 0) //leo=0
cache.assign(epoch = 0, startOffset = 0) //logEndOffset=0
//When
cache.assign(epoch = 1, offset = 0) //leo=0
cache.assign(epoch = 1, startOffset = 0) //logEndOffset=0
//Then epoch should go up
assertEquals(1, cache.latestEpoch())
assertEquals(1, cache.latestEpoch)
//offset for 1 should still be 0
assertEquals((1, 0), cache.endOffsetFor(1))
//offset for epoch 0 should still be 0
assertEquals((0, 0), cache.endOffsetFor(0))
//When we write 5 messages as epoch 1
leo = 5
logEndOffset = 5
//Then end offset for epoch(1) should be leo => 5
//Then end offset for epoch(1) should be logEndOffset => 5
assertEquals((1, 5), cache.endOffsetFor(1))
//Epoch 0 should still be at offset 0
assertEquals((0, 0), cache.endOffsetFor(0))
//When
cache.assign(epoch = 2, offset = 5) //leo=5
cache.assign(epoch = 2, startOffset = 5) //logEndOffset=5
leo = 10 //write another 5 messages
logEndOffset = 10 //write another 5 messages
//Then end offset for epoch(2) should be leo => 10
//Then end offset for epoch(2) should be logEndOffset => 10
assertEquals((2, 10), cache.endOffsetFor(2))
//end offset for epoch(1) should be the start offset of epoch(2) => 5
@@ -375,36 +317,30 @@ class LeaderEpochFileCacheTest {
@Test
def shouldIncreaseAndTrackEpochsAsFollowerReceivesManyMessages(): Unit = {
var leo = 0
def leoFinder() = new LogOffsetMetadata(leo)
//When new
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
//When Messages come in
cache.assign(epoch = 0, offset = 0); leo = 1
cache.assign(epoch = 0, offset = 1); leo = 2
cache.assign(epoch = 0, offset = 2); leo = 3
cache.assign(epoch = 0, startOffset = 0); logEndOffset = 1
cache.assign(epoch = 0, startOffset = 1); logEndOffset = 2
cache.assign(epoch = 0, startOffset = 2); logEndOffset = 3
//Then epoch should stay, offsets should grow
assertEquals(0, cache.latestEpoch())
assertEquals((0, leo), cache.endOffsetFor(0))
assertEquals(0, cache.latestEpoch)
assertEquals((0, logEndOffset), cache.endOffsetFor(0))
//When messages arrive with greater epoch
cache.assign(epoch = 1, offset = 3); leo = 4
cache.assign(epoch = 1, offset = 4); leo = 5
cache.assign(epoch = 1, offset = 5); leo = 6
cache.assign(epoch = 1, startOffset = 3); logEndOffset = 4
cache.assign(epoch = 1, startOffset = 4); logEndOffset = 5
cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6
assertEquals(1, cache.latestEpoch())
assertEquals((1, leo), cache.endOffsetFor(1))
assertEquals(1, cache.latestEpoch)
assertEquals((1, logEndOffset), cache.endOffsetFor(1))
//When
cache.assign(epoch = 2, offset = 6); leo = 7
cache.assign(epoch = 2, offset = 7); leo = 8
cache.assign(epoch = 2, offset = 8); leo = 9
cache.assign(epoch = 2, startOffset = 6); logEndOffset = 7
cache.assign(epoch = 2, startOffset = 7); logEndOffset = 8
cache.assign(epoch = 2, startOffset = 8); logEndOffset = 9
assertEquals(2, cache.latestEpoch())
assertEquals((2, leo), cache.endOffsetFor(2))
assertEquals(2, cache.latestEpoch)
assertEquals((2, logEndOffset), cache.endOffsetFor(2))
//Older epochs should return the start offset of the first message in the subsequent epoch.
assertEquals((0, 3), cache.endOffsetFor(0))
@@ -413,16 +349,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldDropEntriesOnEpochBoundaryWhenRemovingLatestEntries(): Unit = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 2, offset = 6)
cache.assign(epoch = 3, offset = 8)
cache.assign(epoch = 4, offset = 11)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
//When clear latest on epoch boundary
cache.clearAndFlushLatest(offset = 8)
cache.truncateFromEnd(endOffset = 8)
//Then should remove two latest epochs (remove is inclusive)
assertEquals(ListBuffer(EpochEntry(2, 6)), cache.epochEntries)
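truncateFromEnd, as exercised here, drops every cached entry whose start offset is at or beyond the truncation point (removal is inclusive of the boundary) and leaves the cache untouched when the offset is undefined. A hedged stand-alone sketch of that rule, not the real implementation (the negative sentinel check is an assumption):

object TruncateFromEndSketch {
  final case class EpochEntry(epoch: Int, startOffset: Long)

  def truncateFromEnd(entries: Vector[EpochEntry], endOffset: Long): Vector[EpochEntry] =
    if (endOffset < 0) entries                          // undefined offset: leave the cache untouched
    else entries.filter(_.startOffset < endOffset)      // drop entries starting at or after endOffset

  def main(args: Array[String]): Unit = {
    val entries = Vector(EpochEntry(2, 6), EpochEntry(3, 8), EpochEntry(4, 11))
    println(truncateFromEnd(entries, endOffset = 8))    // Vector(EpochEntry(2,6)), as asserted above
  }
}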
@@ -430,16 +363,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldPreserveResetOffsetOnClearEarliestIfOneExists(): Unit = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 2, offset = 6)
cache.assign(epoch = 3, offset = 8)
cache.assign(epoch = 4, offset = 11)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
//When reset to offset ON epoch boundary
cache.clearAndFlushEarliest(offset = 8)
cache.truncateFromStart(startOffset = 8)
//Then should preserve (3, 8)
assertEquals(ListBuffer(EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -447,16 +377,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldUpdateSavedOffsetWhenOffsetToClearToIsBetweenEpochs(): Unit = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 2, offset = 6)
cache.assign(epoch = 3, offset = 8)
cache.assign(epoch = 4, offset = 11)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
//When reset to offset BETWEEN epoch boundaries
cache.clearAndFlushEarliest(offset = 9)
cache.truncateFromStart(startOffset = 9)
//Then we should retain epoch 3, but update its offset to 9 as 8 has been removed
assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
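The truncateFromStart cases in this file all follow one rule: entries starting at or before the truncation point are dropped, except that the last of them is kept with its start offset moved up to the truncation point, so the earliest retained epoch still covers the new log start offset. A stand-alone sketch under that assumption (again, not the patched code):

object TruncateFromStartSketch {
  final case class EpochEntry(epoch: Int, startOffset: Long)

  def truncateFromStart(entries: Vector[EpochEntry], startOffset: Long): Vector[EpochEntry] = {
    val (dropped, retained) = entries.partition(_.startOffset <= startOffset)
    dropped.lastOption match {
      case None        => entries                                            // truncation point precedes the first entry
      case Some(floor) => EpochEntry(floor.epoch, startOffset) +: retained   // keep the floor epoch, bump its start
    }
  }

  def main(args: Array[String]): Unit = {
    val entries = Vector(EpochEntry(2, 6), EpochEntry(3, 8), EpochEntry(4, 11))
    println(truncateFromStart(entries, startOffset = 9))   // Vector(EpochEntry(3,9), EpochEntry(4,11))
    println(truncateFromStart(entries, startOffset = 1))   // unchanged
  }
}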
@@ -464,16 +391,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldNotClearAnythingIfOffsetToEarly(): Unit = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 2, offset = 6)
cache.assign(epoch = 3, offset = 8)
cache.assign(epoch = 4, offset = 11)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
//When reset to offset before first epoch offset
cache.clearAndFlushEarliest(offset = 1)
cache.truncateFromStart(startOffset = 1)
//Then nothing should change
assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -481,16 +405,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldNotClearAnythingIfOffsetToFirstOffset(): Unit = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 2, offset = 6)
cache.assign(epoch = 3, offset = 8)
cache.assign(epoch = 4, offset = 11)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
//When reset to offset on earliest epoch boundary
cache.clearAndFlushEarliest(offset = 6)
cache.truncateFromStart(startOffset = 6)
//Then nothing should change
assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -498,16 +419,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldRetainLatestEpochOnClearAllEarliest(): Unit = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 2, offset = 6)
cache.assign(epoch = 3, offset = 8)
cache.assign(epoch = 4, offset = 11)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
//When
cache.clearAndFlushEarliest(offset = 11)
cache.truncateFromStart(startOffset = 11)
//Then retain the last
assertEquals(ListBuffer(EpochEntry(4, 11)), cache.epochEntries)
@@ -515,16 +433,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest(): Unit = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 2, offset = 6)
cache.assign(epoch = 3, offset = 8)
cache.assign(epoch = 4, offset = 11)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
//When we clear from a position between offset 8 & offset 11
cache.clearAndFlushEarliest(offset = 9)
cache.truncateFromStart(startOffset = 9)
//Then we should update the middle epoch entry's offset
assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
@@ -532,16 +447,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2(): Unit = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 0, offset = 0)
cache.assign(epoch = 1, offset = 7)
cache.assign(epoch = 2, offset = 10)
cache.assign(epoch = 0, startOffset = 0)
cache.assign(epoch = 1, startOffset = 7)
cache.assign(epoch = 2, startOffset = 10)
//When we clear from a position between offset 0 & offset 7
cache.clearAndFlushEarliest(offset = 5)
cache.truncateFromStart(startOffset = 5)
//Then we should keep epoch 0 but update the offset appropriately
assertEquals(ListBuffer(EpochEntry(0,5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
@@ -549,16 +461,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset(): Unit = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 2, offset = 6)
cache.assign(epoch = 3, offset = 8)
cache.assign(epoch = 4, offset = 11)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
//When reset to offset beyond last epoch
cache.clearAndFlushEarliest(offset = 15)
cache.truncateFromStart(startOffset = 15)
//Then update the last
assertEquals(ListBuffer(EpochEntry(4, 15)), cache.epochEntries)
@@ -566,31 +475,25 @@ class LeaderEpochFileCacheTest {
@Test
def shouldDropEntriesBetweenEpochBoundaryWhenRemovingNewest(): Unit = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 2, offset = 6)
cache.assign(epoch = 3, offset = 8)
cache.assign(epoch = 4, offset = 11)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
//When reset to offset BETWEEN epoch boundaries
cache.clearAndFlushLatest(offset = 9)
cache.truncateFromEnd(endOffset = 9)
//Then should keep the preceding epochs
assertEquals(3, cache.latestEpoch())
assertEquals(3, cache.latestEpoch)
assertEquals(ListBuffer(EpochEntry(2, 6), EpochEntry(3, 8)), cache.epochEntries)
}
@Test
def shouldClearAllEntries(): Unit = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 2, offset = 6)
cache.assign(epoch = 3, offset = 8)
cache.assign(epoch = 4, offset = 11)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
//When
cache.clearAndFlush()
@@ -601,16 +504,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldNotResetEpochHistoryHeadIfUndefinedPassed(): Unit = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 2, offset = 6)
cache.assign(epoch = 3, offset = 8)
cache.assign(epoch = 4, offset = 11)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
//When reset to offset on epoch boundary
cache.clearAndFlushLatest(offset = UNDEFINED_EPOCH_OFFSET)
cache.truncateFromEnd(endOffset = UNDEFINED_EPOCH_OFFSET)
//Then should do nothing
assertEquals(3, cache.epochEntries.size)
@@ -618,16 +518,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldNotResetEpochHistoryTailIfUndefinedPassed(): Unit = {
def leoFinder() = new LogOffsetMetadata(0)
//Given
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
cache.assign(epoch = 2, offset = 6)
cache.assign(epoch = 3, offset = 8)
cache.assign(epoch = 4, offset = 11)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
//When reset to offset on epoch boundary
cache.clearAndFlushEarliest(offset = UNDEFINED_EPOCH_OFFSET)
cache.truncateFromEnd(endOffset = UNDEFINED_EPOCH_OFFSET)
//Then should do nothing
assertEquals(3, cache.epochEntries.size)
@@ -635,54 +532,26 @@ class LeaderEpochFileCacheTest {
@Test
def shouldFetchLatestEpochOfEmptyCache(): Unit = {
//Given
def leoFinder() = new LogOffsetMetadata(0)
//When
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
//Then
assertEquals(-1, cache.latestEpoch)
}
@Test
def shouldFetchEndOffsetOfEmptyCache(): Unit = {
//Given
def leoFinder() = new LogOffsetMetadata(0)
//When
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
//Then
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(7))
}
@Test
def shouldClearEarliestOnEmptyCache(): Unit = {
//Given
def leoFinder() = new LogOffsetMetadata(0)
//When
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
//Then
cache.clearAndFlushEarliest(7)
cache.truncateFromStart(7)
}
@Test
def shouldClearLatestOnEmptyCache(): Unit = {
//Given
def leoFinder() = new LogOffsetMetadata(0)
//When
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
//Then
cache.clearAndFlushLatest(7)
cache.truncateFromEnd(7)
}
@Before
def setUp() {
checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile())
}
}

49
core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala

@@ -16,7 +16,7 @@
*/
package kafka.server.epoch
import java.util.{Optional, Map => JMap}
import java.util.Optional
import kafka.server.KafkaConfig._
import kafka.server.{BlockingSend, KafkaServer, ReplicaFetcherBlockingSend}
@@ -37,9 +37,10 @@ import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRe
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.ListBuffer
class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
var brokers: Seq[KafkaServer] = null
var brokers: ListBuffer[KafkaServer] = ListBuffer()
val topic1 = "foo"
val topic2 = "bar"
val t1p0 = new TopicPartition(topic1, 0)
@@ -60,7 +61,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
@Test
def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() {
brokers = (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
brokers ++= (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
// Given two topics with replication of a single partition
for (topic <- List(topic1, topic2)) {
@@ -94,7 +95,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
def shouldSendLeaderEpochRequestAndGetAResponse(): Unit = {
//3 brokers, put partition on 100/101 and then pretend to be 102
brokers = (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
brokers ++= (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
val assignment1 = Map(0 -> Seq(100), 1 -> Seq(101))
TestUtils.createTopic(zkClient, topic1, assignment1, brokers)
@@ -138,9 +139,12 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
@Test
def shouldIncreaseLeaderEpochBetweenLeaderRestarts(): Unit = {
//Setup: we are only interested in the single partition on broker 101
brokers = Seq(100, 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
brokers += createServer(fromProps(createBrokerConfig(100, zkConnect)))
assertEquals(100, TestUtils.waitUntilControllerElected(zkClient))
brokers += createServer(fromProps(createBrokerConfig(101, zkConnect)))
def leo() = brokers(1).replicaManager.getReplica(tp).get.logEndOffset.messageOffset
TestUtils.createTopic(zkClient, tp.topic, Map(tp.partition -> Seq(101)), brokers)
producer = createProducer(getBrokerListStrFromServers(brokers), acks = -1)
@@ -150,10 +154,10 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
var fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
//Then epoch should be 0 and leo: 1
var offset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset()
assertEquals(1, offset)
assertEquals(leo(), offset)
var epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp)
assertEquals(0, epochEndOffset.leaderEpoch)
assertEquals(1, epochEndOffset.endOffset)
assertEquals(1, leo())
//2. When broker is bounced
brokers(1).shutdown()
@@ -162,15 +166,23 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
//Then epoch 0 should still be the start offset of epoch 1
offset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset()
assertEquals(1, offset)
//Then epoch 2 should be the leo (NB: The leader epoch goes up in factors of 2 - This is because we have to first change leader to -1 and then change it again to the live replica)
assertEquals(2, fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset())
assertEquals(leo(), fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset())
epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp)
assertEquals(1, epochEndOffset.endOffset)
assertEquals(0, epochEndOffset.leaderEpoch)
//No data written in epoch 1
epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 1))(tp)
assertEquals(0, epochEndOffset.leaderEpoch)
assertEquals(1, epochEndOffset.endOffset)
//Then epoch 2 should be the leo (NB: The leader epoch goes up in factors of 2 -
//This is because we have to first change leader to -1 and then change it again to the live replica)
//Note that the expected leader changes depend on the controller being on broker 100, which is not restarted
epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 2))(tp)
assertEquals(2, epochEndOffset.leaderEpoch)
assertEquals(2, epochEndOffset.endOffset)
assertEquals(2, leo())
//3. When broker is bounced again
brokers(1).shutdown()
@@ -179,7 +191,6 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
//Then Epoch 0 should still map to offset 1
assertEquals(1, fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset())

2
core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala

@@ -47,7 +47,7 @@ class OffsetsForLeaderEpochTest {
//Stubs
val mockLog = createNiceMock(classOf[kafka.log.Log])
val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache])
val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochFileCache])
val logManager = createNiceMock(classOf[kafka.log.LogManager])
expect(mockCache.endOffsetFor(epochRequested)).andReturn(epochAndOffset)
expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()

5
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@@ -373,10 +373,11 @@ object TestUtils extends Logging {
producerId: Long = RecordBatch.NO_PRODUCER_ID,
producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
sequence: Int = RecordBatch.NO_SEQUENCE,
baseOffset: Long = 0L): MemoryRecords = {
baseOffset: Long = 0L,
partitionLeaderEpoch: Int = RecordBatch.NO_PARTITION_LEADER_EPOCH): MemoryRecords = {
val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, baseOffset,
System.currentTimeMillis, producerId, producerEpoch, sequence)
System.currentTimeMillis, producerId, producerEpoch, sequence, false, partitionLeaderEpoch)
records.foreach(builder.append)
builder.build()
}
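For context, the new partitionLeaderEpoch parameter is threaded straight into the MemoryRecords.builder overload shown above, so generated batches carry an explicit leader epoch. A hypothetical, self-contained usage sketch mirroring that builder call (the record contents, buffer size and the epoch value 5 are illustrative assumptions, not taken from the patch):

import java.nio.ByteBuffer
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord, TimestampType}

object LeaderEpochRecordsSketch {
  def main(args: Array[String]): Unit = {
    val records = Seq(new SimpleRecord("value".getBytes))
    val buf = ByteBuffer.allocate(512)
    // Same builder overload as TestUtils.records above, with an explicit partition leader epoch of 5.
    val builder = MemoryRecords.builder(buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
      TimestampType.CREATE_TIME, 0L, System.currentTimeMillis(), RecordBatch.NO_PRODUCER_ID,
      RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, 5)
    records.foreach(builder.append)
    // The built batch reports the epoch it was stamped with.
    println(builder.build().batches().iterator().next().partitionLeaderEpoch())   // 5
  }
}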
