
KAFKA-5036; hold onto the leader lock in Partition while serving an OffsetForLeaderEpoch request

Author: Jun Rao <junrao@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ben Stopford <benstopford@gmail.com>

Closes #3074 from junrao/kafka-5036
Jun Rao 8 years ago
parent commit 2491520626
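Note on the fix itself: the commit title is the whole story. Previously the epoch lookup called getLeaderReplicaIfLocal and then read the leader's epoch cache without holding Partition.leaderIsrUpdateLock across both steps, so a leader change could land in between. The patch moves the lookup into Partition and wraps it in the read lock. A minimal sketch of that pattern with illustrative names (the inReadLock helper mirrors kafka.utils.CoreUtils.inReadLock; this is not the patched Partition code itself):

    import java.util.concurrent.locks.ReentrantReadWriteLock

    // Minimal sketch of the guard pattern the patch applies; names are
    // illustrative, not the actual Partition internals.
    object ReadLockSketch {
      private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
      @volatile private var isLeader = true
      @volatile private var epochEndOffset = 42L

      // Equivalent of CoreUtils.inReadLock: run `body` while holding the read lock.
      private def inReadLock[T](body: => T): T = {
        val lock = leaderIsrUpdateLock.readLock()
        lock.lock()
        try body finally lock.unlock()
      }

      // Leader transitions take the write lock, so a reader never observes a
      // half-applied transition.
      def makeFollower(): Unit = {
        val lock = leaderIsrUpdateLock.writeLock()
        lock.lock()
        try isLeader = false finally lock.unlock()
      }

      // The leadership check and the epoch lookup happen under one read lock,
      // so the answer is consistent with a single point in time.
      def lastOffsetForEpoch(): Either[String, Long] = inReadLock {
        if (isLeader) Right(epochEndOffset) else Left("NOT_LEADER_FOR_PARTITION")
      }
    }

Because writers hold the write lock for the whole transition, a reader inside inReadLock sees the state either fully before or fully after a leader change, never a mix.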
Changed files (lines changed):
  45  core/src/main/scala/kafka/cluster/Partition.scala
   2  core/src/main/scala/kafka/log/Log.scala
   6  core/src/main/scala/kafka/server/KafkaApis.scala
  48  core/src/main/scala/kafka/server/ReplicaManager.scala
   2  core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
  83  core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala

core/src/main/scala/kafka/cluster/Partition.scala (45 lines changed)

@@ -16,28 +16,30 @@
  */
 package kafka.cluster

-import kafka.common._
-import kafka.utils._
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
-import kafka.admin.AdminUtils
-import kafka.api.LeaderAndIsr
-import kafka.log.LogConfig
-import kafka.server._
-import kafka.metrics.KafkaMetricsGroup
-import kafka.controller.KafkaController
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
-import org.apache.kafka.common.errors.{PolicyViolationException, NotEnoughReplicasException, NotLeaderForPartitionException}
-import org.apache.kafka.common.protocol.Errors
-import scala.collection.JavaConverters._
 import com.yammer.metrics.core.Gauge
+import kafka.admin.AdminUtils
+import kafka.api.LeaderAndIsr
+import kafka.common.NotAssignedReplicaException
+import kafka.controller.KafkaController
+import kafka.log.LogConfig
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server._
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
+import kafka.utils._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.PartitionState
+import org.apache.kafka.common.requests.EpochEndOffset._
+import org.apache.kafka.common.requests.{EpochEndOffset, PartitionState}
 import org.apache.kafka.common.utils.Time
+import scala.collection.JavaConverters._

 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
  */
@@ -510,6 +512,21 @@ class Partition(val topic: String,
     }
   }

+  /**
+    * @param leaderEpoch Requested leader epoch
+    * @return The last offset of messages published under this leader epoch.
+    */
+  def lastOffsetForLeaderEpoch(leaderEpoch: Int): EpochEndOffset = {
+    inReadLock(leaderIsrUpdateLock) {
+      leaderReplicaIfLocal match {
+        case Some(leaderReplica) =>
+          new EpochEndOffset(NONE, leaderReplica.epochs.get.endOffsetFor(leaderEpoch))
+        case None =>
+          new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET)
+      }
+    }
+  }
+
   private def updateIsr(newIsr: Set[Replica]) {
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
     val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
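For context, the two arms above are the whole contract of the response: either (NONE, end offset of the requested epoch) from the leader, or (NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET) when this broker is not the leader. A sketch of how a follower might consume such a result, assuming the KIP-101 truncation rule rather than any code in this patch:

    // Hypothetical stand-ins for the two response shapes above, just to show
    // how a fetcher might act on them.
    sealed trait EpochQueryResult
    case class EndOffset(offset: Long) extends EpochQueryResult
    case object NotLeader extends EpochQueryResult

    // Assumed follower behavior (KIP-101): truncate to the smaller of the
    // leader's epoch end offset and the local log end offset; if leadership
    // moved, do not truncate, retry against the new leader instead.
    def truncationPoint(result: EpochQueryResult, localLogEndOffset: Long): Option[Long] =
      result match {
        case EndOffset(offset) => Some(math.min(offset, localLogEndOffset))
        case NotLeader         => None
      }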

core/src/main/scala/kafka/log/Log.scala (2 lines changed)

@@ -513,7 +513,7 @@ class Log(@volatile var dir: File,
    * @throws KafkaStorageException If the append fails due to an I/O error.
    * @return Information about the appended messages including the first and last offset.
    */
-  private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean = true, leaderEpoch: Int): LogAppendInfo = {
+  private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
     val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

     // return if we have no valid messages or if this is a duplicate of the last appended entry
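A side cleanup: dropping the = true default means no caller of this private append can pick up assignOffsets implicitly now that leaderEpoch sits next to it; every call site has to state both decisions. A hypothetical stand-in showing the call-site discipline this enforces (nothing here beyond the signature is taken from the diff):

    // Hypothetical stand-in for the private append: with no default on
    // assignOffsets, the compiler rejects any call site that forgets to
    // decide it, instead of silently inheriting `true`.
    def append(records: Seq[String], isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): Int =
      records.size // stand-in for the real append logic

    // Callers now read unambiguously, e.g. a follower-style path:
    val appended = append(Seq("r1", "r2"), isFromClient = false, assignOffsets = false, leaderEpoch = 7)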

core/src/main/scala/kafka/server/KafkaApis.scala (6 lines changed)

@@ -1656,12 +1656,12 @@ class KafkaApis(val requestChannel: RequestChannel,
   }

   def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
-    val offsetForEpoch = request.body[OffsetsForLeaderEpochRequest]
-    val requestInfo = offsetForEpoch.epochsByTopicPartition()
+    val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest]
+    val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition()
     authorizeClusterAction(request)

     val responseBody = new OffsetsForLeaderEpochResponse(
-      replicaManager.getResponseFor(requestInfo)
+      replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava
     )
     sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody))
   }
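epochsByTopicPartition() hands back a Java map while the new ReplicaManager method works in Scala maps, hence the asScala/asJava round trip above. A self-contained sketch of that boundary pattern, with stand-in types rather than the Kafka request classes:

    import java.util.{HashMap => JHashMap, Map => JMap}
    import scala.collection.JavaConverters._

    // Accept a Java map at the request boundary, do the work on a Scala map,
    // and hand the result back as a Java map. The logic is a stand-in.
    def lastOffsets(requested: Map[String, Integer]): Map[String, Long] =
      requested.map { case (tp, epoch) => tp -> (epoch.toLong * 100L) }

    val javaRequest: JMap[String, Integer] = new JHashMap()
    javaRequest.put("topic-1", 5)

    val javaResponse: JMap[String, Long] = lastOffsets(javaRequest.asScala.toMap).asJava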

core/src/main/scala/kafka/server/ReplicaManager.scala (48 lines changed)

@@ -20,36 +20,32 @@ import java.io.{File, IOException}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

-import org.apache.kafka.common.errors._
 import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.{Partition, Replica}
+import kafka.common.KafkaStorageException
 import kafka.controller.KafkaController
 import kafka.log.{Log, LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
-import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _}
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse, LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.utils.Time
+import scala.collection._
 import scala.collection.JavaConverters._
-import java.util.{Map => JMap}
-import kafka.common.KafkaStorageException
-import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.protocol.Errors._
-import org.apache.kafka.common.requests.EpochEndOffset._
-import scala.collection._

 /*
  * Result metadata of a log append operation on the log
@@ -1108,22 +1104,16 @@ class ReplicaManager(val config: KafkaConfig,
     new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
   }

-  def getResponseFor(requestedEpochInfo: JMap[TopicPartition, Integer]): JMap[TopicPartition, EpochEndOffset] = {
-    OffsetsForLeaderEpoch.getResponseFor(this, requestedEpochInfo)
-  }
-}
-
-object OffsetsForLeaderEpoch extends Logging {
-  def getResponseFor(replicaManager: ReplicaManager, requestedEpochInfo: JMap[TopicPartition, Integer]): JMap[TopicPartition, EpochEndOffset] = {
-    debug(s"Processing OffsetForEpochRequest: $requestedEpochInfo")
-    requestedEpochInfo.asScala.map { case (tp, epoch) =>
-      val offset = try {
-        new EpochEndOffset(NONE, replicaManager.getLeaderReplicaIfLocal(tp).epochs.get.endOffsetFor(epoch))
-      } catch {
-        case _: NotLeaderForPartitionException => new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET)
-        case _: UnknownTopicOrPartitionException => new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET)
+  def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, Integer]): Map[TopicPartition, EpochEndOffset] = {
+    requestedEpochInfo.map { case (tp, leaderEpoch) =>
+      val epochEndOffset = getPartition(tp) match {
+        case Some(partition) =>
+          partition.lastOffsetForLeaderEpoch(leaderEpoch)
+        case None =>
+          new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET)
       }
-      (tp, offset)
-    }.toMap.asJava
+      tp -> epochEndOffset
+    }
   }
 }
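The shape of the rewrite: the old code drove control flow through thrown exceptions and caught them per partition, while the new code treats a missing partition as an ordinary None and carries the per-partition error inside the EpochEndOffset value. A small self-contained analogue of that errors-as-values mapping:

    // Look each requested key up and encode "not found" as a value in the
    // result, instead of throwing and catching. Names are illustrative.
    case class Result(error: String, offset: Long)

    def lookupAll(store: Map[String, Long], requested: Set[String]): Map[String, Result] =
      requested.map { key =>
        val result = store.get(key) match {
          case Some(offset) => Result("NONE", offset)
          case None         => Result("UNKNOWN_TOPIC_OR_PARTITION", -1L)
        }
        key -> result
      }.toMap

    // lookupAll(Map("tp-0" -> 42L), Set("tp-0", "tp-9"))
    // => Map(tp-0 -> Result(NONE,42), tp-9 -> Result(UNKNOWN_TOPIC_OR_PARTITION,-1))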

core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala (2 lines changed)

@@ -180,7 +180,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     checkpoint.write(epochs)
   }

-  def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Latest: {epoch:$latestEpoch, offset$latestOffset} for Partition: $topicPartition"
+  def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Current: {epoch:$latestEpoch, offset$latestOffset} for Partition: $topicPartition"

   def validateAndMaybeWarn(epoch: Int, offset: Long) = {
     assert(epoch >= 0, s"Received a PartitionLeaderEpoch assignment for an epoch < 0. This should not happen. ${epochChangeMsg(epoch, offset)}")
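For background on what endOffsetFor (the lookup the new Partition method delegates to) computes: the cache records the start offset of each leader epoch, and the end offset of an epoch is the start offset of the next recorded epoch, or the log end offset for the latest epoch. A simplified sketch of that rule as described in KIP-101, assumed here rather than taken from this diff:

    // Simplified model of the epoch cache semantics: each entry records the
    // first offset written under an epoch.
    case class EpochEntry(epoch: Int, startOffset: Long)

    def endOffsetFor(entries: Seq[EpochEntry], requestedEpoch: Int, leo: Long): Long =
      entries.sortBy(_.epoch).sliding(2).collectFirst {
        case Seq(EpochEntry(e, _), EpochEntry(_, nextStart)) if e == requestedEpoch => nextStart
      }.getOrElse {
        if (entries.exists(_.epoch == requestedEpoch)) leo  // latest epoch runs to the log end
        else -1L                                            // unknown epoch: UNDEFINED_EPOCH_OFFSET
      }

    // endOffsetFor(Seq(EpochEntry(3, 0), EpochEntry(4, 90)), 3, 120) == 90
    // endOffsetFor(Seq(EpochEntry(3, 0), EpochEntry(4, 90)), 4, 120) == 120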

core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala (83 lines changed)

@@ -16,83 +16,90 @@
  */
 package kafka.server.epoch

-import kafka.server.OffsetsForLeaderEpoch
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Replica
+import kafka.server._
+import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.EpochEndOffset
+import org.apache.kafka.common.requests.EpochEndOffset._
 import org.easymock.EasyMock._
-import org.junit.Test
 import org.junit.Assert._
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import org.junit.Test

 class OffsetsForLeaderEpochTest {

+  private val config = TestUtils.createBrokerConfigs(1, TestUtils.MockZkConnect).map(KafkaConfig.fromProps).head
+  private val time = new MockTime
+  private val metrics = new Metrics
+  private val tp = new TopicPartition("topic", 1)
+
   @Test
   def shouldGetEpochsFromReplica(): Unit = {
-    val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager])
-    val replica = createNiceMock(classOf[kafka.cluster.Replica])
-    val cache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache])
-
     //Given
-    val tp = new TopicPartition("topic", 1)
     val offset = 42
     val epochRequested: Integer = 5
-    val request = mutable.Map(tp -> epochRequested).asJava
+    val request = Map(tp -> epochRequested)

     //Stubs
-    expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
-    expect(replica.epochs).andReturn(Some(cache))
-    expect(cache.endOffsetFor(epochRequested)).andReturn(offset)
-    replay(replica, replicaManager, cache)
+    val mockLog = createNiceMock(classOf[kafka.log.Log])
+    val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache])
+    expect(mockCache.endOffsetFor(epochRequested)).andReturn(offset)
+    expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
+    replay(mockCache, mockLog)
+
+    // create a replica manager with 1 partition that has 1 replica
+    val replicaManager = new ReplicaManager(config, metrics, time, null, null, null, new AtomicBoolean(false),
+      QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
+      new MetadataCache(config.brokerId))
+    val partition = replicaManager.getOrCreatePartition(tp)
+    val leaderReplica = new Replica(config.brokerId, partition, time, 0, Some(mockLog))
+    partition.addReplicaIfNotExists(leaderReplica)
+    partition.leaderReplicaIdOpt = Some(config.brokerId)

     //When
-    val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, request)
+    val response = replicaManager.lastOffsetForLeaderEpoch(request)

     //Then
-    assertEquals(new EpochEndOffset(Errors.NONE, offset), response.get(tp))
+    assertEquals(new EpochEndOffset(Errors.NONE, offset), response(tp))
   }

   @Test
-  def shonuldReturnNoLeaderForPartitionIfThrown(): Unit = {
-    val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager])
+  def shouldReturnNoLeaderForPartitionIfThrown(): Unit = {
+    //create a replica manager with 1 partition that has 0 replica
+    val replicaManager = new ReplicaManager(config, metrics, time, null, null, null, new AtomicBoolean(false),
+      QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
+      new MetadataCache(config.brokerId))
+    replicaManager.getOrCreatePartition(tp)

     //Given
-    val tp = new TopicPartition("topic", 1)
     val epochRequested: Integer = 5
-    val request = mutable.Map(tp -> epochRequested).asJava
-
-    //Stubs
-    expect(replicaManager.getLeaderReplicaIfLocal(tp)).andThrow(new NotLeaderForPartitionException())
-    replay(replicaManager)
+    val request = Map(tp -> epochRequested)

     //When
-    val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, request)
+    val response = replicaManager.lastOffsetForLeaderEpoch(request)

     //Then
-    assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET), response.get(tp))
+    assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET), response(tp))
   }

   @Test
   def shouldReturnUnknownTopicOrPartitionIfThrown(): Unit = {
-    val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager])
+    //create a replica manager with 0 partition
+    val replicaManager = new ReplicaManager(config, metrics, time, null, null, null, new AtomicBoolean(false),
+      QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
+      new MetadataCache(config.brokerId))

     //Given
-    val tp = new TopicPartition("topic", 1)
     val epochRequested: Integer = 5
-    val request = mutable.Map(tp -> epochRequested).asJava
-
-    //Stubs
-    expect(replicaManager.getLeaderReplicaIfLocal(tp)).andThrow(new UnknownTopicOrPartitionException())
-    replay(replicaManager)
+    val request = Map(tp -> epochRequested)

     //When
-    val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, request)
+    val response = replicaManager.lastOffsetForLeaderEpoch(request)

     //Then
-    assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET), response.get(tp))
+    assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET), response(tp))
   }
 }
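The tests now build a real ReplicaManager over a mocked Log instead of mocking ReplicaManager itself, so the new Partition-level read lock is actually exercised. For readers unfamiliar with the stub-and-replay style used above, a minimal EasyMock sketch against a hypothetical trait (not the Kafka classes):

    import org.easymock.EasyMock._

    // Hypothetical trait standing in for the mocked collaborator.
    trait EpochCache { def endOffsetFor(epoch: Int): Long }

    val cache = createNiceMock(classOf[EpochCache])
    expect(cache.endOffsetFor(5)).andReturn(42L)  // program the expected call
    replay(cache)                                 // switch the mock to replay mode

    assert(cache.endOffsetFor(5) == 42L)  // stubbed call answers as programmed
    assert(cache.endOffsetFor(9) == 0L)   // a nice mock returns a default for unstubbed calls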