Browse Source

MINOR: Introduce OffsetAndEpoch in LeaderEndpoint interface return values (#13268)

Reviewers: Satish Duggana <satishd@apache.org>, Alexandre Dupriez <alexandre.dupriez@gmail.com>, Jun Rao <junrao@gmail.com>
pull/13314/head
Kowshik Prakasam 2 years ago committed by GitHub
parent
commit
9f55945270
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      core/src/main/scala/kafka/log/UnifiedLog.scala
  2. 30
      core/src/main/scala/kafka/server/AbstractFetcherThread.scala
  3. 13
      core/src/main/scala/kafka/server/LeaderEndPoint.scala
  4. 13
      core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
  5. 14
      core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
  6. 1
      core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
  7. 1
      core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
  8. 15
      core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
  9. 8
      core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
  10. 9
      core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
  11. 21
      core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
  12. 14
      core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
  13. 30
      core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
  14. 3
      core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
  15. 3
      core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
  16. 7
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
  17. 55
      server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java

6
core/src/main/scala/kafka/log/UnifiedLog.scala

@ -21,7 +21,7 @@ import com.yammer.metrics.core.MetricName @@ -21,7 +21,7 @@ import com.yammer.metrics.core.MetricName
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, PartitionMetadataFile, RequestLocal}
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
@ -32,7 +32,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest @@ -32,7 +32,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET
import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType
@ -917,7 +917,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, @@ -917,7 +917,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
if (foundOffset == UNDEFINED_EPOCH_OFFSET)
None
else
Some(OffsetAndEpoch(foundOffset, foundEpoch))
Some(new OffsetAndEpoch(foundOffset, foundEpoch))
}
}

30
core/src/main/scala/kafka/server/AbstractFetcherThread.scala

