diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index 1bfabb0cad6..ac749311e1e 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -21,9 +21,10 @@ import java.nio.ByteBuffer import kafka.utils.nonthreadsafe import kafka.api.ApiUtils._ import scala.collection.immutable.Map -import kafka.common.TopicAndPartition +import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.consumer.ConsumerConfig import java.util.concurrent.atomic.AtomicInteger +import kafka.network.{RequestChannel} case class PartitionFetchInfo(offset: Long, fetchSize: Int) @@ -137,6 +138,28 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV def isFromLowLevelConsumer = replicaId == Request.DebuggingConsumerId def numPartitions = requestInfo.size + + override def toString(): String = { + val fetchRequest = new StringBuilder + fetchRequest.append("Name: " + this.getClass.getSimpleName) + fetchRequest.append("; Version: " + versionId) + fetchRequest.append("; CorrelationId: " + correlationId) + fetchRequest.append("; ClientId: " + clientId) + fetchRequest.append("; ReplicaId: " + replicaId) + fetchRequest.append("; MaxWait: " + maxWait + " ms") + fetchRequest.append("; MinBytes: " + minBytes + " bytes") + fetchRequest.append("; RequestInfo: " + requestInfo.mkString(",")) + fetchRequest.toString() + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + val fetchResponsePartitionData = requestInfo.map { + case (topicAndPartition, data) => + (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, null)) + } + val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData) + requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse))) + } } diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 6955433d90d..616f6790573 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -23,6 +23,9 @@ import kafka.utils._ import kafka.api.ApiUtils._ import kafka.cluster.Broker import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.network.{BoundedByteBufferSend, RequestChannel} +import kafka.common.ErrorMapping +import kafka.network.RequestChannel.Response object LeaderAndIsr { @@ -157,4 +160,25 @@ case class LeaderAndIsrRequest (versionId: Short, size += broker.sizeInBytes /* broker info */ size } + + override def toString(): String = { + val leaderAndIsrRequest = new StringBuilder + leaderAndIsrRequest.append("Name: " + this.getClass.getSimpleName) + leaderAndIsrRequest.append("; Version: " + versionId) + leaderAndIsrRequest.append("; CorrelationId: " + correlationId) + leaderAndIsrRequest.append("; ClientId: " + clientId) + leaderAndIsrRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms") + leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch) + leaderAndIsrRequest.append("; PartitionStateInfo: " + partitionStateInfos.mkString(",")) + leaderAndIsrRequest.append("; Leaders: " + leaders.mkString(",")) + leaderAndIsrRequest.toString() + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + val responseMap = partitionStateInfos.map { + case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + } + val errorResponse = LeaderAndIsrResponse(correlationId, responseMap) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala index 6c522bca2f9..6360a98bd5e 100644 --- a/core/src/main/scala/kafka/api/OffsetRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetRequest.scala @@ -18,8 +18,10 @@ package kafka.api import java.nio.ByteBuffer -import kafka.common.TopicAndPartition +import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.api.ApiUtils._ +import kafka.network.{BoundedByteBufferSend, RequestChannel} +import kafka.network.RequestChannel.Response object OffsetRequest { @@ -104,4 +106,24 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ def isFromOrdinaryClient = replicaId == Request.OrdinaryConsumerId def isFromDebuggingClient = replicaId == Request.DebuggingConsumerId + + override def toString(): String = { + val offsetRequest = new StringBuilder + offsetRequest.append("Name: " + this.getClass.getSimpleName) + offsetRequest.append("; Version: " + versionId) + offsetRequest.append("; CorrelationId: " + correlationId) + offsetRequest.append("; ClientId: " + clientId) + offsetRequest.append("; RequestInfo: " + requestInfo.mkString(",")) + offsetRequest.append("; ReplicaId: " + replicaId) + offsetRequest.toString() + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + val partitionOffsetResponseMap = requestInfo.map { + case (topicAndPartition, partitionOffsetRequest) => + (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null)) + } + val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + } } diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index ffa96a6caf7..72b2cbaedcf 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -20,8 +20,10 @@ package kafka.api import java.nio._ import kafka.message._ import scala.collection.Map -import kafka.common.TopicAndPartition import kafka.api.ApiUtils._ +import kafka.common._ +import kafka.network.RequestChannel.Response +import kafka.network.{RequestChannel, BoundedByteBufferSend} object ProducerRequest { val CurrentVersion = 0.shortValue @@ -120,5 +122,25 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, def numPartitions = data.size + override def toString(): String = { + val producerRequest = new StringBuilder + producerRequest.append("Name: " + this.getClass.getSimpleName) + producerRequest.append("; Version: " + versionId) + producerRequest.append("; CorrelationId: " + correlationId) + producerRequest.append("; ClientId: " + clientId) + producerRequest.append("; RequiredAcks: " + requiredAcks) + producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms") + producerRequest.append("; TopicAndPartition: " + data.map(r => r._1 -> r._2.sizeInBytes).toMap.mkString(",")) + producerRequest.toString() + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + val producerResponseStatus = data.map { + case (topicAndPartition, data) => + (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l)) + } + val errorResponse = ProducerResponse(correlationId, producerResponseStatus) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + } } diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala index 83ad42c5e57..3175e1c1618 100644 --- a/core/src/main/scala/kafka/api/RequestOrResponse.scala +++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala @@ -18,6 +18,8 @@ package kafka.api */ import java.nio._ +import kafka.network.RequestChannel +import kafka.utils.Logging object Request { val OrdinaryConsumerId: Int = -1 @@ -25,10 +27,12 @@ object Request { } -private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) { +private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) extends Logging{ def sizeInBytes: Int def writeTo(buffer: ByteBuffer): Unit - + + def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {} } + diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala index 9fe849b5957..0580636e521 100644 --- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -20,8 +20,10 @@ package kafka.api import java.nio._ import kafka.api.ApiUtils._ -import kafka.utils.Logging -import kafka.network.InvalidRequestException +import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException} +import kafka.common.ErrorMapping +import kafka.network.RequestChannel.Response +import kafka.utils.{Logging} object StopReplicaRequest extends Logging { @@ -93,4 +95,25 @@ case class StopReplicaRequest(versionId: Short, } size } + + override def toString(): String = { + val stopReplicaRequest = new StringBuilder + stopReplicaRequest.append("Name: " + this.getClass.getSimpleName) + stopReplicaRequest.append("; Version: " + versionId) + stopReplicaRequest.append("; CorrelationId: " + correlationId) + stopReplicaRequest.append("; ClientId: " + clientId) + stopReplicaRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms") + stopReplicaRequest.append("; DeletePartitions: " + deletePartitions) + stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch) + stopReplicaRequest.append("; Partitions: " + partitions.mkString(",")) + stopReplicaRequest.toString() + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + val responseMap = partitions.map { + case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + }.toMap + val errorResponse = StopReplicaResponse(correlationId, responseMap) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + } } diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index fe1170fc5cd..824f8f1071c 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -20,7 +20,10 @@ package kafka.api import java.nio.ByteBuffer import kafka.api.ApiUtils._ import collection.mutable.ListBuffer -import kafka.utils.Logging +import kafka.network.{BoundedByteBufferSend, RequestChannel} +import kafka.common.ErrorMapping +import kafka.network.RequestChannel.Response +import kafka.utils.{Logging} object TopicMetadataRequest extends Logging { val CurrentVersion = 0.shortValue @@ -67,4 +70,22 @@ case class TopicMetadataRequest(val versionId: Short, 4 + /* number of topics */ topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ } + + override def toString(): String = { + val topicMetadataRequest = new StringBuilder + topicMetadataRequest.append("Name: " + this.getClass.getSimpleName) + topicMetadataRequest.append("; Version: " + versionId) + topicMetadataRequest.append("; CorrelationId: " + correlationId) + topicMetadataRequest.append("; ClientId: " + clientId) + topicMetadataRequest.append("; Topics: " + topics.mkString(",")) + topicMetadataRequest.toString() + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + val topicMetadata = topics.map { + topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + } + val errorResponse = TopicMetadataResponse(topicMetadata, correlationId) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + } } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index ea5b5a0d089..71eb98096f2 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -337,8 +337,8 @@ class Partition(val topic: String, partitionString.append("Topic: " + topic) partitionString.append("; Partition: " + partitionId) partitionString.append("; Leader: " + leaderReplicaIdOpt) - partitionString.append("; Assigned replicas: " + assignedReplicaMap.keys.mkString(",")) - partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(",")) + partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(",")) + partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(",")) partitionString.toString() } } diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 848c877ad34..5185dec2999 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -46,12 +46,7 @@ object RequestChannel extends Logging { val requestId = buffer.getShort() val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer) buffer.rewind() - buffer.getShort - val versionId = buffer.getShort - val correlationId = buffer.getInt - val clientId = ApiUtils.readShortString(buffer) - buffer.rewind() - trace("Received request v%d with correlation id %d from client %s: %s".format(versionId, correlationId, clientId, requestObj)) + trace("Received request : %s".format(requestObj)) def updateRequestMetrics() { val endTimeMs = SystemTime.milliseconds @@ -80,8 +75,8 @@ object RequestChannel extends Logging { m.responseSendTimeHist.update(responseSendTime) m.totalTimeHist.update(totalTime) } - trace("Completed request v%d with correlation id %d and client %s: %s, totalTime:%d, queueTime:%d, localTime:%d, remoteTime:%d, sendTime:%d" - .format(versionId, correlationId, clientId, requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime)) + trace("Completed request : %s, totalTime:%d, queueTime:%d, localTime:%d, remoteTime:%d, sendTime:%d" + .format(requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime)) } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 60752fb2fdf..0a1a11a93aa 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -54,6 +54,8 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handle(request: RequestChannel.Request) { try{ + if(requestLogger.isTraceEnabled) + requestLogger.trace("Handling request: %s".format(request.requestObj)) request.requestId match { case RequestKeys.ProduceKey => handleProducerRequest(request) case RequestKeys.FetchKey => handleFetchRequest(request) @@ -65,68 +67,14 @@ class KafkaApis(val requestChannel: RequestChannel, } } catch { case e: Throwable => - request.requestId match { - case RequestKeys.ProduceKey => - val apiRequest = request.requestObj.asInstanceOf[ProducerRequest] - val producerResponseStatus = apiRequest.data.map { - case (topicAndPartition, data) => - (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1L)) - } - val errorResponse = ProducerResponse(apiRequest.correlationId, producerResponseStatus) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) - error("error when handling request %s".format(apiRequest), e) - case RequestKeys.FetchKey => - val apiRequest = request.requestObj.asInstanceOf[FetchRequest] - val fetchResponsePartitionData = apiRequest.requestInfo.map { - case (topicAndPartition, data) => - (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, null)) - } - val errorResponse = FetchResponse(apiRequest.correlationId, fetchResponsePartitionData) - requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse))) - error("error when handling request %s".format(apiRequest), e) - case RequestKeys.OffsetsKey => - val apiRequest = request.requestObj.asInstanceOf[OffsetRequest] - val partitionOffsetResponseMap = apiRequest.requestInfo.map { - case (topicAndPartition, partitionOffsetRequest) => - (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null)) - } - val errorResponse = OffsetResponse(apiRequest.correlationId, partitionOffsetResponseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) - error("error when handling request %s".format(apiRequest), e) - case RequestKeys.MetadataKey => - val apiRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] - val topicMeatadata = apiRequest.topics.map { - topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - } - val errorResponse = TopicMetadataResponse(topicMeatadata, apiRequest.correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) - error("error when handling request %s".format(apiRequest), e) - case RequestKeys.LeaderAndIsrKey => - val apiRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest] - val responseMap = apiRequest.partitionStateInfos.map { - case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - } - val errorResponse = LeaderAndIsrResponse(apiRequest.correlationId, responseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) - error("error when handling request %s".format(apiRequest), e) - case RequestKeys.StopReplicaKey => - val apiRequest = request.requestObj.asInstanceOf[StopReplicaRequest] - val responseMap = apiRequest.partitions.map { - case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - }.toMap - error("error when handling request %s".format(apiRequest), e) - val errorResponse = StopReplicaResponse(apiRequest.correlationId, responseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) - } + request.requestObj.handleError(e, requestChannel, request) + error("error when handling request %s".format(request.requestObj), e) } finally request.apiLocalCompleteTimeMs = SystemTime.milliseconds } def handleLeaderAndIsrRequest(request: RequestChannel.Request) { val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest] - if(requestLogger.isTraceEnabled) - requestLogger.trace("Handling LeaderAndIsrRequest v%d with correlation id %d from client %s: %s" - .format(leaderAndIsrRequest.versionId, leaderAndIsrRequest.correlationId, leaderAndIsrRequest.clientId, leaderAndIsrRequest.toString)) try { val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest) val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error) @@ -141,14 +89,9 @@ class KafkaApis(val requestChannel: RequestChannel, def handleStopReplicaRequest(request: RequestChannel.Request) { val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest] - if(requestLogger.isTraceEnabled) - requestLogger.trace("Handling StopReplicaRequest v%d with correlation id %d from client %s: %s" - .format(stopReplicaRequest.versionId, stopReplicaRequest.correlationId, stopReplicaRequest.clientId, stopReplicaRequest.toString)) - val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) - replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads() } @@ -174,10 +117,6 @@ class KafkaApis(val requestChannel: RequestChannel, def handleProducerRequest(request: RequestChannel.Request) { val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] val sTime = SystemTime.milliseconds - if(requestLogger.isTraceEnabled) - requestLogger.trace("Handling ProducerRequest v%d with correlation id %d from client %s: %s" - .format(produceRequest.versionId, produceRequest.correlationId, produceRequest.clientId, produceRequest.toString)) - val localProduceResults = appendToLocalLog(produceRequest) debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) @@ -272,10 +211,6 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleFetchRequest(request: RequestChannel.Request) { val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] - if(requestLogger.isTraceEnabled) - requestLogger.trace("Handling FetchRequest v%d with correlation id %d from client %s: %s" - .format(fetchRequest.versionId, fetchRequest.correlationId, fetchRequest.clientId, fetchRequest.toString)) - if(fetchRequest.isFromFollower) { maybeUpdatePartitionHw(fetchRequest) // after updating HW, some delayed produce requests may be unblocked @@ -382,10 +317,6 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleOffsetRequest(request: RequestChannel.Request) { val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest] - if(requestLogger.isTraceEnabled) - requestLogger.trace("Handling OffsetRequest v%d with correlation id %d from client %s: %s" - .format(offsetRequest.versionId, offsetRequest.correlationId, offsetRequest.clientId, offsetRequest.toString)) - val responseMap = offsetRequest.requestInfo.map(elem => { val (topicAndPartition, partitionOffsetRequestInfo) = elem try { @@ -422,10 +353,6 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleTopicMetadataRequest(request: RequestChannel.Request) { val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] - if(requestLogger.isTraceEnabled) - requestLogger.trace("Handling TopicMetadataRequest v%d with correlation id %d from client %s: %s" - .format(metadataRequest.versionId, metadataRequest.correlationId, metadataRequest.clientId, metadataRequest.toString)) - val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]() val config = replicaManager.config val uniqueTopics = {