Browse Source

produce/fetch remote time metric not set correctly when num.acks = 1; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-584

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1402250 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Jun Rao 12 years ago
parent
commit
9eb9e51011
  1. 13
      core/src/main/scala/kafka/network/RequestChannel.scala

13
core/src/main/scala/kafka/network/RequestChannel.scala

@ -40,9 +40,9 @@ object RequestChannel extends Logging { @@ -40,9 +40,9 @@ object RequestChannel extends Logging {
}
case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long) {
var dequeueTimeMs = -1L
var apiLocalCompleteTimeMs = -1L
var responseCompleteTimeMs = -1L
@volatile var dequeueTimeMs = -1L
@volatile var apiLocalCompleteTimeMs = -1L
@volatile var responseCompleteTimeMs = -1L
val requestId = buffer.getShort()
val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
buffer.rewind()
@ -50,6 +50,10 @@ object RequestChannel extends Logging { @@ -50,6 +50,10 @@ object RequestChannel extends Logging {
def updateRequestMetrics() {
val endTimeMs = SystemTime.milliseconds
// In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes since the remote
// processing time is really small. In this case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
if (apiLocalCompleteTimeMs < 0)
apiLocalCompleteTimeMs = responseCompleteTimeMs
val queueTime = (dequeueTimeMs - startTimeMs).max(0L)
val apiLocalTime = (apiLocalCompleteTimeMs - dequeueTimeMs).max(0L)
val apiRemoteTime = (responseCompleteTimeMs - apiLocalCompleteTimeMs).max(0L)
@ -71,8 +75,9 @@ object RequestChannel extends Logging { @@ -71,8 +75,9 @@ object RequestChannel extends Logging {
m.responseSendTimeHist.update(responseSendTime)
m.totalTimeHist.update(totalTime)
}
trace("Completed request: %s totalTime:%d queueTime:%d localTime:%d remoteTime:%d sendTime:%d"
.format(requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime))
}
trace("Completed request: %s".format(requestObj))
}
case class Response(processor: Int, request: Request, responseSend: Send) {

Loading…
Cancel
Save