@ -32,6 +32,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records} @@ -32,6 +32,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.LogAppendInfo
@ -613,7 +614,9 @@ abstract class AbstractFetcherThread(name: String, @@ -613,7 +614,9 @@ abstract class AbstractFetcherThread(name: String,
// get (leader epoch, end offset) pair that corresponds to the largest leader epoch
// less than or equal to the requested epoch.
endOffsetForEpoch(tp, leaderEpochOffset.leaderEpoch) match {
case Some(OffsetAndEpoch(followerEndOffset, followerEpoch)) =>
case Some(offsetAndEpoch) =>
val followerEndOffset = offsetAndEpoch.offset
val followerEpoch = offsetAndEpoch.leaderEpoch
if (followerEpoch != leaderEpochOffset.leaderEpoch) {
// the follower does not know about the epoch that leader replied with
// we truncate to the end offset of the largest epoch that is smaller than the
@ -658,7 +661,7 @@ abstract class AbstractFetcherThread(name: String, @@ -658,7 +661,7 @@ abstract class AbstractFetcherThread(name: String,
private def fetchOffsetAndApplyTruncateAndBuild(topicPartition: TopicPartition,
topicId: Option[Uuid],
currentLeaderEpoch: Int,
truncateAndBuild: => (Int, Long) => Long,
truncateAndBuild: => OffsetAndEpoch => Long,
fetchFromLocalLogStartOffset: Boolean = true): PartitionFetchState = {
val replicaEndOffset = logEndOffset(topicPartition)
@ -672,7 +675,8 @@ abstract class AbstractFetcherThread(name: String, @@ -672,7 +675,8 @@ abstract class AbstractFetcherThread(name: String,
*
* There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
*/
val (_, leaderEndOffset) = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch)
val offsetAndEpoch = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch)
val leaderEndOffset = offsetAndEpoch.offset
if (leaderEndOffset < replicaEndOffset) {
warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
s"leader's latest offset $leaderEndOffset")
@ -704,10 +708,10 @@ abstract class AbstractFetcherThread(name: String, @@ -704,10 +708,10 @@ abstract class AbstractFetcherThread(name: String,
* Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
* and the current leader's (local-log-start-offset or) log start offset.
*/
val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset)
val offsetAndEpoch = if (fetchFromLocalLogStartOffset)
leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else
leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
val leaderStartOffset = offsetAndEpoch.offset
warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
s"leader's start offset $leaderStartOffset")
val offsetToFetch =
@ -715,7 +719,7 @@ abstract class AbstractFetcherThread(name: String, @@ -715,7 +719,7 @@ abstract class AbstractFetcherThread(name: String,
// Only truncate log when current leader's log start offset (local log start offset if >= 3.4 version incaseof
// OffsetMovedToTieredStorage error) is greater than follower's log end offset.
// truncateAndBuild returns offset value from which it needs to start fetching.
truncateAndBuild(epoch, leaderStartOffset)
truncateAndBuild(offsetAndEpoch)
} else {
replicaEndOffset
}
@ -732,7 +736,8 @@ abstract class AbstractFetcherThread(name: String, @@ -732,7 +736,8 @@ abstract class AbstractFetcherThread(name: String,
*/
private def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
fetchOffsetAndApplyTruncateAndBuild(topicPartition, topicId, currentLeaderEpoch,
(_, leaderLogStartOffset) => {
offsetAndEpoch => {
val leaderLogStartOffset = offsetAndEpoch.offset
truncateFullyAndStartAt(topicPartition, leaderLogStartOffset)
leaderLogStartOffset
},
@ -803,7 +808,10 @@ abstract class AbstractFetcherThread(name: String, @@ -803,7 +808,10 @@ abstract class AbstractFetcherThread(name: String,
leaderLogStartOffset: Long): Boolean = {
try {
val newFetchState = fetchOffsetAndApplyTruncateAndBuild(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch,
(offsetEpoch, leaderLocalLogStartOffset) => buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetEpoch, leaderLogStartOffset))
offsetAndEpoch => {
val leaderLocalLogStartOffset = offsetAndEpoch.offset
buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetAndEpoch.leaderEpoch(), leaderLogStartOffset)
})
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
debug(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
@ -1025,9 +1033,3 @@ case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) { @@ -1025,9 +1033,3 @@ case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) {
override def toString: String = s"TruncationState(offset=$offset, completed=$truncationCompleted)"
}
case class OffsetAndEpoch(offset: Long, leaderEpoch: Int) {
override def toString: String = {
s"(offset=$offset, leaderEpoch=$leaderEpoch)"
}
}

13
core/src/main/scala/kafka/server/LeaderEndPoint.scala

@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition @@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
import org.apache.kafka.server.common.OffsetAndEpoch
import scala.collection.Map
@ -71,9 +72,9 @@ trait LeaderEndPoint { @@ -71,9 +72,9 @@ trait LeaderEndPoint {
* @param topicPartition The topic partition that we want to fetch from
* @param currentLeaderEpoch An int representing the current leader epoch of the requester
*
* @return A tuple representing the (epoch, earliest_offset) in the leader's topic partition.
* @return An OffsetAndEpoch object representing the earliest offset and epoch in the leader's topic partition.
*/
def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long)
def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch
/**
* Fetches the epoch and log end offset of the given topic partition from the leader.
@ -81,9 +82,9 @@ trait LeaderEndPoint { @@ -81,9 +82,9 @@ trait LeaderEndPoint {
* @param topicPartition The topic partition that we want to fetch from
* @param currentLeaderEpoch An int representing the current leader epoch of the requester
*
* @return A tuple representing the (epoch, latest_offset) in the leader's topic partition.
* @return An OffsetAndEpoch object representing the latest offset and epoch in the leader's topic partition.
*/
def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long)
def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch
/**
* Fetches offset for leader epoch from the leader for each given topic partition
@ -100,9 +101,9 @@ trait LeaderEndPoint { @@ -100,9 +101,9 @@ trait LeaderEndPoint {
* @param topicPartition The topic partition that we want to fetch from
* @param currentLeaderEpoch An int representing the current leader epoch of the requester
*
* @return A tuple representing the (epoch, earliest_local_offset) in the leader's topic partition.
* @return An OffsetAndEpoch object representing the earliest local offset and epoch in the leader's topic partition.
*/
def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long)
def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch
/**
* Builds a fetch request, given a partition map.

13
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala

@ -28,6 +28,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -28,6 +28,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, FetchPartitionData}
import java.util
@ -113,25 +114,25 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint, @@ -113,25 +114,25 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
partitionData.toMap
}
override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
val partition = replicaManager.getPartitionOrException(topicPartition)
val logStartOffset = partition.localLogOrException.logStartOffset
val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(logStartOffset)
(epoch.orElse(0), logStartOffset)
new OffsetAndEpoch(logStartOffset, epoch.orElse(0))
}
override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
val partition = replicaManager.getPartitionOrException(topicPartition)
val logEndOffset = partition.localLogOrException.logEndOffset
val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(logEndOffset)
(epoch.orElse(0), logEndOffset)
new OffsetAndEpoch(logEndOffset, epoch.orElse(0))
}
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
val partition = replicaManager.getPartitionOrException(topicPartition)
val localLogStartOffset = partition.localLogOrException.localLogStartOffset()
val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(localLogStartOffset)
(epoch.orElse(0), localLogStartOffset)
new OffsetAndEpoch(localLogStartOffset, epoch.orElse(0))
}
override def fetchEpochEndOffsets(partitions: collection.Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {

14
core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala

@ -31,7 +31,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetFo @@ -31,7 +31,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetFo
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2
import scala.jdk.CollectionConverters._
@ -94,19 +94,19 @@ class RemoteLeaderEndPoint(logPrefix: String, @@ -94,19 +94,19 @@ class RemoteLeaderEndPoint(logPrefix: String,
}
}
override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_TIMESTAMP)
}
override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.LATEST_TIMESTAMP)
}
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
}
private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, timestamp: Long): (Int, Long) = {
private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, timestamp: Long): OffsetAndEpoch = {
val topic = new ListOffsetsTopic()
.setName(topicPartition.topic)
.setPartitions(Collections.singletonList(
@ -126,9 +126,9 @@ class RemoteLeaderEndPoint(logPrefix: String, @@ -126,9 +126,9 @@ class RemoteLeaderEndPoint(logPrefix: String,
Errors.forCode(responsePartition.errorCode) match {
case Errors.NONE =>
if (metadataVersion.isAtLeast(IBP_0_10_1_IV2))
(responsePartition.leaderEpoch, responsePartition.offset)
new OffsetAndEpoch(responsePartition.offset, responsePartition.leaderEpoch)
else
(responsePartition.leaderEpoch, responsePartition.oldStyleOffsets.get(0))
new OffsetAndEpoch(responsePartition.oldStyleOffsets.get(0), responsePartition.leaderEpoch)
case error => throw error.exception
}
}

1
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala

@ -20,6 +20,7 @@ package kafka.server @@ -20,6 +20,7 @@ package kafka.server
import kafka.log.LeaderOffsetIncremented
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchResponse
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.storage.internals.log.LogAppendInfo
import scala.collection.{Map, Set}

1
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala

@ -26,6 +26,7 @@ import org.apache.kafka.common.requests._ @@ -26,6 +26,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.server.common.CheckpointFile.CheckpointReadBuffer
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageException, RemoteStorageManager}
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile

15
core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala

@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.util.MockScheduler
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogDirFailureChannel}
import org.junit.jupiter.api.{BeforeEach, Test}
@ -83,41 +84,41 @@ class LocalLeaderEndPointTest { @@ -83,41 +84,41 @@ class LocalLeaderEndPointTest {
def testFetchLatestOffset(): Unit = {
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
assertEquals((0, 3L), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 0))
assertEquals(new OffsetAndEpoch(3L, 0), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 0))
val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
assertEquals((4, 6L), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7))
assertEquals(new OffsetAndEpoch(6L, 4), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7))
}
@Test
def testFetchEarliestOffset(): Unit = {
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
assertEquals((0, 0L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 0))
assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 0))
val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 3), _ => ())
assertEquals((4, 3L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
assertEquals(new OffsetAndEpoch(3L, 4), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
}
@Test
def testFetchEarliestLocalOffset(): Unit = {
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
assertEquals((0, 0L), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 0))
assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 0))
val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
replicaManager.logManager.getLog(topicPartition).foreach(log => log._localLogStartOffset = 3)
assertEquals((0, 0L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
assertEquals((4, 3L), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
assertEquals(new OffsetAndEpoch(3L, 4), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
}
@Test

8
core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala

@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition @@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.Mockito.mock
@ -65,21 +65,21 @@ class RemoteLeaderEndPointTest { @@ -65,21 +65,21 @@ class RemoteLeaderEndPointTest {
def testFetchLatestOffset(): Unit = {
blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
new ListOffsetsPartitionResponse().setLeaderEpoch(7).setOffset(logEndOffset)))
assertEquals((7, logEndOffset), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch))
assertEquals(new OffsetAndEpoch(logEndOffset, 7), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch))
}
@Test
def testFetchEarliestOffset(): Unit = {
blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
new ListOffsetsPartitionResponse().setLeaderEpoch(5).setOffset(logStartOffset)))
assertEquals((5, logStartOffset), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch))
assertEquals(new OffsetAndEpoch(logStartOffset, 5), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch))
}
@Test
def testFetchEarliestLocalOffset(): Unit = {
blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
new ListOffsetsPartitionResponse().setLeaderEpoch(6).setOffset(localLogStartOffset)))
assertEquals((6, localLogStartOffset), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch))
assertEquals(new OffsetAndEpoch(localLogStartOffset, 6), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch))
}
@Test

9
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala

@ -25,6 +25,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd @@ -25,6 +25,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.storage.internals.log.LogAppendInfo
import org.junit.jupiter.api.Assertions._
@ -297,9 +298,9 @@ class AbstractFetcherManagerTest { @@ -297,9 +298,9 @@ class AbstractFetcherManagerTest {
override def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = Map.empty
override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1)
override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1)
override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = Map.empty
@ -307,7 +308,7 @@ class AbstractFetcherManagerTest { @@ -307,7 +308,7 @@ class AbstractFetcherManagerTest {
override val isTruncationOnFetchSupported: Boolean = false
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1)
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
}
private class TestResizeFetcherThread(sourceBroker: BrokerEndPoint, failedPartitions: FailedPartitions)
@ -333,7 +334,7 @@ class AbstractFetcherManagerTest { @@ -333,7 +334,7 @@ class AbstractFetcherManagerTest {
override protected def logEndOffset(topicPartition: TopicPartition): Long = 1
override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = Some(OffsetAndEpoch(1, 0))
override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = Some(new OffsetAndEpoch(1, 0))
override protected val isOffsetForLeaderEpochSupported: Boolean = false

21
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala

@ -30,6 +30,7 @@ import org.apache.kafka.common.record._ @@ -30,6 +30,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogOffsetMetadata}
@ -704,11 +705,11 @@ class AbstractFetcherThreadTest { @@ -704,11 +705,11 @@ class AbstractFetcherThreadTest {
var fetchedEarliestOffset = false
val fetcher = new MockFetcherThread(new MockLeaderEndPoint {
override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
fetchedEarliestOffset = true
throw new FencedLeaderEpochException(s"Epoch $leaderEpoch is fenced")
}
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
fetchedEarliestOffset = true
throw new FencedLeaderEpochException(s"Epoch $leaderEpoch is fenced")
}
@ -780,7 +781,7 @@ class AbstractFetcherThreadTest { @@ -780,7 +781,7 @@ class AbstractFetcherThreadTest {
val partition = new TopicPartition("topic", 0)
val fetcher: MockFetcherThread = new MockFetcherThread(new MockLeaderEndPoint {
val tries = new AtomicInteger(0)
override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
if (tries.getAndIncrement() == 0)
throw new UnknownLeaderEpochException("Unexpected leader epoch")
super.fetchLatestOffset(topicPartition, leaderEpoch)
@ -1265,22 +1266,22 @@ class AbstractFetcherThreadTest { @@ -1265,22 +1266,22 @@ class AbstractFetcherThreadTest {
}.toMap
}
override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
val leaderState = leaderPartitionState(topicPartition)
checkLeaderEpochAndThrow(leaderEpoch, leaderState)
(leaderState.leaderEpoch, leaderState.logStartOffset)
new OffsetAndEpoch(leaderState.logStartOffset, leaderState.leaderEpoch)
}
override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
val leaderState = leaderPartitionState(topicPartition)
checkLeaderEpochAndThrow(leaderEpoch, leaderState)
(leaderState.leaderEpoch, leaderState.logEndOffset)
new OffsetAndEpoch(leaderState.logEndOffset, leaderState.leaderEpoch)
}
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
val leaderState = leaderPartitionState(topicPartition)
checkLeaderEpochAndThrow(leaderEpoch, leaderState)
(leaderState.leaderEpoch, leaderState.localLogStartOffset)
new OffsetAndEpoch(leaderState.localLogStartOffset, leaderState.leaderEpoch)
}
override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
@ -1542,7 +1543,7 @@ class AbstractFetcherThreadTest { @@ -1542,7 +1543,7 @@ class AbstractFetcherThreadTest {
if (result.endOffset == UNDEFINED_EPOCH_OFFSET)
None
else
Some(OffsetAndEpoch(result.endOffset, result.leaderEpoch))
Some(new OffsetAndEpoch(result.endOffset, result.leaderEpoch))
}
def verifyLastFetchedEpoch(partition: TopicPartition, expectedEpoch: Option[Int]): Unit = {

14
core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala

@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, FetchPartitionData}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -477,7 +477,7 @@ class ReplicaAlterLogDirsThreadTest { @@ -477,7 +477,7 @@ class ReplicaAlterLogDirsThreadTest {
when(futureLogT1p0.latestEpoch).thenReturn(Some(leaderEpoch))
when(futureLogT1p0.endOffsetForEpoch(leaderEpoch)).thenReturn(
Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
when(partitionT1p0.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, fetchOnlyFromLeader = false))
.thenReturn(new EpochEndOffset()
.setPartition(partitionT1p0Id)
@ -487,7 +487,7 @@ class ReplicaAlterLogDirsThreadTest { @@ -487,7 +487,7 @@ class ReplicaAlterLogDirsThreadTest {
when(futureLogT1p1.latestEpoch).thenReturn(Some(leaderEpoch))
when(futureLogT1p1.endOffsetForEpoch(leaderEpoch)).thenReturn(
Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
when(partitionT1p1.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, fetchOnlyFromLeader = false))
.thenReturn(new EpochEndOffset()
.setPartition(partitionT1p1Id)
@ -568,7 +568,7 @@ class ReplicaAlterLogDirsThreadTest { @@ -568,7 +568,7 @@ class ReplicaAlterLogDirsThreadTest {
.setEndOffset(replicaLEO))
// but future replica does not know about this leader epoch, so returns a smaller leader epoch
when(futureLog.endOffsetForEpoch(leaderEpoch - 1)).thenReturn(
Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch - 2)))
Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch - 2)))
// finally, the leader replica knows about the leader epoch and returns end offset
when(partition.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch - 2, fetchOnlyFromLeader = false))
.thenReturn(new EpochEndOffset()
@ -577,7 +577,7 @@ class ReplicaAlterLogDirsThreadTest { @@ -577,7 +577,7 @@ class ReplicaAlterLogDirsThreadTest {
.setLeaderEpoch(leaderEpoch - 2)
.setEndOffset(replicaEpochEndOffset))
when(futureLog.endOffsetForEpoch(leaderEpoch - 2)).thenReturn(
Some(OffsetAndEpoch(futureReplicaEpochEndOffset, leaderEpoch - 2)))
Some(new OffsetAndEpoch(futureReplicaEpochEndOffset, leaderEpoch - 2)))
when(replicaManager.logManager).thenReturn(logManager)
stubWithFetchMessages(log, null, futureLog, partition, replicaManager, responseCallback)
@ -693,7 +693,7 @@ class ReplicaAlterLogDirsThreadTest { @@ -693,7 +693,7 @@ class ReplicaAlterLogDirsThreadTest {
when(futureLog.logEndOffset).thenReturn(futureReplicaLEO)
when(futureLog.latestEpoch).thenReturn(Some(futureReplicaLeaderEpoch))
when(futureLog.endOffsetForEpoch(futureReplicaLeaderEpoch)).thenReturn(
Some(OffsetAndEpoch(futureReplicaLEO, futureReplicaLeaderEpoch)))
Some(new OffsetAndEpoch(futureReplicaLEO, futureReplicaLeaderEpoch)))
when(replicaManager.localLog(t1p0)).thenReturn(Some(log))
// this will cause fetchEpochsFromLeader return an error with undefined offset
@ -786,7 +786,7 @@ class ReplicaAlterLogDirsThreadTest { @@ -786,7 +786,7 @@ class ReplicaAlterLogDirsThreadTest {
when(futureLog.latestEpoch).thenReturn(Some(leaderEpoch))
when(futureLog.logEndOffset).thenReturn(futureReplicaLEO)
when(futureLog.endOffsetForEpoch(leaderEpoch)).thenReturn(
Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch)))
when(replicaManager.logManager).thenReturn(logManager)
stubWithFetchMessages(log, null, futureLog, partition, replicaManager, responseCallback)

30
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala

@ -34,7 +34,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRec @@ -34,7 +34,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRec
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMetadataRequest}
import org.apache.kafka.common.utils.{LogContext, SystemTime}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.storage.internals.log.LogAppendInfo
import org.junit.jupiter.api.Assertions._
@ -154,7 +154,7 @@ class ReplicaFetcherThreadTest { @@ -154,7 +154,7 @@ class ReplicaFetcherThreadTest {
.thenReturn(Some(leaderEpoch))
.thenReturn(None) // t2p1 doesn't support epochs
when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
Some(OffsetAndEpoch(0, leaderEpoch)))
Some(new OffsetAndEpoch(0, leaderEpoch)))
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.logManager).thenReturn(logManager)
when(replicaManager.replicaAlterLogDirsManager).thenReturn(replicaAlterLogDirsManager)
@ -303,7 +303,7 @@ class ReplicaFetcherThreadTest { @@ -303,7 +303,7 @@ class ReplicaFetcherThreadTest {
when(log.highWatermark).thenReturn(0)
when(log.latestEpoch).thenReturn(Some(leaderEpoch))
when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
Some(OffsetAndEpoch(0, leaderEpoch)))
Some(new OffsetAndEpoch(0, leaderEpoch)))
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.logManager).thenReturn(logManager)
when(replicaManager.replicaAlterLogDirsManager).thenReturn(replicaAlterLogDirsManager)
@ -367,7 +367,7 @@ class ReplicaFetcherThreadTest { @@ -367,7 +367,7 @@ class ReplicaFetcherThreadTest {
when(log.highWatermark).thenReturn(initialLEO - 1)
when(log.latestEpoch).thenReturn(Some(leaderEpoch))
when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
Some(OffsetAndEpoch(initialLEO, leaderEpoch)))
Some(new OffsetAndEpoch(initialLEO, leaderEpoch)))
when(log.logEndOffset).thenReturn(initialLEO)
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@ -488,9 +488,9 @@ class ReplicaFetcherThreadTest { @@ -488,9 +488,9 @@ class ReplicaFetcherThreadTest {
when(log.highWatermark).thenReturn(initialLEO - 2)
when(log.latestEpoch).thenReturn(Some(5))
when(log.endOffsetForEpoch(4)).thenReturn(
Some(OffsetAndEpoch(120, 3)))
Some(new OffsetAndEpoch(120, 3)))
when(log.endOffsetForEpoch(3)).thenReturn(
Some(OffsetAndEpoch(120, 3)))
Some(new OffsetAndEpoch(120, 3)))
when(log.logEndOffset).thenReturn(initialLEO)
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@ -571,9 +571,9 @@ class ReplicaFetcherThreadTest { @@ -571,9 +571,9 @@ class ReplicaFetcherThreadTest {
when(partition.localLogOrException).thenReturn(log)
when(log.highWatermark).thenReturn(115)
when(log.latestEpoch).thenAnswer(_ => latestLogEpoch)
when(log.endOffsetForEpoch(4)).thenReturn(Some(OffsetAndEpoch(149, 4)))
when(log.endOffsetForEpoch(3)).thenReturn(Some(OffsetAndEpoch(129, 2)))
when(log.endOffsetForEpoch(2)).thenReturn(Some(OffsetAndEpoch(119, 1)))
when(log.endOffsetForEpoch(4)).thenReturn(Some(new OffsetAndEpoch(149, 4)))
when(log.endOffsetForEpoch(3)).thenReturn(Some(new OffsetAndEpoch(129, 2)))
when(log.endOffsetForEpoch(2)).thenReturn(Some(new OffsetAndEpoch(119, 1)))
when(log.logEndOffset).thenReturn(initialLEO)
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@ -674,7 +674,7 @@ class ReplicaFetcherThreadTest { @@ -674,7 +674,7 @@ class ReplicaFetcherThreadTest {
when(log.highWatermark).thenReturn(highWatermark)
when(log.latestEpoch).thenReturn(Some(5))
when(log.endOffsetForEpoch(4)).thenReturn(Some(OffsetAndEpoch(149, 4)))
when(log.endOffsetForEpoch(4)).thenReturn(Some(new OffsetAndEpoch(149, 4)))
when(log.logEndOffset).thenReturn(logEndOffset)
when(replicaManager.metadataCache).thenReturn(metadataCache)
@ -766,9 +766,9 @@ class ReplicaFetcherThreadTest { @@ -766,9 +766,9 @@ class ReplicaFetcherThreadTest {
when(log.highWatermark).thenReturn(initialLEO - 2)
when(log.latestEpoch).thenReturn(Some(5))
when(log.endOffsetForEpoch(4)).thenReturn(
Some(OffsetAndEpoch(120, 3)))
Some(new OffsetAndEpoch(120, 3)))
when(log.endOffsetForEpoch(3)).thenReturn(
Some(OffsetAndEpoch(120, 3)))
Some(new OffsetAndEpoch(120, 3)))
when(log.logEndOffset).thenReturn(initialLEO)
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@ -893,7 +893,7 @@ class ReplicaFetcherThreadTest { @@ -893,7 +893,7 @@ class ReplicaFetcherThreadTest {
when(log.latestEpoch).thenReturn(Some(leaderEpoch))
// this is for the last reply with EpochEndOffset(5, 156)
when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
Some(OffsetAndEpoch(initialLeo, leaderEpoch)))
Some(new OffsetAndEpoch(initialLeo, leaderEpoch)))
when(log.logEndOffset).thenReturn(initialLeo)
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)
@ -958,7 +958,7 @@ class ReplicaFetcherThreadTest { @@ -958,7 +958,7 @@ class ReplicaFetcherThreadTest {
when(log.highWatermark).thenReturn(0)
when(log.latestEpoch).thenReturn(Some(leaderEpoch))
when(log.endOffsetForEpoch(leaderEpoch)).thenReturn(
Some(OffsetAndEpoch(0, leaderEpoch)))
Some(new OffsetAndEpoch(0, leaderEpoch)))
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.logManager).thenReturn(logManager)
when(replicaManager.replicaAlterLogDirsManager).thenReturn(replicaAlterLogDirsManager)
@ -1016,7 +1016,7 @@ class ReplicaFetcherThreadTest { @@ -1016,7 +1016,7 @@ class ReplicaFetcherThreadTest {
when(partition.localLogOrException).thenReturn(log)
when(log.highWatermark).thenReturn(initialLEO - 2)
when(log.latestEpoch).thenReturn(Some(5))
when(log.endOffsetForEpoch(5)).thenReturn(Some(OffsetAndEpoch(initialLEO, 5)))
when(log.endOffsetForEpoch(5)).thenReturn(Some(new OffsetAndEpoch(initialLEO, 5)))
when(log.logEndOffset).thenReturn(initialLEO)
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log)

3
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

@ -57,6 +57,7 @@ import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPar @@ -57,6 +57,7 @@ import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPar
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, TopicsDelta, TopicsImage}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.util.MockScheduler
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, ProducerStateManager, ProducerStateManagerConfig}
@ -2059,7 +2060,7 @@ class ReplicaManagerTest { @@ -2059,7 +2060,7 @@ class ReplicaManagerTest {
override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
assertEquals(leaderEpoch, leaderEpochFromLeader)
localLogOffset.map { logOffset =>
Some(OffsetAndEpoch(logOffset, leaderEpochFromLeader))
Some(new OffsetAndEpoch(logOffset, leaderEpochFromLeader))
}.getOrElse(super.endOffsetForEpoch(leaderEpoch))
}

3
core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala

@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.Metrics @@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@ -53,7 +54,7 @@ class OffsetsForLeaderEpochTest { @@ -53,7 +54,7 @@ class OffsetsForLeaderEpochTest {
@Test
def shouldGetEpochsFromReplica(): Unit = {
//Given
val offsetAndEpoch = OffsetAndEpoch(42L, 5)
val offsetAndEpoch = new OffsetAndEpoch(42L, 5)
val epochRequested: Integer = 5
val request = Seq(newOffsetForLeaderTopic(tp, RecordBatch.NO_PARTITION_LEADER_EPOCH, epochRequested))

7
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java

@ -30,7 +30,6 @@ import kafka.server.FailedPartitions; @@ -30,7 +30,6 @@ import kafka.server.FailedPartitions;
import kafka.server.InitialFetchState;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.OffsetAndEpoch;
import kafka.server.OffsetTruncationState;
import kafka.server.QuotaFactory;
import kafka.server.RemoteLeaderEndPoint;
@ -67,6 +66,7 @@ import org.apache.kafka.common.requests.UpdateMetadataRequest; @@ -67,6 +66,7 @@ import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.storage.internals.log.LogConfig;
@ -99,7 +99,6 @@ import java.util.Properties; @@ -99,7 +99,6 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import scala.Option;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.Map;
@ -318,8 +317,8 @@ public class ReplicaFetcherThreadBenchmark { @@ -318,8 +317,8 @@ public class ReplicaFetcherThreadBenchmark {
config::interBrokerProtocolVersion
) {
@Override
public Tuple2<Object, Object> fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
return Tuple2.apply(0, 0);
public OffsetAndEpoch fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
return new OffsetAndEpoch(0L, 0);
}
@Override

55
server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java

@ -0,0 +1,55 @@ @@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
public class OffsetAndEpoch {
private final long offset;
private final int leaderEpoch;
public OffsetAndEpoch(long offset, int leaderEpoch) {
this.offset = offset;
this.leaderEpoch = leaderEpoch;
}
public long offset() {
return offset;
}
public int leaderEpoch() {
return leaderEpoch;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OffsetAndEpoch that = (OffsetAndEpoch) o;
return offset == that.offset && leaderEpoch == that.leaderEpoch;
}
@Override
public int hashCode() {
int result = leaderEpoch;
result = 31 * result + Long.hashCode(offset);
return result;
}
@Override
public String toString() {
return "(offset=" + offset + ", leaderEpoch=" + leaderEpoch + ")";
}
}
Loading…
Cancel
Save