diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 2036bb09da8..307fb81447b 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -301,8 +301,17 @@ class Partition(val topic: String, leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset) zkVersion = partitionStateInfo.basePartitionState.zkVersion - val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true) + // In the case of successive leader elections in a short time period, a follower may have + // entries in its log from a later epoch than any entry in the new leader's log. In order + // to ensure that these followers can truncate to the right offset, we must cache the new + // leader epoch and the start offset since it should be larger than any epoch that a follower + // would try to query. + leaderReplica.epochs.foreach { epochCache => + epochCache.assign(leaderEpoch, leaderEpochStartOffset) + } + + val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId) val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset val curTimeMs = time.milliseconds // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset. diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index d729dadcb48..22860c71475 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -18,7 +18,7 @@ package kafka.cluster import kafka.log.Log -import kafka.server.epoch.LeaderEpochCache +import kafka.server.epoch.LeaderEpochFileCache import kafka.utils.Logging import kafka.server.{LogOffsetMetadata, LogReadResult} import org.apache.kafka.common.{KafkaException, TopicPartition} @@ -55,7 +55,7 @@ class Replica(val brokerId: Int, def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs - val epochs: Option[LeaderEpochCache] = log.map(_.leaderEpochCache) + val epochs: Option[LeaderEpochFileCache] = log.map(_.leaderEpochCache) info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue") log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue)) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index c9b877bdca9..699d3d1fb05 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -32,15 +32,15 @@ import kafka.common.{LogSegmentOffsetOverflowException, LongRef, OffsetsOutOfOrd import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec} import kafka.metrics.KafkaMetricsGroup import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile} -import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache} +import kafka.server.epoch.LeaderEpochFileCache import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata} import kafka.utils._ -import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.common.errors.{CorruptRecordException, InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.errors._ import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction 
import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest} import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.{KafkaException, TopicPartition} import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, ListBuffer} @@ -229,7 +229,7 @@ class Log(@volatile var dir: File, /* the actual segments of the log */ private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] - @volatile private var _leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache() + @volatile private var _leaderEpochCache: LeaderEpochFileCache = initializeLeaderEpochCache() locally { val startMs = time.milliseconds @@ -239,12 +239,12 @@ class Log(@volatile var dir: File, /* Calculate the offset of the next message */ nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size) - _leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset) + _leaderEpochCache.truncateFromEnd(nextOffsetMetadata.messageOffset) logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset) // The earliest leader epoch may not be flushed during a hard failure. Recover it here. - _leaderEpochCache.clearAndFlushEarliest(logStartOffset) + _leaderEpochCache.truncateFromStart(logStartOffset) // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here // from scratch. @@ -296,11 +296,11 @@ class Log(@volatile var dir: File, def leaderEpochCache = _leaderEpochCache - private def initializeLeaderEpochCache(): LeaderEpochCache = { + private def initializeLeaderEpochCache(): LeaderEpochFileCache = { // create the log directory if it doesn't exist Files.createDirectories(dir.toPath) - new LeaderEpochFileCache(topicPartition, () => logEndOffsetMetadata, - new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel)) + val checkpointFile = new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel) + new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile) } /** @@ -422,7 +422,7 @@ class Log(@volatile var dir: File, * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow */ private def recoverSegment(segment: LogSegment, - leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized { + leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = lock synchronized { val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs) rebuildProducerState(segment.baseOffset, reloadFromCleanShutdown = false, producerStateManager) val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache) @@ -941,7 +941,7 @@ class Log(@volatile var dir: File, if (newLogStartOffset > logStartOffset) { info(s"Incrementing log start offset to $newLogStartOffset") logStartOffset = newLogStartOffset - _leaderEpochCache.clearAndFlushEarliest(logStartOffset) + _leaderEpochCache.truncateFromStart(logStartOffset) producerStateManager.truncateHead(logStartOffset) updateFirstUnstableOffset() } @@ -1650,7 +1650,7 @@ class Log(@volatile var dir: File, updateLogEndOffset(targetOffset) this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) this.logStartOffset = math.min(targetOffset, this.logStartOffset) - _leaderEpochCache.clearAndFlushLatest(targetOffset) + _leaderEpochCache.truncateFromEnd(targetOffset) 
loadProducerState(targetOffset, reloadFromCleanShutdown = false) } true diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 0c00e5588f2..80763a8d797 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit import kafka.common.LogSegmentOffsetOverflowException import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} -import kafka.server.epoch.LeaderEpochCache +import kafka.server.epoch.LeaderEpochFileCache import kafka.server.{FetchDataInfo, LogOffsetMetadata} import kafka.utils._ import org.apache.kafka.common.errors.CorruptRecordException @@ -330,7 +330,7 @@ class LogSegment private[log] (val log: FileRecords, * @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow */ @nonthreadsafe - def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochCache] = None): Int = { + def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = { offsetIndex.reset() timeIndex.reset() txnIndex.reset() diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala index 88f5d6bd8e3..cee6bb66bdf 100644 --- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala +++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala @@ -18,53 +18,69 @@ package kafka.server.epoch import java.util.concurrent.locks.ReentrantReadWriteLock -import kafka.server.LogOffsetMetadata import kafka.server.checkpoints.LeaderEpochCheckpoint import org.apache.kafka.common.requests.EpochEndOffset._ import kafka.utils.CoreUtils._ import kafka.utils.Logging import org.apache.kafka.common.TopicPartition -import scala.collection.mutable.ListBuffer -trait LeaderEpochCache { - def assign(leaderEpoch: Int, offset: Long) - def latestEpoch: Int - def endOffsetFor(epoch: Int): (Int, Long) - def clearAndFlushLatest(offset: Long) - def clearAndFlushEarliest(offset: Long) - def clearAndFlush() - def clear() -} +import scala.collection.mutable.ListBuffer /** - * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica. - * - * Leader Epoch = epoch assigned to each leader by the controller. - * Offset = offset of the first message in each epoch. - * - * @param leo a function that determines the log end offset - * @param checkpoint the checkpoint file - */ -class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetMetadata, checkpoint: LeaderEpochCheckpoint) extends LeaderEpochCache with Logging { + * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica. + * + * Leader Epoch = epoch assigned to each leader by the controller. + * Offset = offset of the first message in each epoch. 
+ * + * @param topicPartition the associated topic partition + * @param checkpoint the checkpoint file + * @param logEndOffset function to fetch the current log end offset + */ +class LeaderEpochFileCache(topicPartition: TopicPartition, + logEndOffset: () => Long, + checkpoint: LeaderEpochCheckpoint) extends Logging { + this.logIdent = s"[LeaderEpochCache $topicPartition] " + private val lock = new ReentrantReadWriteLock() private var epochs: ListBuffer[EpochEntry] = inWriteLock(lock) { ListBuffer(checkpoint.read(): _*) } /** * Assigns the supplied Leader Epoch to the supplied Offset * Once the epoch is assigned it cannot be reassigned - * - * @param epoch - * @param offset */ - override def assign(epoch: Int, offset: Long): Unit = { + def assign(epoch: Int, startOffset: Long): Unit = { inWriteLock(lock) { - if (epoch >= 0 && epoch > latestEpoch && offset >= latestOffset) { - info(s"Updated PartitionLeaderEpoch. ${epochChangeMsg(epoch, offset)}. Cache now contains ${epochs.size} entries.") - epochs += EpochEntry(epoch, offset) - flush() + val updateNeeded = if (epochs.isEmpty) { + true } else { - validateAndMaybeWarn(epoch, offset) + val lastEntry = epochs.last + lastEntry.epoch != epoch || startOffset < lastEntry.startOffset } + + if (updateNeeded) { + truncateAndAppend(EpochEntry(epoch, startOffset)) + flush() + } + } + } + + /** + * Remove any entries which violate monotonicity following the insertion of an assigned epoch. + */ + private def truncateAndAppend(entryToAppend: EpochEntry): Unit = { + validateAndMaybeWarn(entryToAppend) + + val (retainedEpochs, removedEpochs) = epochs.partition { entry => + entry.epoch < entryToAppend.epoch && entry.startOffset < entryToAppend.startOffset + } + + epochs = retainedEpochs :+ entryToAppend + + if (removedEpochs.isEmpty) { + debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.") + } else { + warn(s"New epoch entry $entryToAppend caused truncation of conflicting entries $removedEpochs. " + + s"Cache now contains ${epochs.size} entries.") } } @@ -74,7 +90,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM * * @return */ - override def latestEpoch(): Int = { + def latestEpoch: Int = { inReadLock(lock) { if (epochs.isEmpty) UNDEFINED_EPOCH else epochs.last.epoch } @@ -93,45 +109,59 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM * so that the follower falls back to High Water Mark. * * @param requestedEpoch requested leader epoch - * @return leader epoch and offset + * @return found leader epoch and end offset */ - override def endOffsetFor(requestedEpoch: Int): (Int, Long) = { + def endOffsetFor(requestedEpoch: Int): (Int, Long) = { inReadLock(lock) { val epochAndOffset = if (requestedEpoch == UNDEFINED_EPOCH) { - // this may happen if a bootstrapping follower sends a request with undefined epoch or + // This may happen if a bootstrapping follower sends a request with undefined epoch or // a follower is on the older message format where leader epochs are not recorded (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) } else if (requestedEpoch == latestEpoch) { - (requestedEpoch, leo().messageOffset) + // For the leader, the latest epoch is always the current leader epoch that is still being written to. + // Followers should not have any reason to query for the end offset of the current epoch, but a consumer + // might if it is verifying its committed offset following a group rebalance. 
In this case, we return + // the current log end offset which makes the truncation check work as expected. + (requestedEpoch, logEndOffset()) } else { val (subsequentEpochs, previousEpochs) = epochs.partition { e => e.epoch > requestedEpoch} - if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch) - // no epochs recorded or requested epoch < the first epoch cached + if (subsequentEpochs.isEmpty) { + // The requested epoch is larger than any known epoch. This case should never be hit because + // the latest cached epoch is always the largest. (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) - else { - // we must get at least one element in previous epochs list, because if we are here, - // it means that requestedEpoch >= epochs.head.epoch -- so at least the first epoch is + } else if (previousEpochs.isEmpty) { + // The requested epoch is smaller than any known epoch, so we return the start offset of the first + // known epoch which is larger than it. This may be inaccurate as there could have been + // epochs in between, but the point is that the data has already been removed from the log + // and we want to ensure that the follower can replicate correctly beginning from the leader's + // start offset. + (requestedEpoch, subsequentEpochs.head.startOffset) + } else { + // We have at least one previous epoch and one subsequent epoch. The result is the first + // prior epoch and the starting offset of the first subsequent epoch. (previousEpochs.last.epoch, subsequentEpochs.head.startOffset) } } - debug(s"Processed offset for epoch request for partition ${topicPartition} epoch:$requestedEpoch and returning epoch ${epochAndOffset._1} and offset ${epochAndOffset._2} from epoch list of size ${epochs.size}") + debug(s"Processed end offset request for epoch $requestedEpoch and returning epoch ${epochAndOffset._1} " + + s"with end offset ${epochAndOffset._2} from epoch cache of size ${epochs.size}") epochAndOffset } } /** * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset. - * - * @param offset */ - override def clearAndFlushLatest(offset: Long): Unit = { + def truncateFromEnd(endOffset: Long): Unit = { inWriteLock(lock) { - val before = epochs - if (offset >= 0 && offset <= latestOffset()) { - epochs = epochs.filter(entry => entry.startOffset < offset) + if (endOffset >= 0 && latestEntry.exists(_.startOffset >= endOffset)) { + val (subsequentEntries, previousEntries) = epochs.partition(_.startOffset >= endOffset) + epochs = previousEntries + flush() - info(s"Cleared latest ${before.toSet.filterNot(epochs.toSet)} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition") + + debug(s"Cleared entries $subsequentEntries from epoch cache after " + + s"truncating to end offset $endOffset, leaving ${epochs.size} entries in the cache.") } } } @@ -142,20 +172,21 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM * * This method is exclusive: so clearEarliest(6) will retain an entry at offset 6. 
* - * @param offset the offset to clear up to + * @param startOffset the offset to clear up to */ - override def clearAndFlushEarliest(offset: Long): Unit = { + def truncateFromStart(startOffset: Long): Unit = { inWriteLock(lock) { - val before = epochs - if (offset >= 0 && earliestOffset() < offset) { - val earliest = epochs.filter(entry => entry.startOffset < offset) - if (earliest.nonEmpty) { - epochs = epochs --= earliest - //If the offset is less than the earliest offset remaining, add previous epoch back, but with an updated offset - if (offset < earliestOffset() || epochs.isEmpty) - new EpochEntry(earliest.last.epoch, offset) +=: epochs + if (epochs.nonEmpty) { + val (subsequentEntries, previousEntries) = epochs.partition(_.startOffset > startOffset) + + previousEntries.lastOption.foreach { firstBeforeStartOffset => + val updatedFirstEntry = EpochEntry(firstBeforeStartOffset.epoch, startOffset) + epochs = updatedFirstEntry +: subsequentEntries + flush() - info(s"Cleared earliest ${before.toSet.filterNot(epochs.toSet).size} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition") + + debug(s"Cleared entries $previousEntries and rewrote first entry $updatedFirstEntry after " + + s"truncating to start offset $startOffset, leaving ${epochs.size} in the cache.") } } } @@ -164,47 +195,55 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM /** * Delete all entries. */ - override def clearAndFlush() = { + def clearAndFlush() = { inWriteLock(lock) { epochs.clear() flush() } } - override def clear() = { + def clear() = { inWriteLock(lock) { epochs.clear() } } - def epochEntries(): ListBuffer[EpochEntry] = { + // Visible for testing + def epochEntries: ListBuffer[EpochEntry] = { epochs } - private def earliestOffset(): Long = { - if (epochs.isEmpty) -1 else epochs.head.startOffset - } - - private def latestOffset(): Long = { - if (epochs.isEmpty) -1 else epochs.last.startOffset - } + private def latestEntry: Option[EpochEntry] = epochs.lastOption private def flush(): Unit = { checkpoint.write(epochs) } - def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Current: {epoch:$latestEpoch, offset:$latestOffset} for Partition: $topicPartition" - - def validateAndMaybeWarn(epoch: Int, offset: Long) = { - assert(epoch >= 0, s"Received a PartitionLeaderEpoch assignment for an epoch < 0. This should not happen. ${epochChangeMsg(epoch, offset)}") - if (epoch < latestEpoch()) - warn(s"Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. " + - s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}") - else if (offset < latestOffset()) - warn(s"Received a PartitionLeaderEpoch assignment for an offset < latest offset for the most recent, stored PartitionLeaderEpoch. " + - s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}") + private def validateAndMaybeWarn(entry: EpochEntry) = { + if (entry.epoch < 0) { + throw new IllegalArgumentException(s"Received invalid partition leader epoch entry $entry") + } else { + // If the latest append violates the monotonicity of epochs or starting offsets, our choices + // are either to raise an error, ignore the append, or allow the append and truncate the + // conflicting entries from the cache. Raising an error risks killing the fetcher threads in + // pathological cases (i.e. cases we are not yet aware of). 
We instead take the final approach + // and assume that the latest append is always accurate. + + latestEntry.foreach { latest => + if (entry.epoch < latest.epoch) + warn(s"Received leader epoch assignment $entry which has an epoch less than the epoch " + + s"of the latest entry $latest. This implies messages have arrived out of order.") + else if (entry.startOffset < latest.startOffset) + warn(s"Received leader epoch assignment $entry which has a starting offset which is less than " + + s"the starting offset of the latest entry $latest. This implies messages have arrived out of order.") + } + } } } // Mapping of epoch to the first offset of the subsequent epoch -case class EpochEntry(epoch: Int, startOffset: Long) +case class EpochEntry(epoch: Int, startOffset: Long) { + override def toString: String = { + s"EpochEntry(epoch=$epoch, startOffset=$startOffset)" + } +} diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 343693e82c7..7cdc7789963 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -97,6 +97,39 @@ class PartitionTest { replicaManager.shutdown(checkpointHW = false) } + @Test + def testMakeLeaderUpdatesEpochCache(): Unit = { + val controllerEpoch = 3 + val leader = brokerId + val follower = brokerId + 1 + val controllerId = brokerId + 3 + val replicas = List[Integer](leader, follower).asJava + val isr = List[Integer](leader, follower).asJava + val leaderEpoch = 8 + + val log = logManager.getOrCreateLog(topicPartition, logConfig) + log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0, + new SimpleRecord("k1".getBytes, "v1".getBytes), + new SimpleRecord("k2".getBytes, "v2".getBytes) + ), leaderEpoch = 0) + log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 5, + new SimpleRecord("k3".getBytes, "v3".getBytes), + new SimpleRecord("k4".getBytes, "v4".getBytes) + ), leaderEpoch = 5) + assertEquals(4, log.logEndOffset) + + val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager) + assertTrue("Expected makeLeader to succeed", + partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, + isr, 1, replicas, true), 0)) + + assertEquals(Some(4), partition.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset)) + + val epochEndOffset = partition.lastOffsetForLeaderEpoch(leaderEpoch) + assertEquals(4, epochEndOffset.endOffset) + assertEquals(leaderEpoch, epochEndOffset.leaderEpoch) + } + @Test // Verify that partition.removeFutureLocalReplica() and partition.maybeReplaceCurrentWithFutureReplica() can run concurrently def testMaybeReplaceCurrentWithFutureReplica(): Unit = { diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 1381bc66fe4..f8d76b6f767 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -25,7 +25,7 @@ import java.util.Properties import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0} import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} import kafka.log.Log.DeleteDirSuffix -import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache} +import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel} import kafka.utils._ 
import org.apache.kafka.common.{KafkaException, TopicPartition} @@ -287,7 +287,7 @@ class LogTest { } override def recover(producerStateManager: ProducerStateManager, - leaderEpochCache: Option[LeaderEpochCache]): Int = { + leaderEpochCache: Option[LeaderEpochFileCache]): Int = { recoveredSegments += this super.recover(producerStateManager, leaderEpochCache) } @@ -2589,8 +2589,8 @@ class LogTest { log.onHighWatermarkIncremented(log.logEndOffset) log.deleteOldSegments() assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) - assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size) - assertEquals("Epoch entry should be the latest epoch and the leo.", EpochEntry(1, 100), epochCache(log).epochEntries().head) + assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size) + assertEquals("Epoch entry should be the latest epoch and the leo.", EpochEntry(1, 100), epochCache(log).epochEntries.head) // append some messages to create some segments for (_ <- 0 until 100) @@ -2599,7 +2599,7 @@ class LogTest { log.delete() assertEquals("The number of segments should be 0", 0, log.numberOfSegments) assertEquals("The number of deleted segments should be zero.", 0, log.deleteOldSegments()) - assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size) + assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries.size) } @Test @@ -2612,12 +2612,12 @@ class LogTest { log.appendAsLeader(createRecords, leaderEpoch = 0) assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) - assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size) + assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size) log.close() log.delete() assertEquals("The number of segments should be 0", 0, log.numberOfSegments) - assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size) + assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries.size) } @Test @@ -2816,7 +2816,7 @@ class LogTest { for (i <- records.indices) log.appendAsFollower(recordsForEpoch(i)) - assertEquals(42, log.leaderEpochCache.asInstanceOf[LeaderEpochFileCache].latestEpoch()) + assertEquals(42, log.leaderEpochCache.latestEpoch) } @Test @@ -2871,19 +2871,24 @@ class LogTest { @Test def shouldTruncateLeaderEpochFileWhenTruncatingLog() { - def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) - val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords.sizeInBytes) + def createRecords(startOffset: Long, epoch: Int): MemoryRecords = { + TestUtils.records(Seq(new SimpleRecord("value".getBytes)), + baseOffset = startOffset, partitionLeaderEpoch = epoch) + } + + val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords(0, 0).sizeInBytes) val log = createLog(logDir, logConfig) val cache = epochCache(log) - //Given 2 segments, 10 messages per segment - for (epoch <- 1 to 20) - log.appendAsLeader(createRecords, leaderEpoch = 0) + def append(epoch: Int, startOffset: Long, count: Int): Unit = { + for (i <- 0 until count) + log.appendAsFollower(createRecords(startOffset + i, epoch)) + } - //Simulate some leader changes at specific offsets - cache.assign(0, 0) - cache.assign(1, 10) - cache.assign(2, 16) + //Given 2 segments, 10 messages per segment + append(epoch = 0, startOffset = 0, count = 10) + append(epoch = 1, startOffset = 10, count = 6) + 
append(epoch = 2, startOffset = 16, count = 4) assertEquals(2, log.numberOfSegments) assertEquals(20, log.logEndOffset) @@ -2935,7 +2940,7 @@ class LogTest { assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries) // deliberately remove some of the epoch entries - leaderEpochCache.clearAndFlushLatest(2) + leaderEpochCache.truncateFromEnd(2) assertNotEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries) log.close() diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index c90a5b97a1a..3dff709bef6 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean import kafka.cluster.{Partition, Replica} import kafka.log.Log -import kafka.server.epoch.LeaderEpochCache +import kafka.server.epoch.LeaderEpochFileCache import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics @@ -253,7 +253,7 @@ class IsrExpirationTest { private def logMock: Log = { val log = EasyMock.createMock(classOf[kafka.log.Log]) - val cache = EasyMock.createNiceMock(classOf[LeaderEpochCache]) + val cache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache]) EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes() EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes() EasyMock.expect(log.onHighWatermarkIncremented(0L)) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index 8fb5ab6feba..2e28ee13f2d 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -21,7 +21,7 @@ import kafka.api.Request import kafka.cluster.{BrokerEndPoint, Partition, Replica} import kafka.log.LogManager import kafka.server.AbstractFetcherThread.ResultWithPartitions -import kafka.server.epoch.LeaderEpochCache +import kafka.server.epoch.LeaderEpochFileCache import kafka.utils.{DelayedItem, TestUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{KafkaStorageException, ReplicaNotAvailableException} @@ -46,7 +46,7 @@ class ReplicaAlterLogDirsThreadTest { val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) //Setup all dependencies - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val replica = createNiceMock(classOf[Replica]) val futureReplica = createNiceMock(classOf[Replica]) val partition = createMock(classOf[Partition]) @@ -87,7 +87,7 @@ class ReplicaAlterLogDirsThreadTest { val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) //Setup all dependencies - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val replica = createNiceMock(classOf[Replica]) val partition = createMock(classOf[Partition]) val replicaManager = createMock(classOf[ReplicaManager]) @@ -133,9 +133,9 @@ class ReplicaAlterLogDirsThreadTest { // Setup all the dependencies val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) val quotaManager = 
createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochsT1p0 = createMock(classOf[LeaderEpochCache]) - val leaderEpochsT1p1 = createMock(classOf[LeaderEpochCache]) - val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache]) + val leaderEpochsT1p0 = createMock(classOf[LeaderEpochFileCache]) + val leaderEpochsT1p1 = createMock(classOf[LeaderEpochFileCache]) + val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[LogManager]) val replicaT1p0 = createNiceMock(classOf[Replica]) val replicaT1p1 = createNiceMock(classOf[Replica]) @@ -195,8 +195,8 @@ class ReplicaAlterLogDirsThreadTest { // Setup all the dependencies val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) val quotaManager = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createMock(classOf[LeaderEpochCache]) - val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache]) + val leaderEpochs = createMock(classOf[LeaderEpochFileCache]) + val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[LogManager]) val replica = createNiceMock(classOf[Replica]) // one future replica mock because our mocking methods return same values for both future replicas @@ -265,8 +265,8 @@ class ReplicaAlterLogDirsThreadTest { val logManager = createMock(classOf[LogManager]) val replica = createNiceMock(classOf[Replica]) val futureReplica = createNiceMock(classOf[Replica]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) - val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) + val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache]) val partition = createMock(classOf[Partition]) val replicaManager = createMock(classOf[ReplicaManager]) val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture() @@ -319,8 +319,8 @@ class ReplicaAlterLogDirsThreadTest { // Setup all the dependencies val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) val quotaManager = createNiceMock(classOf[kafka.server.ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) - val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) + val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[kafka.log.LogManager]) val replica = createNiceMock(classOf[Replica]) val futureReplica = createNiceMock(classOf[Replica]) @@ -401,8 +401,8 @@ class ReplicaAlterLogDirsThreadTest { //Setup all dependencies val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) val quotaManager = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) - val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) + val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[LogManager]) val replica = createNiceMock(classOf[Replica]) val futureReplica = createNiceMock(classOf[Replica]) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index 520801c9bab..9440c29ee7b 
100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -20,7 +20,7 @@ import kafka.cluster.{BrokerEndPoint, Replica} import kafka.log.LogManager import kafka.cluster.Partition import kafka.server.QuotaFactory.UnboundedQuota -import kafka.server.epoch.LeaderEpochCache +import kafka.server.epoch.LeaderEpochFileCache import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend import kafka.utils.TestUtils import org.apache.kafka.clients.ClientResponse @@ -154,7 +154,7 @@ class ReplicaFetcherThreadTest { //Setup all dependencies val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) @@ -278,7 +278,7 @@ class ReplicaFetcherThreadTest { val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) //Setup all dependencies - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) @@ -339,7 +339,7 @@ class ReplicaFetcherThreadTest { // Setup all the dependencies val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps) val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createMock(classOf[LeaderEpochCache]) + val leaderEpochs = createMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) @@ -388,7 +388,7 @@ class ReplicaFetcherThreadTest { // Setup all the dependencies val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps) val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createMock(classOf[LeaderEpochCache]) + val leaderEpochs = createMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) @@ -442,7 +442,7 @@ class ReplicaFetcherThreadTest { // Setup all dependencies val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) @@ -513,7 +513,7 @@ class ReplicaFetcherThreadTest { // Setup all dependencies val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) @@ -574,7 +574,7 @@ class ReplicaFetcherThreadTest { // Setup all the dependencies val configs = 
TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps) val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) @@ -619,7 +619,7 @@ class ReplicaFetcherThreadTest { // Setup all the dependencies val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps) val quota = createNiceMock(classOf[kafka.server.ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[kafka.log.LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) @@ -677,7 +677,7 @@ class ReplicaFetcherThreadTest { //Setup all stubs val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val logManager = createNiceMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) @@ -728,7 +728,7 @@ class ReplicaFetcherThreadTest { //Setup all stubs val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val logManager = createNiceMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 08440528638..90d488dd37a 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -26,7 +26,7 @@ import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager} import kafka.utils.{MockScheduler, MockTime, TestUtils} import TestUtils.createBroker import kafka.cluster.BrokerEndPoint -import kafka.server.epoch.LeaderEpochCache +import kafka.server.epoch.LeaderEpochFileCache import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend import kafka.utils.timer.MockTimer import kafka.zk.KafkaZkClient @@ -641,7 +641,7 @@ class ReplicaManagerTest { val mockScheduler = new MockScheduler(time) val mockBrokerTopicStats = new BrokerTopicStats val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) - val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochCache]) + val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochFileCache]) EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader) EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader)) .andReturn((leaderEpochFromLeader, localLogOffset)) @@ -661,7 +661,7 @@ class ReplicaManagerTest { new File(new File(config.logDirs.head), s"$topic-$topicPartition"), 30000), logDirFailureChannel = mockLogDirFailureChannel) { - override def leaderEpochCache: LeaderEpochCache = mockLeaderEpochCache + override def leaderEpochCache: LeaderEpochFileCache = mockLeaderEpochCache 
override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset) } diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala index e7c6a9785bc..0c47f15a09f 100644 --- a/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala +++ b/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala @@ -24,7 +24,6 @@ import org.junit.Assert._ import org.junit.Test import org.scalatest.junit.JUnitSuite - class LeaderEpochCheckpointFileTest extends JUnitSuite with Logging{ @Test diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala index 48590190b57..5c37891c937 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala @@ -90,23 +90,23 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness assertEquals(0, latestRecord(follower).partitionLeaderEpoch()) //Both leader and follower should have recorded Epoch 0 at Offset 0 - assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries()) - assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries()) + assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries) + assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries) //Bounce the follower bounce(follower) awaitISR(tp) //Nothing happens yet as we haven't sent any new messages. - assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries()) - assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries()) + assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries) + assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries) //Send a message producer.send(new ProducerRecord(topic, 0, null, msg)).get //Epoch1 should now propagate to the follower with the written message - assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries()) - assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries()) + assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries) + assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries) //The new message should have epoch 1 stamped assertEquals(1, latestRecord(leader).partitionLeaderEpoch()) @@ -117,8 +117,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness awaitISR(tp) //Epochs 2 should be added to the leader, but not on the follower (yet), as there has been no replication. 
- assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries()) - assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries()) + assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries) + assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries) //Send a message producer.send(new ProducerRecord(topic, 0, null, msg)).get @@ -128,8 +128,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness assertEquals(2, latestRecord(follower).partitionLeaderEpoch()) //The leader epoch files should now match on leader and follower - assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries()) - assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(follower).epochEntries()) + assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries) + assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(follower).epochEntries) } @Test @@ -377,8 +377,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness private def log(leader: KafkaServer, follower: KafkaServer): Unit = { info(s"Bounce complete for follower ${follower.config.brokerId}") - info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries()) - info(s"Follower: leo${follower.config.brokerId}: " + getLog(follower, 0).logEndOffset + " cache: " + epochCache(follower).epochEntries()) + info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries) + info(s"Follower: leo${follower.config.brokerId}: " + getLog(follower, 0).logEndOffset + " cache: " + epochCache(follower).epochEntries) } private def waitForLogsToMatch(b1: KafkaServer, b2: KafkaServer, partition: Int = 0): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index d1f93900ccf..7ac606a2dc5 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -16,6 +16,7 @@ */ package kafka.server.epoch + import java.io.File import kafka.server.LogOffsetMetadata @@ -24,7 +25,7 @@ import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFIN import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.junit.Assert._ -import org.junit.{Before, Test} +import org.junit.Test import scala.collection.mutable.ListBuffer @@ -33,54 +34,44 @@ import scala.collection.mutable.ListBuffer */ class LeaderEpochFileCacheTest { val tp = new TopicPartition("TestTopic", 5) - var checkpoint: LeaderEpochCheckpoint = _ + private var logEndOffset = 0L + private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint { + private var epochs: Seq[EpochEntry] = Seq() + override def write(epochs: Seq[EpochEntry]): Unit = this.epochs = epochs + override def read(): Seq[EpochEntry] = this.epochs + } + private val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint) @Test def shouldAddEpochAndMessageOffsetToCache() = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - 
//When - cache.assign(epoch = 2, offset = 10) - leo = 11 + cache.assign(epoch = 2, startOffset = 10) + logEndOffset = 11 //Then - assertEquals(2, cache.latestEpoch()) - assertEquals(EpochEntry(2, 10), cache.epochEntries()(0)) - assertEquals((2, leo), cache.endOffsetFor(2)) //should match leo + assertEquals(2, cache.latestEpoch) + assertEquals(EpochEntry(2, 10), cache.epochEntries(0)) + assertEquals((2, logEndOffset), cache.endOffsetFor(2)) //should match logEndOffset } @Test def shouldReturnLogEndOffsetIfLatestEpochRequested() = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //When just one epoch - cache.assign(epoch = 2, offset = 11) - cache.assign(epoch = 2, offset = 12) - leo = 14 + cache.assign(epoch = 2, startOffset = 11) + cache.assign(epoch = 2, startOffset = 12) + logEndOffset = 14 //Then - assertEquals((2, leo), cache.endOffsetFor(2)) + assertEquals((2, logEndOffset), cache.endOffsetFor(2)) } @Test def shouldReturnUndefinedOffsetIfUndefinedEpochRequested() = { - def leoFinder() = new LogOffsetMetadata(0) val expectedEpochEndOffset = (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) - //Given cache with some data on leader - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - // assign couple of epochs - cache.assign(epoch = 2, offset = 11) - cache.assign(epoch = 3, offset = 12) + cache.assign(epoch = 2, startOffset = 11) + cache.assign(epoch = 3, startOffset = 12) //When (say a bootstraping follower) sends request for UNDEFINED_EPOCH val epochAndOffsetFor = cache.endOffsetFor(UNDEFINED_EPOCH) @@ -92,68 +83,51 @@ class LeaderEpochFileCacheTest { @Test def shouldNotOverwriteLogEndOffsetForALeaderEpochOnceItHasBeenAssigned() = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - //Given - leo = 9 - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) + logEndOffset = 9 - cache.assign(2, leo) + cache.assign(2, logEndOffset) //When called again later cache.assign(2, 10) //Then the offset should NOT have been updated - assertEquals(leo, cache.epochEntries()(0).startOffset) + assertEquals(logEndOffset, cache.epochEntries(0).startOffset) + assertEquals(ListBuffer(EpochEntry(2, 9)), cache.epochEntries) } @Test - def shouldAllowLeaderEpochToChangeEvenIfOffsetDoesNot() = { - def leoFinder() = new LogOffsetMetadata(0) - + def shouldEnforceMonotonicallyIncreasingStartOffsets() = { //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) cache.assign(2, 9) //When update epoch new epoch but same offset cache.assign(3, 9) //Then epoch should have been updated - assertEquals(ListBuffer(EpochEntry(2, 9), EpochEntry(3, 9)), cache.epochEntries()) + assertEquals(ListBuffer(EpochEntry(3, 9)), cache.epochEntries) } @Test def shouldNotOverwriteOffsetForALeaderEpochOnceItHasBeenAssigned() = { - //Given - val cache = new LeaderEpochFileCache(tp, () => new LogOffsetMetadata(0), checkpoint) cache.assign(2, 6) //When called again later with a greater offset cache.assign(2, 10) //Then later update should have been ignored - assertEquals(6, cache.epochEntries()(0).startOffset) + assertEquals(6, cache.epochEntries(0).startOffset) } @Test def shouldReturnUnsupportedIfNoEpochRecorded(){ - def leoFinder() = new LogOffsetMetadata(0) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //Then assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(0)) } @Test def 
shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested(){ - val leo = 73 - def leoFinder() = new LogOffsetMetadata(leo) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) + logEndOffset = 73 //When (say a follower on older message format version) sends request for UNDEFINED_EPOCH val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH) @@ -164,39 +138,41 @@ class LeaderEpochFileCacheTest { } @Test - def shouldReturnUnsupportedIfRequestedEpochLessThanFirstEpoch(){ - def leoFinder() = new LogOffsetMetadata(0) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - - cache.assign(epoch = 5, offset = 11) - cache.assign(epoch = 6, offset = 12) - cache.assign(epoch = 7, offset = 13) + def shouldReturnFirstEpochIfRequestedEpochLessThanFirstEpoch(){ + cache.assign(epoch = 5, startOffset = 11) + cache.assign(epoch = 6, startOffset = 12) + cache.assign(epoch = 7, startOffset = 13) //When - val epochAndOffset = cache.endOffsetFor(5 - 1) + val epochAndOffset = cache.endOffsetFor(4) //Then - assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), epochAndOffset) + assertEquals((4, 11), epochAndOffset) } @Test - def shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch() = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) + def shouldTruncateIfMatchingEpochButEarlierStartingOffset(): Unit = { + cache.assign(epoch = 5, startOffset = 11) + cache.assign(epoch = 6, startOffset = 12) + cache.assign(epoch = 7, startOffset = 13) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) + // epoch 7 starts at an earlier offset + cache.assign(epoch = 7, startOffset = 12) + assertEquals((5, 12), cache.endOffsetFor(5)) + assertEquals((5, 12), cache.endOffsetFor(6)) + } + + @Test + def shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch() = { //When several epochs - cache.assign(epoch = 1, offset = 11) - cache.assign(epoch = 1, offset = 12) - cache.assign(epoch = 2, offset = 13) - cache.assign(epoch = 2, offset = 14) - cache.assign(epoch = 3, offset = 15) - cache.assign(epoch = 3, offset = 16) - leo = 17 + cache.assign(epoch = 1, startOffset = 11) + cache.assign(epoch = 1, startOffset = 12) + cache.assign(epoch = 2, startOffset = 13) + cache.assign(epoch = 2, startOffset = 14) + cache.assign(epoch = 3, startOffset = 15) + cache.assign(epoch = 3, startOffset = 16) + logEndOffset = 17 //Then get the start offset of the next epoch assertEquals((2, 15), cache.endOffsetFor(2)) @@ -204,15 +180,10 @@ class LeaderEpochFileCacheTest { @Test def shouldReturnNextAvailableEpochIfThereIsNoExactEpochForTheOneRequested(){ - def leoFinder() = new LogOffsetMetadata(0) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //When - cache.assign(epoch = 0, offset = 10) - cache.assign(epoch = 2, offset = 13) - cache.assign(epoch = 4, offset = 17) + cache.assign(epoch = 0, startOffset = 10) + cache.assign(epoch = 2, startOffset = 13) + cache.assign(epoch = 4, startOffset = 17) //Then assertEquals((0, 13), cache.endOffsetFor(requestedEpoch = 1)) @@ -222,14 +193,9 @@ class LeaderEpochFileCacheTest { @Test def shouldNotUpdateEpochAndStartOffsetIfItDidNotChange() = { - def leoFinder() = new LogOffsetMetadata(0) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //When - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 2, offset = 7) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 2, startOffset = 7) 
//Then assertEquals(1, cache.epochEntries.size) @@ -238,14 +204,10 @@ class LeaderEpochFileCacheTest { @Test def shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked(): Unit = { - val leo = 100 - def leoFinder() = new LogOffsetMetadata(leo) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) + logEndOffset = 100 //When - cache.assign(epoch = 2, offset = 100) + cache.assign(epoch = 2, startOffset = 100) //Then assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(3)) @@ -253,35 +215,28 @@ class LeaderEpochFileCacheTest { @Test def shouldSupportEpochsThatDoNotStartFromZero(): Unit = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //When - cache.assign(epoch = 2, offset = 6) - leo = 7 + cache.assign(epoch = 2, startOffset = 6) + logEndOffset = 7 //Then - assertEquals((2, leo), cache.endOffsetFor(2)) + assertEquals((2, logEndOffset), cache.endOffsetFor(2)) assertEquals(1, cache.epochEntries.size) - assertEquals(EpochEntry(2, 6), cache.epochEntries()(0)) + assertEquals(EpochEntry(2, 6), cache.epochEntries(0)) } @Test def shouldPersistEpochsBetweenInstances(){ - def leoFinder() = new LogOffsetMetadata(0) val checkpointPath = TestUtils.tempFile().getAbsolutePath - checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath)) + val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath)) //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 6) + val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint) + cache.assign(epoch = 2, startOffset = 6) //When val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath)) - val cache2 = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint2) + val cache2 = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint2) //Then assertEquals(1, cache2.epochEntries.size) @@ -289,81 +244,68 @@ class LeaderEpochFileCacheTest { } @Test - def shouldNotLetEpochGoBackwardsEvenIfMessageEpochsDo(): Unit = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - + def shouldEnforceMonotonicallyIncreasingEpochs(): Unit = { //Given - cache.assign(epoch = 1, offset = 5); leo = 6 - cache.assign(epoch = 2, offset = 6); leo = 7 - - //When we update an epoch in the past with an earlier offset - cache.assign(epoch = 1, offset = 7); leo = 8 + cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6 + cache.assign(epoch = 2, startOffset = 6); logEndOffset = 7 - //Then epoch should not be changed - assertEquals(2, cache.latestEpoch()) + //When we update an epoch in the past with a different offset, the log has already reached + //an inconsistent state. Our options are either to raise an error, ignore the new append, + //or truncate the cached epochs to the point of conflict. We take this latter approach in + //order to guarantee that epochs and offsets in the cache increase monotonically, which makes + //the search logic simpler to reason about. 
+ cache.assign(epoch = 1, startOffset = 7); logEndOffset = 8 - //Then end offset for epoch 1 shouldn't have changed - assertEquals((1, 6), cache.endOffsetFor(1)) + //Then later epochs will be removed + assertEquals(1, cache.latestEpoch) - //Then end offset for epoch 2 has to be the offset of the epoch 1 message (I can't think of a better option) - assertEquals((2, 8), cache.endOffsetFor(2)) + //Then end offset for epoch 1 will have changed + assertEquals((1, 8), cache.endOffsetFor(1)) - //Epoch history shouldn't have changed - assertEquals(EpochEntry(1, 5), cache.epochEntries()(0)) - assertEquals(EpochEntry(2, 6), cache.epochEntries()(1)) + //Then end offset for epoch 2 is now undefined + assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(2)) + assertEquals(EpochEntry(1, 7), cache.epochEntries(0)) } @Test - def shouldNotLetOffsetsGoBackwardsEvenIfEpochsProgress() = { - def leoFinder() = new LogOffsetMetadata(0) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - + def shouldEnforceOffsetsIncreaseMonotonically() = { //When epoch goes forward but offset goes backwards - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 3, offset = 5) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 3, startOffset = 5) - //Then latter assign should be ignored - assertEquals(EpochEntry(2, 6), cache.epochEntries.toList(0)) + //The last assignment wins and the conflicting one is removed from the log + assertEquals(EpochEntry(3, 5), cache.epochEntries.toList(0)) } @Test def shouldIncreaseAndTrackEpochsAsLeadersChangeManyTimes(): Unit = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 0, offset = 0) //leo=0 + cache.assign(epoch = 0, startOffset = 0) //logEndOffset=0 //When - cache.assign(epoch = 1, offset = 0) //leo=0 + cache.assign(epoch = 1, startOffset = 0) //logEndOffset=0 //Then epoch should go up - assertEquals(1, cache.latestEpoch()) + assertEquals(1, cache.latestEpoch) //offset for 1 should still be 0 assertEquals((1, 0), cache.endOffsetFor(1)) //offset for epoch 0 should still be 0 assertEquals((0, 0), cache.endOffsetFor(0)) //When we write 5 messages as epoch 1 - leo = 5 + logEndOffset = 5 - //Then end offset for epoch(1) should be leo => 5 + //Then end offset for epoch(1) should be logEndOffset => 5 assertEquals((1, 5), cache.endOffsetFor(1)) //Epoch 0 should still be at offset 0 assertEquals((0, 0), cache.endOffsetFor(0)) //When - cache.assign(epoch = 2, offset = 5) //leo=5 + cache.assign(epoch = 2, startOffset = 5) //logEndOffset=5 - leo = 10 //write another 5 messages + logEndOffset = 10 //write another 5 messages - //Then end offset for epoch(2) should be leo => 10 + //Then end offset for epoch(2) should be logEndOffset => 10 assertEquals((2, 10), cache.endOffsetFor(2)) //end offset for epoch(1) should be the start offset of epoch(2) => 5 @@ -375,36 +317,30 @@ class LeaderEpochFileCacheTest { @Test def shouldIncreaseAndTrackEpochsAsFollowerReceivesManyMessages(): Unit = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - - //When new - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //When Messages come in - cache.assign(epoch = 0, offset = 0); leo = 1 - cache.assign(epoch = 0, offset = 1); leo = 2 - cache.assign(epoch = 0, offset = 2); leo = 3 + cache.assign(epoch = 0, startOffset = 0); logEndOffset = 1 + cache.assign(epoch = 0, startOffset = 1); logEndOffset = 2 
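// For reference while reading the truncateFromEnd/truncateFromStart tests further below, their
// expected behaviour can be summarised by this simplified model (the names and the assumption
// that UNDEFINED_EPOCH_OFFSET is negative are illustrative, not the production code):
// truncateFromEnd(endOffset) drops every entry starting at or beyond endOffset, while
// truncateFromStart(startOffset) drops the entries it passes over but keeps the most recent of
// them, clamping its start offset up to the truncation point; an undefined offset is ignored.
def modelTruncateFromEnd(entries: Seq[(Int, Long)], endOffset: Long): Seq[(Int, Long)] =
  if (endOffset < 0) entries
  else entries.filter { case (_, startOffset) => startOffset < endOffset }

def modelTruncateFromStart(entries: Seq[(Int, Long)], startOffset: Long): Seq[(Int, Long)] =
  if (startOffset < 0) entries
  else {
    val (dropped, retained) = entries.partition { case (_, s) => s <= startOffset }
    dropped.lastOption match {
      case None             => retained                             // truncation point precedes the first entry
      case Some((epoch, _)) => (epoch -> startOffset) +: retained   // keep the epoch in force at the new start offset
    }
  }
// e.g. with entries = Seq(2 -> 6L, 3 -> 8L, 4 -> 11L):
//   modelTruncateFromEnd(entries, endOffset = 8)     == Seq(2 -> 6L)
//   modelTruncateFromStart(entries, startOffset = 9) == Seq(3 -> 9L, 4 -> 11L)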
+ cache.assign(epoch = 0, startOffset = 2); logEndOffset = 3 //Then epoch should stay, offsets should grow - assertEquals(0, cache.latestEpoch()) - assertEquals((0, leo), cache.endOffsetFor(0)) + assertEquals(0, cache.latestEpoch) + assertEquals((0, logEndOffset), cache.endOffsetFor(0)) //When messages arrive with greater epoch - cache.assign(epoch = 1, offset = 3); leo = 4 - cache.assign(epoch = 1, offset = 4); leo = 5 - cache.assign(epoch = 1, offset = 5); leo = 6 + cache.assign(epoch = 1, startOffset = 3); logEndOffset = 4 + cache.assign(epoch = 1, startOffset = 4); logEndOffset = 5 + cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6 - assertEquals(1, cache.latestEpoch()) - assertEquals((1, leo), cache.endOffsetFor(1)) + assertEquals(1, cache.latestEpoch) + assertEquals((1, logEndOffset), cache.endOffsetFor(1)) //When - cache.assign(epoch = 2, offset = 6); leo = 7 - cache.assign(epoch = 2, offset = 7); leo = 8 - cache.assign(epoch = 2, offset = 8); leo = 9 + cache.assign(epoch = 2, startOffset = 6); logEndOffset = 7 + cache.assign(epoch = 2, startOffset = 7); logEndOffset = 8 + cache.assign(epoch = 2, startOffset = 8); logEndOffset = 9 - assertEquals(2, cache.latestEpoch()) - assertEquals((2, leo), cache.endOffsetFor(2)) + assertEquals(2, cache.latestEpoch) + assertEquals((2, logEndOffset), cache.endOffsetFor(2)) //Older epochs should return the start offset of the first message in the subsequent epoch. assertEquals((0, 3), cache.endOffsetFor(0)) @@ -413,16 +349,13 @@ class LeaderEpochFileCacheTest { @Test def shouldDropEntriesOnEpochBoundaryWhenRemovingLatestEntries(): Unit = { - def leoFinder() = new LogOffsetMetadata(0) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 3, offset = 8) - cache.assign(epoch = 4, offset = 11) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 3, startOffset = 8) + cache.assign(epoch = 4, startOffset = 11) //When clear latest on epoch boundary - cache.clearAndFlushLatest(offset = 8) + cache.truncateFromEnd(endOffset = 8) //Then should remove two latest epochs (remove is inclusive) assertEquals(ListBuffer(EpochEntry(2, 6)), cache.epochEntries) @@ -430,16 +363,13 @@ class LeaderEpochFileCacheTest { @Test def shouldPreserveResetOffsetOnClearEarliestIfOneExists(): Unit = { - def leoFinder() = new LogOffsetMetadata(0) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 3, offset = 8) - cache.assign(epoch = 4, offset = 11) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 3, startOffset = 8) + cache.assign(epoch = 4, startOffset = 11) //When reset to offset ON epoch boundary - cache.clearAndFlushEarliest(offset = 8) + cache.truncateFromStart(startOffset = 8) //Then should preserve (3, 8) assertEquals(ListBuffer(EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries) @@ -447,16 +377,13 @@ class LeaderEpochFileCacheTest { @Test def shouldUpdateSavedOffsetWhenOffsetToClearToIsBetweenEpochs(): Unit = { - def leoFinder() = new LogOffsetMetadata(0) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 3, offset = 8) - cache.assign(epoch = 4, offset = 11) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 3, startOffset = 8) + cache.assign(epoch = 4, startOffset = 11) //When reset to offset BETWEEN epoch boundaries - 
cache.clearAndFlushEarliest(offset = 9) + cache.truncateFromStart(startOffset = 9) //Then we should retain epoch 3, but update its offset to 9 as 8 has been removed assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
@@ -464,16 +391,13 @@ @Test def shouldNotClearAnythingIfOffsetToEarly(): Unit = { - def leoFinder() = new LogOffsetMetadata(0) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 3, offset = 8) - cache.assign(epoch = 4, offset = 11) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 3, startOffset = 8) + cache.assign(epoch = 4, startOffset = 11) //When reset to offset before first epoch offset - cache.clearAndFlushEarliest(offset = 1) + cache.truncateFromStart(startOffset = 1) //Then nothing should change assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -481,16 +405,13 @@ @Test def shouldNotClearAnythingIfOffsetToFirstOffset(): Unit = { - def leoFinder() = new LogOffsetMetadata(0) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 3, offset = 8) - cache.assign(epoch = 4, offset = 11) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 3, startOffset = 8) + cache.assign(epoch = 4, startOffset = 11) //When reset to offset on earliest epoch boundary - cache.clearAndFlushEarliest(offset = 6) + cache.truncateFromStart(startOffset = 6) //Then nothing should change assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -498,16 +419,13 @@ @Test def shouldRetainLatestEpochOnClearAllEarliest(): Unit = { - def leoFinder() = new LogOffsetMetadata(0) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 3, offset = 8) - cache.assign(epoch = 4, offset = 11) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 3, startOffset = 8) + cache.assign(epoch = 4, startOffset = 11) //When - cache.clearAndFlushEarliest(offset = 11) + cache.truncateFromStart(startOffset = 11) //Then retain the last assertEquals(ListBuffer(EpochEntry(4, 11)), cache.epochEntries)
@@ -515,16 +433,13 @@ @Test def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest(): Unit = { - def leoFinder() = new LogOffsetMetadata(0) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 3, offset = 8) - cache.assign(epoch = 4, offset = 11) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 3, startOffset = 8) + cache.assign(epoch = 4, startOffset = 11) //When we clear from a position between offset 8 & offset 11 - cache.clearAndFlushEarliest(offset = 9) + cache.truncateFromStart(startOffset = 9) //Then we should update the middle epoch entry's offset assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
@@ -532,16 +447,13 @@ @Test def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2(): Unit = { - def leoFinder() = new LogOffsetMetadata(0) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 0, offset = 0) - cache.assign(epoch = 1, offset = 7) - cache.assign(epoch = 2, offset = 10) + cache.assign(epoch = 0, startOffset = 0) + cache.assign(epoch = 1, startOffset = 7) + cache.assign(epoch = 2, startOffset = 10) //When we clear from a position between offset 0 & offset 7 - cache.clearAndFlushEarliest(offset = 5) + cache.truncateFromStart(startOffset = 5) //Then we should keep epoch 0 but update the offset appropriately assertEquals(ListBuffer(EpochEntry(0,5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
@@ -549,16 +461,13 @@ @Test def shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset(): Unit = { - def leoFinder() = new LogOffsetMetadata(0) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 3, offset = 8) - cache.assign(epoch = 4, offset = 11) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 3, startOffset = 8) + cache.assign(epoch = 4, startOffset = 11) //When reset to offset beyond last epoch - cache.clearAndFlushEarliest(offset = 15) + cache.truncateFromStart(startOffset = 15) //Then update the last assertEquals(ListBuffer(EpochEntry(4, 15)), cache.epochEntries)
@@ -566,51 +475,42 @@ @Test def shouldDropEntriesBetweenEpochBoundaryWhenRemovingNewest(): Unit = { - def leoFinder() = new LogOffsetMetadata(0) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 3, offset = 8) - cache.assign(epoch = 4, offset = 11) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 3, startOffset = 8) + cache.assign(epoch = 4, startOffset = 11) //When reset to offset BETWEEN epoch boundaries - cache.clearAndFlushLatest(offset = 9) + cache.truncateFromEnd(endOffset = 9) //Then should keep the preceding epochs - assertEquals(3, cache.latestEpoch()) + assertEquals(3, cache.latestEpoch) assertEquals(ListBuffer(EpochEntry(2, 6), EpochEntry(3, 8)), cache.epochEntries) }
@Test def shouldClearAllEntries(): Unit = { - def leoFinder() = new LogOffsetMetadata(0) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 3, offset = 8) - cache.assign(epoch = 4, offset = 11) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 3, startOffset = 8) + cache.assign(epoch = 4, startOffset = 11) - //When + //When cache.clearAndFlush() - //Then + //Then assertEquals(0, cache.epochEntries.size) }
@Test def shouldNotResetEpochHistoryHeadIfUndefinedPassed(): Unit = { - def leoFinder() = new LogOffsetMetadata(0) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 3, offset = 8) - cache.assign(epoch = 4, offset = 11) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 3, startOffset = 8) + cache.assign(epoch = 4, startOffset = 11) //When reset to offset on epoch boundary - cache.clearAndFlushLatest(offset = UNDEFINED_EPOCH_OFFSET) + cache.truncateFromEnd(endOffset = UNDEFINED_EPOCH_OFFSET) //Then should do nothing assertEquals(3, cache.epochEntries.size)
@@ -618,16 +518,13 @@ @Test def shouldNotResetEpochHistoryTailIfUndefinedPassed(): Unit = { - def leoFinder() = new LogOffsetMetadata(0) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 3, offset = 8) -
cache.assign(epoch = 4, offset = 11) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 3, startOffset = 8) + cache.assign(epoch = 4, startOffset = 11) //When reset to offset on epoch boundary - cache.clearAndFlushEarliest(offset = UNDEFINED_EPOCH_OFFSET) + cache.truncateFromEnd(endOffset = UNDEFINED_EPOCH_OFFSET) //Then should do nothing assertEquals(3, cache.epochEntries.size) @@ -635,54 +532,26 @@ class LeaderEpochFileCacheTest { @Test def shouldFetchLatestEpochOfEmptyCache(): Unit = { - //Given - def leoFinder() = new LogOffsetMetadata(0) - - //When - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //Then assertEquals(-1, cache.latestEpoch) } @Test def shouldFetchEndOffsetOfEmptyCache(): Unit = { - //Given - def leoFinder() = new LogOffsetMetadata(0) - - //When - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //Then assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(7)) } @Test def shouldClearEarliestOnEmptyCache(): Unit = { - //Given - def leoFinder() = new LogOffsetMetadata(0) - - //When - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //Then - cache.clearAndFlushEarliest(7) + cache.truncateFromStart(7) } @Test def shouldClearLatestOnEmptyCache(): Unit = { - //Given - def leoFinder() = new LogOffsetMetadata(0) - - //When - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //Then - cache.clearAndFlushLatest(7) + cache.truncateFromEnd(7) } - @Before - def setUp() { - checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile()) - } } diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala index 5ad641f11a0..efc07177037 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala @@ -16,7 +16,7 @@ */ package kafka.server.epoch -import java.util.{Optional, Map => JMap} +import java.util.Optional import kafka.server.KafkaConfig._ import kafka.server.{BlockingSend, KafkaServer, ReplicaFetcherBlockingSend} @@ -37,9 +37,10 @@ import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRe import scala.collection.JavaConverters._ import scala.collection.Map +import scala.collection.mutable.ListBuffer class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging { - var brokers: Seq[KafkaServer] = null + var brokers: ListBuffer[KafkaServer] = ListBuffer() val topic1 = "foo" val topic2 = "bar" val t1p0 = new TopicPartition(topic1, 0) @@ -60,7 +61,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging { @Test def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() { - brokers = (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) } + brokers ++= (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) } // Given two topics with replication of a single partition for (topic <- List(topic1, topic2)) { @@ -94,7 +95,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging { def shouldSendLeaderEpochRequestAndGetAResponse(): Unit = { //3 brokers, put partition on 100/101 and then pretend to be 102 - brokers = (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) } + brokers ++= (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) } val assignment1 
= Map(0 -> Seq(100), 1 -> Seq(101)) TestUtils.createTopic(zkClient, topic1, assignment1, brokers) @@ -138,9 +139,12 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging { @Test def shouldIncreaseLeaderEpochBetweenLeaderRestarts(): Unit = { - //Setup: we are only interested in the single partition on broker 101 - brokers = Seq(100, 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) } + brokers += createServer(fromProps(createBrokerConfig(100, zkConnect))) + assertEquals(100, TestUtils.waitUntilControllerElected(zkClient)) + + brokers += createServer(fromProps(createBrokerConfig(101, zkConnect))) + def leo() = brokers(1).replicaManager.getReplica(tp).get.logEndOffset.messageOffset TestUtils.createTopic(zkClient, tp.topic, Map(tp.partition -> Seq(101)), brokers) producer = createProducer(getBrokerListStrFromServers(brokers), acks = -1) @@ -150,10 +154,10 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging { var fetcher = new TestFetcherThread(sender(brokers(0), brokers(1))) //Then epoch should be 0 and leo: 1 - var offset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset() - assertEquals(1, offset) - assertEquals(leo(), offset) - + var epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp) + assertEquals(0, epochEndOffset.leaderEpoch) + assertEquals(1, epochEndOffset.endOffset) + assertEquals(1, leo()) //2. When broker is bounced brokers(1).shutdown() @@ -162,15 +166,23 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging { producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get fetcher = new TestFetcherThread(sender(brokers(0), brokers(1))) - //Then epoch 0 should still be the start offset of epoch 1 - offset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset() - assertEquals(1, offset) - - //Then epoch 2 should be the leo (NB: The leader epoch goes up in factors of 2 - This is because we have to first change leader to -1 and then change it again to the live replica) - assertEquals(2, fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset()) - assertEquals(leo(), fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset()) - + epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp) + assertEquals(1, epochEndOffset.endOffset) + assertEquals(0, epochEndOffset.leaderEpoch) + + //No data written in epoch 1 + epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 1))(tp) + assertEquals(0, epochEndOffset.leaderEpoch) + assertEquals(1, epochEndOffset.endOffset) + + //Then epoch 2 should be the leo (NB: The leader epoch goes up in factors of 2 - + //This is because we have to first change leader to -1 and then change it again to the live replica) + //Note that the expected leader changes depend on the controller being on broker 100, which is not restarted + epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 2))(tp) + assertEquals(2, epochEndOffset.leaderEpoch) + assertEquals(2, epochEndOffset.endOffset) + assertEquals(2, leo()) //3. 
When broker is bounced again brokers(1).shutdown() @@ -179,7 +191,6 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging { producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get fetcher = new TestFetcherThread(sender(brokers(0), brokers(1))) - //Then Epoch 0 should still map to offset 1 assertEquals(1, fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset()) diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala index 4fdc4d26992..86a087b480d 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala @@ -47,7 +47,7 @@ class OffsetsForLeaderEpochTest { //Stubs val mockLog = createNiceMock(classOf[kafka.log.Log]) - val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache]) + val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochFileCache]) val logManager = createNiceMock(classOf[kafka.log.LogManager]) expect(mockCache.endOffsetFor(epochRequested)).andReturn(epochAndOffset) expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes() diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 4dc822b52c9..df9902f7003 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -373,10 +373,11 @@ object TestUtils extends Logging { producerId: Long = RecordBatch.NO_PRODUCER_ID, producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH, sequence: Int = RecordBatch.NO_SEQUENCE, - baseOffset: Long = 0L): MemoryRecords = { + baseOffset: Long = 0L, + partitionLeaderEpoch: Int = RecordBatch.NO_PARTITION_LEADER_EPOCH): MemoryRecords = { val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, baseOffset, - System.currentTimeMillis, producerId, producerEpoch, sequence) + System.currentTimeMillis, producerId, producerEpoch, sequence, false, partitionLeaderEpoch) records.foreach(builder.append) builder.build() }
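The partitionLeaderEpoch parameter added to TestUtils.records above lets a test stamp the batches it builds with an explicit leader epoch instead of the default RecordBatch.NO_PARTITION_LEADER_EPOCH. A minimal usage sketch, assuming it runs inside a JUnit test body (the record contents, base offset and epoch value are made up for illustration):

import scala.collection.JavaConverters._
import org.junit.Assert.assertEquals
import kafka.utils.TestUtils
import org.apache.kafka.common.record.SimpleRecord

// Build a batch whose partition leader epoch is stamped as 5 rather than NO_PARTITION_LEADER_EPOCH.
val memoryRecords = TestUtils.records(
  List(new SimpleRecord("key".getBytes, "value".getBytes)),
  baseOffset = 16L,
  partitionLeaderEpoch = 5)

// Every batch produced by the helper should now carry that epoch.
memoryRecords.batches.asScala.foreach { batch =>
  assertEquals(5, batch.partitionLeaderEpoch)
}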