diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 1d13689e84e..f123a1659f1 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -16,28 +16,30 @@ */ package kafka.cluster -import kafka.common._ -import kafka.utils._ -import kafka.utils.CoreUtils.{inReadLock, inWriteLock} -import kafka.admin.AdminUtils -import kafka.api.LeaderAndIsr -import kafka.log.LogConfig -import kafka.server._ -import kafka.metrics.KafkaMetricsGroup -import kafka.controller.KafkaController import java.io.IOException import java.util.concurrent.locks.ReentrantReadWriteLock -import org.apache.kafka.common.errors.{PolicyViolationException, NotEnoughReplicasException, NotLeaderForPartitionException} -import org.apache.kafka.common.protocol.Errors - -import scala.collection.JavaConverters._ import com.yammer.metrics.core.Gauge +import kafka.admin.AdminUtils +import kafka.api.LeaderAndIsr +import kafka.common.NotAssignedReplicaException +import kafka.controller.KafkaController +import kafka.log.LogConfig +import kafka.metrics.KafkaMetricsGroup +import kafka.server._ +import kafka.utils.CoreUtils.{inReadLock, inWriteLock} +import kafka.utils._ import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.record.MemoryRecords -import org.apache.kafka.common.requests.PartitionState +import org.apache.kafka.common.requests.EpochEndOffset._ +import org.apache.kafka.common.requests.{EpochEndOffset, PartitionState} import org.apache.kafka.common.utils.Time +import scala.collection.JavaConverters._ + /** * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR */ @@ -510,6 +512,21 @@ class Partition(val topic: String, } } + /** + * @param leaderEpoch Requested leader epoch + * @return The last offset of messages published under this leader epoch. + */ + def lastOffsetForLeaderEpoch(leaderEpoch: Int): EpochEndOffset = { + inReadLock(leaderIsrUpdateLock) { + leaderReplicaIfLocal match { + case Some(leaderReplica) => + new EpochEndOffset(NONE, leaderReplica.epochs.get.endOffsetFor(leaderEpoch)) + case None => + new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET) + } + } + } + private def updateIsr(newIsr: Set[Replica]) { val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion) val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId, diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index dd22a263dce..7a47657fac4 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -513,7 +513,7 @@ class Log(@volatile var dir: File, * @throws KafkaStorageException If the append fails due to an I/O error. * @return Information about the appended messages including the first and last offset. */ - private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean = true, leaderEpoch: Int): LogAppendInfo = { + private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = { val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient) // return if we have no valid messages or if this is a duplicate of the last appended entry diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 33b696a9c7c..c3c37c1a9c0 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1656,12 +1656,12 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = { - val offsetForEpoch = request.body[OffsetsForLeaderEpochRequest] - val requestInfo = offsetForEpoch.epochsByTopicPartition() + val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest] + val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition() authorizeClusterAction(request) val responseBody = new OffsetsForLeaderEpochResponse( - replicaManager.getResponseFor(requestInfo) + replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava ) sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody)) } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index eec9ab2c9ee..47b6d6927c4 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -20,36 +20,32 @@ import java.io.{File, IOException} import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} -import org.apache.kafka.common.errors._ import com.yammer.metrics.core.Gauge import kafka.api._ import kafka.cluster.{Partition, Replica} +import kafka.common.KafkaStorageException import kafka.controller.KafkaController import kafka.log.{Log, LogAppendInfo, LogManager} import kafka.metrics.KafkaMetricsGroup import kafka.server.QuotaFactory.UnboundedQuota import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils._ -import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _} +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse, LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest} -import org.apache.kafka.common.requests._ -import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse, LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest, _} +import org.apache.kafka.common.utils.Time -import scala.collection._ import scala.collection.JavaConverters._ -import java.util.{Map => JMap} - -import kafka.common.KafkaStorageException -import org.apache.kafka.common.internals.Topic -import org.apache.kafka.common.protocol.Errors._ -import org.apache.kafka.common.requests.EpochEndOffset._ +import scala.collection._ /* * Result metadata of a log append operation on the log @@ -1108,22 +1104,16 @@ class ReplicaManager(val config: KafkaConfig, new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) } - def getResponseFor(requestedEpochInfo: JMap[TopicPartition, Integer]): JMap[TopicPartition, EpochEndOffset] = { - OffsetsForLeaderEpoch.getResponseFor(this, requestedEpochInfo) - } -} - -object OffsetsForLeaderEpoch extends Logging { - def getResponseFor(replicaManager: ReplicaManager, requestedEpochInfo: JMap[TopicPartition, Integer]): JMap[TopicPartition, EpochEndOffset] = { - debug(s"Processing OffsetForEpochRequest: $requestedEpochInfo") - requestedEpochInfo.asScala.map { case (tp, epoch) => - val offset = try { - new EpochEndOffset(NONE, replicaManager.getLeaderReplicaIfLocal(tp).epochs.get.endOffsetFor(epoch)) - } catch { - case _: NotLeaderForPartitionException => new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET) - case _: UnknownTopicOrPartitionException => new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET) + def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, Integer]): Map[TopicPartition, EpochEndOffset] = { + requestedEpochInfo.map { case (tp, leaderEpoch) => + val epochEndOffset = getPartition(tp) match { + case Some(partition) => + partition.lastOffsetForLeaderEpoch(leaderEpoch) + case None => + new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET) } - (tp, offset) - }.toMap.asJava + tp -> epochEndOffset + } } } + diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala index 2b1ecc7d5bc..ffca9007ec1 100644 --- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala +++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala @@ -180,7 +180,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM checkpoint.write(epochs) } - def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Latest: {epoch:$latestEpoch, offset$latestOffset} for Partition: $topicPartition" + def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Current: {epoch:$latestEpoch, offset$latestOffset} for Partition: $topicPartition" def validateAndMaybeWarn(epoch: Int, offset: Long) = { assert(epoch >= 0, s"Received a PartitionLeaderEpoch assignment for an epoch < 0. This should not happen. ${epochChangeMsg(epoch, offset)}") diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala index 77b90689195..d004641a1b9 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala @@ -16,83 +16,90 @@ */ package kafka.server.epoch -import kafka.server.OffsetsForLeaderEpoch +import java.util.concurrent.atomic.AtomicBoolean + +import kafka.cluster.Replica +import kafka.server._ +import kafka.utils.{MockTime, TestUtils} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException} +import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.requests.EpochEndOffset +import org.apache.kafka.common.requests.EpochEndOffset._ import org.easymock.EasyMock._ -import org.junit.Test import org.junit.Assert._ - -import scala.collection.JavaConverters._ -import scala.collection.mutable +import org.junit.Test class OffsetsForLeaderEpochTest { + private val config = TestUtils.createBrokerConfigs(1, TestUtils.MockZkConnect).map(KafkaConfig.fromProps).head + private val time = new MockTime + private val metrics = new Metrics + private val tp = new TopicPartition("topic", 1) @Test def shouldGetEpochsFromReplica(): Unit = { - val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager]) - val replica = createNiceMock(classOf[kafka.cluster.Replica]) - val cache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache]) - //Given - val tp = new TopicPartition("topic", 1) val offset = 42 val epochRequested: Integer = 5 - val request = mutable.Map(tp -> epochRequested).asJava + val request = Map(tp -> epochRequested) //Stubs - expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica) - expect(replica.epochs).andReturn(Some(cache)) - expect(cache.endOffsetFor(epochRequested)).andReturn(offset) - replay(replica, replicaManager, cache) + val mockLog = createNiceMock(classOf[kafka.log.Log]) + val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache]) + expect(mockCache.endOffsetFor(epochRequested)).andReturn(offset) + expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes() + replay(mockCache, mockLog) + + // create a replica manager with 1 partition that has 1 replica + val replicaManager = new ReplicaManager(config, metrics, time, null, null, null, new AtomicBoolean(false), + QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, + new MetadataCache(config.brokerId)) + val partition = replicaManager.getOrCreatePartition(tp) + val leaderReplica = new Replica(config.brokerId, partition, time, 0, Some(mockLog)) + partition.addReplicaIfNotExists(leaderReplica) + partition.leaderReplicaIdOpt = Some(config.brokerId) //When - val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, request) + val response = replicaManager.lastOffsetForLeaderEpoch(request) //Then - assertEquals(new EpochEndOffset(Errors.NONE, offset), response.get(tp)) + assertEquals(new EpochEndOffset(Errors.NONE, offset), response(tp)) } @Test - def shonuldReturnNoLeaderForPartitionIfThrown(): Unit = { - val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager]) + def shouldReturnNoLeaderForPartitionIfThrown(): Unit = { + //create a replica manager with 1 partition that has 0 replica + val replicaManager = new ReplicaManager(config, metrics, time, null, null, null, new AtomicBoolean(false), + QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, + new MetadataCache(config.brokerId)) + replicaManager.getOrCreatePartition(tp) //Given - val tp = new TopicPartition("topic", 1) val epochRequested: Integer = 5 - val request = mutable.Map(tp -> epochRequested).asJava - - //Stubs - expect(replicaManager.getLeaderReplicaIfLocal(tp)).andThrow(new NotLeaderForPartitionException()) - replay(replicaManager) + val request = Map(tp -> epochRequested) //When - val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, request) + val response = replicaManager.lastOffsetForLeaderEpoch(request) //Then - assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET), response.get(tp)) + assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET), response(tp)) } @Test def shouldReturnUnknownTopicOrPartitionIfThrown(): Unit = { - val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager]) + //create a replica manager with 0 partition + val replicaManager = new ReplicaManager(config, metrics, time, null, null, null, new AtomicBoolean(false), + QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, + new MetadataCache(config.brokerId)) //Given - val tp = new TopicPartition("topic", 1) val epochRequested: Integer = 5 - val request = mutable.Map(tp -> epochRequested).asJava - - //Stubs - expect(replicaManager.getLeaderReplicaIfLocal(tp)).andThrow(new UnknownTopicOrPartitionException()) - replay(replicaManager) + val request = Map(tp -> epochRequested) //When - val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, request) + val response = replicaManager.lastOffsetForLeaderEpoch(request) //Then - assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET), response.get(tp)) + assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET), response(tp)) } } \ No newline at end of file