@ -54,6 +54,8 @@ class KafkaApis(val requestChannel: RequestChannel,
@@ -54,6 +54,8 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handle ( request : RequestChannel . Request ) {
try {
if ( requestLogger . isTraceEnabled )
requestLogger . trace ( "Handling request: %s" . format ( request . requestObj ) )
request . requestId match {
case RequestKeys . ProduceKey => handleProducerRequest ( request )
case RequestKeys . FetchKey => handleFetchRequest ( request )
@ -65,68 +67,14 @@ class KafkaApis(val requestChannel: RequestChannel,
@@ -65,68 +67,14 @@ class KafkaApis(val requestChannel: RequestChannel,
}
} catch {
case e : Throwable =>
request . requestId match {
case RequestKeys . ProduceKey =>
val apiRequest = request . requestObj . asInstanceOf [ ProducerRequest ]
val producerResponseStatus = apiRequest . data . map {
case ( topicAndPartition , data ) =>
( topicAndPartition , ProducerResponseStatus ( ErrorMapping . codeFor ( e . getClass . asInstanceOf [ Class [ Throwable ] ] ) , - 1L ) )
}
val errorResponse = ProducerResponse ( apiRequest . correlationId , producerResponseStatus )
requestChannel . sendResponse ( new Response ( request , new BoundedByteBufferSend ( errorResponse ) ) )
error ( "error when handling request %s" . format ( apiRequest ) , e )
case RequestKeys . FetchKey =>
val apiRequest = request . requestObj . asInstanceOf [ FetchRequest ]
val fetchResponsePartitionData = apiRequest . requestInfo . map {
case ( topicAndPartition , data ) =>
( topicAndPartition , FetchResponsePartitionData ( ErrorMapping . codeFor ( e . getClass . asInstanceOf [ Class [ Throwable ] ] ) , - 1 , null ) )
}
val errorResponse = FetchResponse ( apiRequest . correlationId , fetchResponsePartitionData )
requestChannel . sendResponse ( new RequestChannel . Response ( request , new FetchResponseSend ( errorResponse ) ) )
error ( "error when handling request %s" . format ( apiRequest ) , e )
case RequestKeys . OffsetsKey =>
val apiRequest = request . requestObj . asInstanceOf [ OffsetRequest ]
val partitionOffsetResponseMap = apiRequest . requestInfo . map {
case ( topicAndPartition , partitionOffsetRequest ) =>
( topicAndPartition , PartitionOffsetsResponse ( ErrorMapping . codeFor ( e . getClass . asInstanceOf [ Class [ Throwable ] ] ) , null ) )
}
val errorResponse = OffsetResponse ( apiRequest . correlationId , partitionOffsetResponseMap )
requestChannel . sendResponse ( new Response ( request , new BoundedByteBufferSend ( errorResponse ) ) )
error ( "error when handling request %s" . format ( apiRequest ) , e )
case RequestKeys . MetadataKey =>
val apiRequest = request . requestObj . asInstanceOf [ TopicMetadataRequest ]
val topicMeatadata = apiRequest . topics . map {
topic => TopicMetadata ( topic , Nil , ErrorMapping . codeFor ( e . getClass . asInstanceOf [ Class [ Throwable ] ] ) )
}
val errorResponse = TopicMetadataResponse ( topicMeatadata , apiRequest . correlationId )
requestChannel . sendResponse ( new Response ( request , new BoundedByteBufferSend ( errorResponse ) ) )
error ( "error when handling request %s" . format ( apiRequest ) , e )
case RequestKeys . LeaderAndIsrKey =>
val apiRequest = request . requestObj . asInstanceOf [ LeaderAndIsrRequest ]
val responseMap = apiRequest . partitionStateInfos . map {
case ( topicAndPartition , partitionAndState ) => ( topicAndPartition , ErrorMapping . codeFor ( e . getClass . asInstanceOf [ Class [ Throwable ] ] ) )
}
val errorResponse = LeaderAndIsrResponse ( apiRequest . correlationId , responseMap )
requestChannel . sendResponse ( new Response ( request , new BoundedByteBufferSend ( errorResponse ) ) )
error ( "error when handling request %s" . format ( apiRequest ) , e )
case RequestKeys . StopReplicaKey =>
val apiRequest = request . requestObj . asInstanceOf [ StopReplicaRequest ]
val responseMap = apiRequest . partitions . map {
case topicAndPartition => ( topicAndPartition , ErrorMapping . codeFor ( e . getClass . asInstanceOf [ Class [ Throwable ] ] ) )
} . toMap
error ( "error when handling request %s" . format ( apiRequest ) , e )
val errorResponse = StopReplicaResponse ( apiRequest . correlationId , responseMap )
requestChannel . sendResponse ( new Response ( request , new BoundedByteBufferSend ( errorResponse ) ) )
}
request . requestObj . handleError ( e , requestChannel , request )
error ( "error when handling request %s" . format ( request . requestObj ) , e )
} finally
request . apiLocalCompleteTimeMs = SystemTime . milliseconds
}
def handleLeaderAndIsrRequest ( request : RequestChannel . Request ) {
val leaderAndIsrRequest = request . requestObj . asInstanceOf [ LeaderAndIsrRequest ]
if ( requestLogger . isTraceEnabled )
requestLogger . trace ( "Handling LeaderAndIsrRequest v%d with correlation id %d from client %s: %s"
. format ( leaderAndIsrRequest . versionId , leaderAndIsrRequest . correlationId , leaderAndIsrRequest . clientId , leaderAndIsrRequest . toString ) )
try {
val ( response , error ) = replicaManager . becomeLeaderOrFollower ( leaderAndIsrRequest )
val leaderAndIsrResponse = new LeaderAndIsrResponse ( leaderAndIsrRequest . correlationId , response , error )
@ -141,14 +89,9 @@ class KafkaApis(val requestChannel: RequestChannel,
@@ -141,14 +89,9 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleStopReplicaRequest ( request : RequestChannel . Request ) {
val stopReplicaRequest = request . requestObj . asInstanceOf [ StopReplicaRequest ]
if ( requestLogger . isTraceEnabled )
requestLogger . trace ( "Handling StopReplicaRequest v%d with correlation id %d from client %s: %s"
. format ( stopReplicaRequest . versionId , stopReplicaRequest . correlationId , stopReplicaRequest . clientId , stopReplicaRequest . toString ) )
val ( response , error ) = replicaManager . stopReplicas ( stopReplicaRequest )
val stopReplicaResponse = new StopReplicaResponse ( stopReplicaRequest . correlationId , response . toMap , error )
requestChannel . sendResponse ( new Response ( request , new BoundedByteBufferSend ( stopReplicaResponse ) ) )
replicaManager . replicaFetcherManager . shutdownIdleFetcherThreads ( )
}
@ -174,10 +117,6 @@ class KafkaApis(val requestChannel: RequestChannel,
@@ -174,10 +117,6 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleProducerRequest ( request : RequestChannel . Request ) {
val produceRequest = request . requestObj . asInstanceOf [ ProducerRequest ]
val sTime = SystemTime . milliseconds
if ( requestLogger . isTraceEnabled )
requestLogger . trace ( "Handling ProducerRequest v%d with correlation id %d from client %s: %s"
. format ( produceRequest . versionId , produceRequest . correlationId , produceRequest . clientId , produceRequest . toString ) )
val localProduceResults = appendToLocalLog ( produceRequest )
debug ( "Produce to local log in %d ms" . format ( SystemTime . milliseconds - sTime ) )
@ -272,10 +211,6 @@ class KafkaApis(val requestChannel: RequestChannel,
@@ -272,10 +211,6 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleFetchRequest ( request : RequestChannel . Request ) {
val fetchRequest = request . requestObj . asInstanceOf [ FetchRequest ]
if ( requestLogger . isTraceEnabled )
requestLogger . trace ( "Handling FetchRequest v%d with correlation id %d from client %s: %s"
. format ( fetchRequest . versionId , fetchRequest . correlationId , fetchRequest . clientId , fetchRequest . toString ) )
if ( fetchRequest . isFromFollower ) {
maybeUpdatePartitionHw ( fetchRequest )
// after updating HW , some delayed produce requests may be unblocked
@ -382,10 +317,6 @@ class KafkaApis(val requestChannel: RequestChannel,
@@ -382,10 +317,6 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleOffsetRequest ( request : RequestChannel . Request ) {
val offsetRequest = request . requestObj . asInstanceOf [ OffsetRequest ]
if ( requestLogger . isTraceEnabled )
requestLogger . trace ( "Handling OffsetRequest v%d with correlation id %d from client %s: %s"
. format ( offsetRequest . versionId , offsetRequest . correlationId , offsetRequest . clientId , offsetRequest . toString ) )
val responseMap = offsetRequest . requestInfo . map ( elem => {
val ( topicAndPartition , partitionOffsetRequestInfo ) = elem
try {
@ -422,10 +353,6 @@ class KafkaApis(val requestChannel: RequestChannel,
@@ -422,10 +353,6 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleTopicMetadataRequest ( request : RequestChannel . Request ) {
val metadataRequest = request . requestObj . asInstanceOf [ TopicMetadataRequest ]
if ( requestLogger . isTraceEnabled )
requestLogger . trace ( "Handling TopicMetadataRequest v%d with correlation id %d from client %s: %s"
. format ( metadataRequest . versionId , metadataRequest . correlationId , metadataRequest . clientId , metadataRequest . toString ) )
val topicsMetadata = new mutable . ArrayBuffer [ TopicMetadata ] ( )
val config = replicaManager . config
val uniqueTopics = {