From 9eb9e510118d1a69a3ab9ceac2acee6ab08099d6 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Thu, 25 Oct 2012 18:15:41 +0000 Subject: [PATCH] produce/fetch remote time metric not set correctly when num.acks = 1; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-584 git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1402250 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/scala/kafka/network/RequestChannel.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index c1628e0d891..1f7124d6bd6 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -40,9 +40,9 @@ object RequestChannel extends Logging { } case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long) { - var dequeueTimeMs = -1L - var apiLocalCompleteTimeMs = -1L - var responseCompleteTimeMs = -1L + @volatile var dequeueTimeMs = -1L + @volatile var apiLocalCompleteTimeMs = -1L + @volatile var responseCompleteTimeMs = -1L val requestId = buffer.getShort() val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer) buffer.rewind() @@ -50,6 +50,10 @@ object RequestChannel extends Logging { def updateRequestMetrics() { val endTimeMs = SystemTime.milliseconds + // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes since the remote + // processing time is really small. In this case, use responseCompleteTimeMs as apiLocalCompleteTimeMs. + if (apiLocalCompleteTimeMs < 0) + apiLocalCompleteTimeMs = responseCompleteTimeMs val queueTime = (dequeueTimeMs - startTimeMs).max(0L) val apiLocalTime = (apiLocalCompleteTimeMs - dequeueTimeMs).max(0L) val apiRemoteTime = (responseCompleteTimeMs - apiLocalCompleteTimeMs).max(0L) @@ -71,8 +75,9 @@ object RequestChannel extends Logging { m.responseSendTimeHist.update(responseSendTime) m.totalTimeHist.update(totalTime) } + trace("Completed request: %s totalTime:%d queueTime:%d localTime:%d remoteTime:%d sendTime:%d" + .format(requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime)) } - trace("Completed request: %s".format(requestObj)) } case class Response(processor: Int, request: Request, responseSend: Send) {