@@ -18,21 +18,23 @@
 package kafka.server

 import java.io.IOException
 import java.util.concurrent.atomic._
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
 import kafka.common._
 import kafka.log._
 import kafka.message._
 import kafka.network._
+import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
 import mutable.HashMap
 import scala.math._
 import kafka.network.RequestChannel.Response
-import kafka.utils.{ZkUtils, SystemTime, Logging}
+import java.util.concurrent.TimeUnit
+import kafka.metrics.KafkaMetricsGroup
 import kafka.cluster.Replica

 /**
  * Logic to handle the various Kafka requests
  */
@@ -44,10 +46,13 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
                 becomeFollower: (Replica, LeaderAndISR) => Short,
                 brokerId: Int) extends Logging {

-  private val produceRequestPurgatory = new ProducerRequestPurgatory(brokerId)
-  private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
+  private val metricsGroup = brokerId.toString
+  private val producerRequestPurgatory = new ProducerRequestPurgatory
+  private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
+  private val delayedRequestMetrics = new DelayedRequestMetrics
   private val requestLogger = Logger.getLogger("kafka.request.logger")
-  this.logIdent = "KafkaApi on Broker " + brokerId + ", "
+  this.logIdent = "KafkaApis-%d ".format(brokerId)

   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -69,7 +74,9 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   def handleLeaderAndISRRequest(request: RequestChannel.Request) {
     val responseMap = new HashMap[(String, Int), Short]
     val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer)
-    info("handling leader and isr request " + leaderAndISRRequest)
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest)
+    trace("Handling leader and isr request " + leaderAndISRRequest)

     for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos) {
       var errorCode = ErrorMapping.NoError
@@ -78,12 +85,12 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
       // If the partition does not exist locally, create it
       if(replicaManager.getPartition(topic, partition) == None) {
-        trace("the partition (%s, %d) does not exist locally, check if current broker is in assigned replicas, if so, start the local replica".format(topic, partition))
+        trace("The partition (%s, %d) does not exist locally, check if current broker is in assigned replicas, if so, start the local replica".format(topic, partition))
         val assignedReplicas = ZkUtils.getReplicasForPartition(kafkaZookeeper.getZookeeperClient, topic, partition)
-        trace("assigned replicas list for topic [%s] partition [%d] is [%s]".format(topic, partition, assignedReplicas.toString))
+        trace("Assigned replicas list for topic [%s] partition [%d] is [%s]".format(topic, partition, assignedReplicas.toString))
         if(assignedReplicas.contains(brokerId)) {
           val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet)
-          info("starting replica for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId))
+          info("Starting replica for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId))
         }
       }
       val replica = replicaManager.getReplica(topic, partition).get
@@ -91,11 +98,11 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
       val requestedLeaderId = leaderAndISR.leader
       // If the broker is requested to be the leader and it's not currently the leader (the leader id is set and not equal to broker id)
       if(requestedLeaderId == brokerId && (!replica.partition.leaderId().isDefined || replica.partition.leaderId().get != brokerId)) {
-        info("becoming the leader for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
+        info("Becoming the leader for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
         errorCode = becomeLeader(replica, leaderAndISR)
       }
       else if(requestedLeaderId != brokerId) {
-        info("becoming the follower for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
+        info("Becoming the follower for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
         errorCode = becomeFollower(replica, leaderAndISR)
       }
@@ -105,7 +112,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
     if(leaderAndISRRequest.isInit == LeaderAndISRRequest.IsInit) {
       replicaManager.startHighWaterMarksCheckPointThread
       val partitionsToRemove = replicaManager.allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).keySet
-      info("init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
+      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
       partitionsToRemove.foreach(p => stopReplicaCbk(p._1, p._2))
     }
@@ -116,6 +123,10 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   def handleStopReplicaRequest(request: RequestChannel.Request) {
     val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer)
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
+    trace("Handling stop replica request " + stopReplicaRequest)

     val responseMap = new HashMap[(String, Int), Short]

     for((topic, partition) <- stopReplicaRequest.stopReplicaSet) {
@@ -133,12 +144,17 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   def maybeUnblockDelayedFetchRequests(topic: String, partitionDatas: Array[PartitionData]) {
     var satisfied = new mutable.ArrayBuffer[DelayedFetch]
     for(partitionData <- partitionDatas)
-      satisfied ++= fetchRequestPurgatory.update((topic, partitionData.partition), partitionData)
-    trace("produce request to %s unblocked %d DelayedFetchRequests.".format(topic, satisfied.size))
+      satisfied ++= fetchRequestPurgatory.update(RequestKey(topic, partitionData.partition), null)
+    trace("Producer request to %s unblocked %d fetch requests.".format(topic, satisfied.size))

     // send any newly unblocked responses
     for(fetchReq <- satisfied) {
       val topicData = readMessageSets(fetchReq.fetch)
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
+      val fromFollower = fetchReq.fetch.replicaId != FetchRequest.NonFollowerId
+      delayedRequestMetrics.recordDelayedFetchSatisfied(
+        fromFollower, SystemTime.nanoseconds - fetchReq.creationTimeNs, response)
+
       requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
     }
   }
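
For context, the purgatory pattern relied on above works roughly as follows: a delayed request is parked under one key per (topic, partition) it depends on, and every purgatory.update(key, ...) re-runs the satisfaction check for the requests parked under that key. A minimal sketch of that contract, assuming nothing about the real kafka.server.RequestPurgatory beyond watch/update:

    import scala.collection.mutable

    // Hypothetical simplified purgatory; the real one adds expiration,
    // per-request timeouts, metrics, and thread safety.
    class MiniPurgatory[D, K](checkSatisfied: (K, D) => Boolean) {
      private val watching = mutable.Map.empty[K, mutable.ArrayBuffer[D]]

      // Park a delayed request under each of its keys.
      def watch(delayed: D, keys: Seq[K]): Unit =
        keys.foreach(k => watching.getOrElseUpdate(k, mutable.ArrayBuffer.empty[D]) += delayed)

      // New data arrived for `key`: return (and forget) all now-satisfied requests.
      def update(key: K): Seq[D] = {
        val candidates = watching.getOrElse(key, mutable.ArrayBuffer.empty[D])
        val satisfied = candidates.filter(d => checkSatisfied(key, d))
        candidates --= satisfied
        satisfied.toSeq
      }
    }
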
@@ -150,43 +166,45 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
     val produceRequest = ProducerRequest.readFrom(request.request.buffer)
     val sTime = SystemTime.milliseconds
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("producer request %s".format(produceRequest.toString))
-    trace("Broker %s received produce request %s".format(brokerId, produceRequest.toString))
+      requestLogger.trace("Handling producer request " + request.toString)
+    trace("Handling producer request " + request.toString)

     val response = produceToLocalLog(produceRequest)
-    debug("produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+
+    for(topicData <- produceRequest.data)
+      maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray)

     if(produceRequest.requiredAcks == 0 ||
        produceRequest.requiredAcks == 1 ||
-       produceRequest.data.size <= 0) {
+       produceRequest.data.size <= 0)
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
-      for(topicData <- produceRequest.data)
-        maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray)
-    }
     else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val topicPartitionPairs = produceRequest.data.flatMap(topicData => {
+      val producerRequestKeys = produceRequest.data.flatMap(topicData => {
         val topic = topicData.topic
         topicData.partitionDataArray.map(partitionData => {
-          (topic, partitionData.partition)
+          RequestKey(topic, partitionData.partition)
         })
       })

       val delayedProduce = new DelayedProduce(
-        topicPartitionPairs, request,
+        producerRequestKeys, request,
         response.errors, response.offsets,
         produceRequest, produceRequest.ackTimeoutMs.toLong)
-      produceRequestPurgatory.watch(delayedProduce)
+      producerRequestPurgatory.watch(delayedProduce)

       /*
        * Replica fetch requests may have arrived (and potentially satisfied)
-       * delayedProduce requests before they even made it to the purgatory.
+       * delayedProduce requests while they were being added to the purgatory.
        * Here, we explicitly check if any of them can be satisfied.
        */
       var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
-      topicPartitionPairs.foreach(topicPartition =>
-        satisfiedProduceRequests ++=
-          produceRequestPurgatory.update(topicPartition, topicPartition))
-      debug("%d DelayedProduce requests unblocked after produce to local log.".format(satisfiedProduceRequests.size))
+      producerRequestKeys.foreach(key =>
+        satisfiedProduceRequests ++=
+          producerRequestPurgatory.update(key, key))
+      debug(satisfiedProduceRequests.size +
+        " producer requests unblocked during produce to local log.")
       satisfiedProduceRequests.foreach(_.respond())
     }
   }
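
The watch-then-update sequence above guards against a race: a follower fetch can acknowledge the new messages in the window between produceToLocalLog() and watch(), and nothing would ever re-check the parked request, so it would sit until its ack timeout expired. A condensed sketch of the idiom (purgatory and key names are from this patch, the wrapper method is hypothetical and assumes it lives inside KafkaApis):

    // Park the delayed produce, then immediately re-check each of its keys
    // so acks that raced ahead of watch() still complete the request.
    def parkAndRecheck(delayedProduce: DelayedProduce, keys: Seq[RequestKey]) {
      producerRequestPurgatory.watch(delayedProduce)
      val alreadySatisfied = keys.flatMap(key => producerRequestPurgatory.update(key, key))
      alreadySatisfied.foreach(_.respond())
    }
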
@@ -195,10 +213,11 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   /**
    * Helper method for handling a parsed producer request
    */
   private def produceToLocalLog(request: ProducerRequest): ProducerResponse = {
-    trace("produce [%s] to local log ".format(request.toString))
+    trace("Produce [%s] to local log ".format(request.toString))
     val requestSize = request.topicPartitionCount
     val errors = new Array[Short](requestSize)
     val offsets = new Array[Long](requestSize)

     var msgIndex = -1
     for(topicData <- request.data) {
       for(partitionData <- topicData.partitionDataArray) {
@@ -212,12 +231,13 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
           replicaManager.recordLeaderLogEndOffset(topicData.topic, partitionData.partition, log.logEndOffset)
           offsets(msgIndex) = log.logEndOffset
           errors(msgIndex) = ErrorMapping.NoError.toShort
-          trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
+          trace("%d bytes written to logs, nextAppendOffset = %d"
+            .format(partitionData.messages.sizeInBytes, offsets(msgIndex)))
         } catch {
           case e =>
             BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
             BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
-            error("error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e)
+            error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e)
             e match {
               case _: IOException =>
                 fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
@@ -229,8 +249,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
         }
       }
     }
-    val ret = new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
-    ret
+    new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
   }

   /**
@@ -238,7 +257,10 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
    */
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = FetchRequest.readFrom(request.request.buffer)
-    trace("handling fetch request: " + fetchRequest.toString)
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Handling fetch request " + fetchRequest.toString)
+    trace("Handling fetch request " + fetchRequest.toString)
+
     // validate the request
     try {
       fetchRequest.validate()
@@ -255,12 +277,12 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
       var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
       fetchRequest.offsetInfo.foreach(topicOffsetInfo => {
         topicOffsetInfo.partitions.foreach(partition => {
-          satisfiedProduceRequests ++= produceRequestPurgatory.update(
-            (topicOffsetInfo.topic, partition), (topicOffsetInfo.topic, partition)
-          )
+          val key = RequestKey(topicOffsetInfo.topic, partition)
+          satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
         })
       })
-      debug("replica %d fetch unblocked %d DelayedProduce requests.".format(fetchRequest.replicaId, satisfiedProduceRequests.size))
+      debug("Replica %d fetch unblocked %d producer requests."
+        .format(fetchRequest.replicaId, satisfiedProduceRequests.size))
       satisfiedProduceRequests.foreach(_.respond())
     }
@@ -270,14 +292,15 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
        availableBytes >= fetchRequest.minBytes ||
        fetchRequest.numPartitions <= 0) {
       val topicData = readMessageSets(fetchRequest)
-      debug("returning fetch response %s for fetch request with correlation id %d".format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
+      debug("Returning fetch response %s for fetch request with correlation id %d".format(
+        topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     } else {
-      debug("putting fetch request into purgatory")
+      debug("Putting fetch request into purgatory")
       // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val topicPartitionPairs: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _)))
-      val delayedFetch = new DelayedFetch(topicPartitionPairs, request, fetchRequest, fetchRequest.maxWait, availableBytes)
+      val delayedFetchKeys = fetchRequest.offsetInfo.flatMap(o => o.partitions.map(RequestKey(o.topic, _)))
+      val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait)
       fetchRequestPurgatory.watch(delayedFetch)
     }
   }
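
The delayed fetch is keyed on every (topic, partition) it reads, so a produce to any one of them re-runs the min-bytes check. For example, with hypothetical request contents:

    // offsetInfo covering topic "orders" partitions 0 and 1, topic "clicks" partition 2:
    val delayedFetchKeys = Seq(RequestKey("orders", 0), RequestKey("orders", 1), RequestKey("clicks", 2))
    // A produce to ("orders", 1) calls fetchRequestPurgatory.update(RequestKey("orders", 1), null),
    // which re-evaluates availableFetchBytes(fetch) >= fetch.minBytes for this request.
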
@@ -298,16 +321,18 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
           totalBytes += math.min(offsetDetail.fetchSizes(i), available)
         } catch {
           case e: InvalidPartitionException =>
-            info("invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
+            info("Invalid partition %d in fetch request from client %d."
+              .format(offsetDetail.partitions(i), fetchRequest.clientId))
         }
       }
     }
     trace(totalBytes + " available bytes for fetch request.")
     totalBytes
   }

   private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) {
     val offsets = fetchRequest.offsetInfo
-    debug("act on update partition HW, check offset detail: %s ".format(offsets))
+    debug("Act on update partition HW, check offset detail: %s ".format(offsets))
     for(offsetDetail <- offsets) {
       val topic = offsetDetail.topic
       val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets)
@@ -343,17 +368,20 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
         BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
         BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
         val leaderReplicaOpt = replicaManager.getReplica(topic, partition, brokerId)
-        assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) + " must exist on leader broker %d".format(brokerId))
+        assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d must exist on leader broker %d".format(topic, partition, brokerId))
         val leaderReplica = leaderReplicaOpt.get
         fetchRequest.replicaId match {
-          case FetchRequest.NonFollowerId => // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
+          case FetchRequest.NonFollowerId =>
+            // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
             new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
           case _ => // fetch request from a follower
             val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
             assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, brokerId))
             val replica = replicaOpt.get
-            debug("leader [%d] for topic [%s] partition [%d] received fetch request from follower [%d]".format(brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
-            debug("Leader %d returning %d messages for topic %s partition %d to follower %d".format(brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+            debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]"
+              .format(replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+            debug("Leader returning %d messages for topic %s partition %d to follower %d"
+              .format(messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
             new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
         }
       }
@@ -372,7 +400,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
     try {
       // check if the current broker is the leader for the partitions
       kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition)
-      trace("fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
+      trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
       val log = logManager.getLog(topic, partition)
       response = Right(log match { case Some(l) => l.read(offset, maxSize) case None => MessageSet.Empty })
     } catch {
@@ -389,7 +417,9 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   def handleOffsetRequest(request: RequestChannel.Request) {
     val offsetRequest = OffsetRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("offset request " + offsetRequest.toString)
+      requestLogger.trace("Handling offset request " + offsetRequest.toString)
+    trace("Handling offset request " + offsetRequest.toString)
+
     var response: OffsetResponse = null
     try {
       kafkaZookeeper.ensurePartitionLeaderOnThisBroker(offsetRequest.topic, offsetRequest.partition)
@@ -412,11 +442,14 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("topic metadata request " + metadataRequest.toString())
+      requestLogger.trace("Handling topic metadata request " + metadataRequest.toString())
+    trace("Handling topic metadata request " + metadataRequest.toString())
+
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val zkClient = kafkaZookeeper.getZookeeperClient
     var errorCode = ErrorMapping.NoError
     val config = logManager.config
     try {
       val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
       metadataRequest.topics.zip(topicMetadataList).foreach(
@@ -452,33 +485,43 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   }

   def close() {
-    debug("shut down")
+    debug("Shutting down.")
     fetchRequestPurgatory.shutdown()
-    produceRequestPurgatory.shutdown()
-    debug("shutted down completely")
+    producerRequestPurgatory.shutdown()
+    debug("Shut down complete.")
   }
+
+  private [kafka] trait MetricKey {
+    def keyLabel: String
+  }
+  private [kafka] object MetricKey {
+    val globalLabel = "all"
+  }
+  private [kafka] case class RequestKey(topic: String, partition: Int)
+          extends MetricKey {
+    override def keyLabel = "%s-%d".format(topic, partition)
+  }

   /**
    * A delayed fetch request
    */
-  class DelayedFetch(keys: Seq[Any], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long) extends DelayedRequest(keys, request, delayMs) {
-    val bytesAccumulated = new AtomicLong(initialSize)
-  }
+  class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long)
+          extends DelayedRequest(keys, request, delayMs)

   /**
    * A holding pen for fetch requests waiting to be satisfied
    */
-  class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData]("Fetch Request Purgatory on Broker " + brokerId + ", ") {
+  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
+    this.logIdent = "FetchRequestPurgatory-%d ".format(brokerId)
+    override def metricsGroupIdent = metricsGroup

     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
      */
-    def checkSatisfied(partitionData: PartitionData, delayedFetch: DelayedFetch): Boolean = {
-      val messageDataSize = partitionData.messages.sizeInBytes
-      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageDataSize)
-      debug("fetch request check, accm size: " + accumulatedSize + " delay fetch min bytes: " + delayedFetch.fetch.minBytes)
-      accumulatedSize >= delayedFetch.fetch.minBytes
-    }
+    def checkSatisfied(n: Null, delayedFetch: DelayedFetch): Boolean =
+      availableFetchBytes(delayedFetch.fetch) >= delayedFetch.fetch.minBytes

     /**
      * When a request expires just answer it with whatever data is present
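
Note the satisfaction check changed shape here: the old purgatory accumulated incoming bytes in DelayedFetch.bytesAccumulated, while the new one recomputes availableFetchBytes from the logs on every update, which cannot drift if the same data triggers more than one update. RequestKey is what ties the purgatories and metrics together; its keyLabel also names the per-partition meters defined further down. For example, with hypothetical values:

    val key = RequestKey("orders", 3)
    key.keyLabel          // "orders-3"
    MetricKey.globalLabel // "all" -- the label used for the aggregate meters
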
@@ -486,11 +529,13 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
     def expire(delayed: DelayedFetch) {
       val topicData = readMessageSets(delayed.fetch)
       val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
+      val fromFollower = delayed.fetch.replicaId != FetchRequest.NonFollowerId
+      delayedRequestMetrics.recordDelayedFetchExpired(fromFollower, response)
       requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
     }
   }

-  class DelayedProduce(keys: Seq[Any],
+  class DelayedProduce(keys: Seq[RequestKey],
                        request: RequestChannel.Request,
                        localErrors: Array[Short],
                        requiredOffsets: Array[Long],
@@ -504,7 +549,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
      * values are effectively synchronized by the ProducerRequestPurgatory's
      * update method
      */
-    private val partitionStatus = keys.map(key => {
+    private [kafka] val partitionStatus = keys.map(key => {
       val keyIndex = keys.indexOf(key)
       // if there was an error in writing to the local replica's log, then don't
       // wait for acks on this partition
@@ -525,13 +570,13 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
     def respond() {
       val errorsAndOffsets: (List[Short], List[Long]) = (
-        keys.foldRight
-          ((List[Short](), List[Long]()))
-          ((key: Any, result: (List[Short], List[Long])) => {
-            val status = partitionStatus(key)
-            (status.error :: result._1, status.requiredOffset :: result._2)
-          })
+        keys.foldRight
+          ((List[Short](), List[Long]()))
+          ((key: RequestKey, result: (List[Short], List[Long])) => {
+            val status = partitionStatus(key)
+            (status.error :: result._1, status.requiredOffset :: result._2)
+          })
       )
       val response = new ProducerResponse(produce.versionId, produce.correlationId,
                                           errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
@@ -550,9 +595,14 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
      * As partitions become acknowledged, we may be able to unblock
      * DelayedFetchRequests that are pending on those partitions.
      */
-    def isSatisfied(followerFetchPartition: (String, Int)) = {
-      val (topic, partitionId) = followerFetchPartition
-      val fetchPartitionStatus = partitionStatus(followerFetchPartition)
+    def isSatisfied(followerFetchRequestKey: RequestKey) = {
+      val topic = followerFetchRequestKey.topic
+      val partitionId = followerFetchRequestKey.partition
+      val key = RequestKey(topic, partitionId)
+      val fetchPartitionStatus = partitionStatus(key)
+      val durationNs = SystemTime.nanoseconds - creationTimeNs
+      trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
+        .format(topic, partitionId, fetchPartitionStatus.acksPending))
       if (fetchPartitionStatus.acksPending) {
         val leaderReplica = replicaManager.getLeaderReplica(topic, partitionId)
         leaderReplica match {
@@ -560,14 +610,16 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
             if (leader.isLocal) {
               val isr = leader.partition.inSyncReplicas
               val numAcks = isr.count(r => {
-                if (!r.isLocal)
-                  r.logEndOffset() >= partitionStatus(followerFetchPartition).requiredOffset
+                if (!r.isLocal) {
+                  r.logEndOffset() >= partitionStatus(key).requiredOffset
+                }
                 else
                   true /* also count the local (leader) replica */
               })
-              trace("Received %d/%d acks for produce request to %s-%d".format(
+              trace("Received %d/%d acks for producer request to %s-%d; isr size = %d".format(
                 numAcks, produce.requiredAcks,
-                topic, partitionId))
+                topic, partitionId, isr.size))
               if ((produce.requiredAcks < 0 && numAcks >= isr.size) ||
                   (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
                 /*
@@ -575,12 +627,16 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
                  * are fully caught up to the (local) leader's offset
                  * corresponding to this produce request.
                  */
                 fetchPartitionStatus.acksPending = false
                 fetchPartitionStatus.error = ErrorMapping.NoError
                 val topicData =
                   produce.data.find(_.topic == topic).get
                 val partitionData =
                   topicData.partitionDataArray.find(_.partition == partitionId).get
+                delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key,
+                  durationNs,
+                  partitionData.sizeInBytes)
                 maybeUnblockDelayedFetchRequests(
                   topic, Array(partitionData))
               }
@@ -597,7 +653,10 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
       }

       // unblocked if there are no partitions with pending acks
-      !partitionStatus.exists(p => p._2.acksPending)
+      val satisfied = !partitionStatus.exists(p => p._2.acksPending)
+      if (satisfied)
+        delayedRequestMetrics.recordDelayedProduceSatisfied(durationNs)
+      satisfied
     }

     class PartitionStatus(var acksPending: Boolean,
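
A worked example of the ack arithmetic in isSatisfied, under assumed values: with an ISR of {leader, f1, f2}, the local leader always contributes one ack, so numAcks starts at 1 and grows as each follower's logEndOffset reaches the required offset. The final check is exactly:

    def ackCheck(numAcks: Int, requiredAcks: Int, isrSize: Int): Boolean =
      (requiredAcks < 0 && numAcks >= isrSize) ||
      (requiredAcks > 0 && numAcks >= requiredAcks)

    ackCheck(numAcks = 2, requiredAcks = 2, isrSize = 3)  // true: one follower caught up
    ackCheck(numAcks = 2, requiredAcks = -1, isrSize = 3) // false: acks = -1 waits for the full ISR
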
@@ -618,18 +677,159 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   /**
    * A holding pen for produce requests waiting to be satisfied.
    */
-  private [kafka] class ProducerRequestPurgatory(brokerId: Int) extends RequestPurgatory[DelayedProduce, (String, Int)]("Producer Request Purgatory on Broker " + brokerId + ", ") {
+  private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) {
+    this.logIdent = "ProducerRequestPurgatory-%d ".format(brokerId)
+    override def metricsGroupIdent = metricsGroup

-    protected def checkSatisfied(fetchRequestPartition: (String, Int),
+    protected def checkSatisfied(followerFetchRequestKey: RequestKey,
                                  delayedProduce: DelayedProduce) =
-      delayedProduce.isSatisfied(fetchRequestPartition)
+      delayedProduce.isSatisfied(followerFetchRequestKey)

     /**
      * Handle an expired delayed request
      */
     protected def expire(delayedProduce: DelayedProduce) {
+      for(partitionStatus <- delayedProduce.partitionStatus if partitionStatus._2.acksPending)
+        delayedRequestMetrics.recordDelayedProducerKeyExpired(partitionStatus._1)
+
       delayedProduce.respond()
     }
   }
+
+  private class DelayedRequestMetrics {
+    private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
+      override def metricsGroupIdent = metricsGroup
+      val caughtUpFollowerFetchRequestMeter =
+        newMeter("CaughtUpFollowerFetchRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
+      val followerCatchUpTimeHistogram = if (keyLabel == MetricKey.globalLabel)
+        Some(newHistogram("FollowerCatchUpTimeInNs", biased = true))
+      else None
+
+      /*
+       * Note that throughput is updated on individual key satisfaction.
+       * Therefore, it is an upper bound on throughput since the
+       * DelayedProducerRequest may get expired.
+       */
+      val throughputMeter = newMeter("Throughput-" + keyLabel, "bytes", TimeUnit.SECONDS)
+      val expiredRequestMeter = newMeter("ExpiredRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
+
+      val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel)
+        Some(newMeter("SatisfiedRequestsPerSecond", "requests", TimeUnit.SECONDS))
+      else None
+      val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel)
+        Some(newHistogram("SatisfactionTimeInNs", biased = true))
+      else None
+    }
+
+    private class DelayedFetchRequestMetrics(forFollower: Boolean,
+                                             keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
+      private val metricPrefix = if (forFollower) "Follower" else "NonFollower"
+      override def metricsGroupIdent = metricsGroup
+
+      val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel)
+        Some(newMeter(metricPrefix + "-SatisfiedRequestsPerSecond",
+          "requests", TimeUnit.SECONDS))
+      else None
+      val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel)
+        Some(newHistogram(metricPrefix + "-SatisfactionTimeInNs", biased = true))
+      else None
+      val expiredRequestMeter = if (keyLabel == MetricKey.globalLabel)
+        Some(newMeter(metricPrefix + "-ExpiredRequestsPerSecond",
+          "requests", TimeUnit.SECONDS))
+      else None
+      val throughputMeter = newMeter("%s-Throughput-%s".format(metricPrefix, keyLabel),
+        "bytes", TimeUnit.SECONDS)
+    }
+
+    private val producerRequestMetricsForKey = {
+      val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel)
+      new Pool[MetricKey, DelayedProducerRequestMetrics](Some(valueFactory))
+    }
+
+    private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
+
+    private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
+    private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
+
+    private val followerFetchRequestMetricsForKey = {
+      val valueFactory = (k: MetricKey) => new DelayedFetchRequestMetrics(forFollower = true, k.keyLabel)
+      new Pool[MetricKey, DelayedFetchRequestMetrics](Some(valueFactory))
+    }
+    private val nonFollowerFetchRequestMetricsForKey = {
+      val valueFactory = (k: MetricKey) => new DelayedFetchRequestMetrics(forFollower = false, k.keyLabel)
+      new Pool[MetricKey, DelayedFetchRequestMetrics](Some(valueFactory))
+    }
+
+    def recordDelayedProducerKeyExpired(key: MetricKey) {
+      val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
+      List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
+    }
+
+    def recordDelayedProducerKeyCaughtUp(key: MetricKey, timeToCatchUpNs: Long, bytes: Int) {
+      val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
+      List(keyMetrics, aggregateProduceRequestMetrics).foreach(m => {
+        m.caughtUpFollowerFetchRequestMeter.mark()
+        m.followerCatchUpTimeHistogram.foreach(_.update(timeToCatchUpNs))
+        m.throughputMeter.mark(bytes)
+      })
+    }
+
+    def recordDelayedProduceSatisfied(timeToSatisfyNs: Long) {
+      aggregateProduceRequestMetrics.satisfiedRequestMeter.foreach(_.mark())
+      aggregateProduceRequestMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs))
+    }
+
+    private def recordDelayedFetchThroughput(forFollower: Boolean, response: FetchResponse) {
+      val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+        else aggregateNonFollowerFetchRequestMetrics
+      metrics.throughputMeter.mark(response.sizeInBytes)
+
+      response.topicMap.foreach(topicAndData => {
+        val topic = topicAndData._1
+        topicAndData._2.partitionDataArray.foreach(partitionData => {
+          val key = RequestKey(topic, partitionData.partition)
+          val keyMetrics = if (forFollower)
+            followerFetchRequestMetricsForKey.getAndMaybePut(key)
+          else
+            nonFollowerFetchRequestMetricsForKey.getAndMaybePut(key)
+          keyMetrics.throughputMeter.mark(partitionData.sizeInBytes)
+        })
+      })
+    }
+
+    def recordDelayedFetchExpired(forFollower: Boolean, response: FetchResponse) {
+      val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+        else aggregateNonFollowerFetchRequestMetrics
+      metrics.expiredRequestMeter.foreach(_.mark())
+      recordDelayedFetchThroughput(forFollower, response)
+    }
+
+    def recordDelayedFetchSatisfied(forFollower: Boolean, timeToSatisfyNs: Long, response: FetchResponse) {
+      val aggregateMetrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+        else aggregateNonFollowerFetchRequestMetrics
+      aggregateMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs))
+      aggregateMetrics.satisfiedRequestMeter.foreach(_.mark())
+      recordDelayedFetchThroughput(forFollower, response)
+    }
+  }
 }
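
The per-key metric pools above rely on kafka.utils.Pool's optional value factory: getAndMaybePut(key) creates the metrics object the first time a given (topic, partition) key is seen. A plain-Scala analogue of that behavior (the Pool API itself is assumed from this patch, not reproduced here):

    import scala.collection.concurrent.TrieMap

    class LazyPool[K, V](valueFactory: K => V) {
      private val underlying = TrieMap.empty[K, V]
      // Create-on-first-access, like Pool.getAndMaybePut with a valueFactory.
      def getAndMaybePut(k: K): V = underlying.getOrElseUpdate(k, valueFactory(k))
    }

    val meters = new LazyPool[String, String](label => "Throughput-" + label)
    meters.getAndMaybePut("orders-3") // "Throughput-orders-3", created once and then reused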