Browse Source

KAFKA-1350 Fix excessive state change logging; reviewed by Jun, Joel, Guozhang and Timothy

pull/21/head
Neha Narkhede 11 years ago
parent
commit
dd08538a4f
  1. 2
      core/src/main/scala/kafka/cluster/Partition.scala
  2. 4
      core/src/main/scala/kafka/controller/ControllerChannelManager.scala
  3. 6
      core/src/main/scala/kafka/controller/KafkaController.scala
  4. 2
      core/src/main/scala/kafka/controller/PartitionStateMachine.scala
  5. 2
      core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
  6. 9
      core/src/main/scala/kafka/network/RequestChannel.scala
  7. 14
      core/src/main/scala/kafka/server/KafkaApis.scala
  8. 2
      core/src/main/scala/kafka/server/ReplicaManager.scala

2
core/src/main/scala/kafka/cluster/Partition.scala

@@ -56,7 +56,7 @@ class Partition(val topic: String,
* each partition. */
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
private val stateChangeLogger = KafkaController.stateChangeLogger
private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)

4
core/src/main/scala/kafka/controller/ControllerChannelManager.scala

@@ -114,7 +114,7 @@ class RequestSendThread(val controllerId: Int,
val channel: BlockingChannel)
extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) {
private val lock = new Object()
private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
private val stateChangeLogger = KafkaController.stateChangeLogger
connectToBroker(toBroker, channel)
override def doWork(): Unit = {
@@ -188,7 +188,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[StopReplicaRequestInfo]]
val updateMetadataRequestMap = new mutable.HashMap[Int, mutable.HashMap[TopicAndPartition, PartitionStateInfo]]
private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
private val stateChangeLogger = KafkaController.stateChangeLogger
def newBatch() {
// raise error if the previous batch is not empty

6
core/src/main/scala/kafka/controller/KafkaController.scala

@@ -125,10 +125,12 @@ trait KafkaControllerMBean {
object KafkaController extends Logging {
val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
val stateChangeLogger = "state.change.logger"
val stateChangeLogger = new StateChangeLogger("state.change.logger")
val InitialControllerEpoch = 1
val InitialControllerEpochZkVersion = 1
case class StateChangeLogger(override val loggerName: String) extends Logging
def parseControllerId(controllerInfoString: String): Int = {
try {
Json.parseFull(controllerInfoString) match {
@@ -154,7 +156,7 @@ object KafkaController extends Logging {
class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
private val stateChangeLogger = KafkaController.stateChangeLogger
val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
val partitionStateMachine = new PartitionStateMachine(this)
val replicaStateMachine = new ReplicaStateMachine(this)

2
core/src/main/scala/kafka/controller/PartitionStateMachine.scala

@@ -50,7 +50,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
private val hasStarted = new AtomicBoolean(false)
private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
private val stateChangeLogger = KafkaController.stateChangeLogger
private var topicChangeListener: TopicChangeListener = null
private var deleteTopicsListener: DeleteTopicsListener = null
private var addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty

2
core/src/main/scala/kafka/controller/ReplicaStateMachine.scala

@@ -52,7 +52,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
private val hasStarted = new AtomicBoolean(false)
this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
private val stateChangeLogger = KafkaController.stateChangeLogger
/**
* Invoked on successful controller election. First registers a broker change listener since that triggers all

9
core/src/main/scala/kafka/network/RequestChannel.scala

@@ -31,6 +31,9 @@ import org.apache.log4j.Logger
object RequestChannel extends Logging {
val AllDone = new Request(1, 2, getShutdownReceive(), 0)
val requestLogger = new RequestLogger("kafka.request.logger")
case class RequestLogger(override val loggerName: String) extends Logging
def getShutdownReceive() = {
val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
@@ -49,7 +52,7 @@ object RequestChannel extends Logging {
val requestId = buffer.getShort()
val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
buffer = null
private val requestLogger = Logger.getLogger("kafka.request.logger")
private val requestLogger = RequestChannel.requestLogger
trace("Processor %d received request : %s".format(processor, requestObj))
def updateRequestMetrics() {
@@ -81,10 +84,10 @@ object RequestChannel extends Logging {
m.responseSendTimeHist.update(responseSendTime)
m.totalTimeHist.update(totalTime)
}
if(requestLogger.isTraceEnabled)
if(requestLogger.logger.isTraceEnabled)
requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
.format(requestObj.describe(true), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
else if(requestLogger.isDebugEnabled) {
else {
requestLogger.debug("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
.format(requestObj.describe(false), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
}

14
core/src/main/scala/kafka/server/KafkaApis.scala

@@ -138,10 +138,9 @@ class KafkaApis(val requestChannel: RequestChannel,
updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
metadataCache.put(partitionState._1, partitionState._2)
if(stateChangeLogger.isTraceEnabled)
stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
"sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
"sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
}
// remove the topics that don't exist in the UpdateMetadata request since those are the topics that are
// currently being deleted by the controller
@@ -155,10 +154,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}.keySet
partitionsToBeDeleted.foreach { partition =>
metadataCache.remove(partition)
if(stateChangeLogger.isTraceEnabled)
stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
"sent by controller %d epoch %d with correlation id %d").format(brokerId, partition,
updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
"sent by controller %d epoch %d with correlation id %d").format(brokerId, partition,
updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
}
}
val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)

2
core/src/main/scala/kafka/server/ReplicaManager.scala

@@ -56,7 +56,7 @@ class ReplicaManager(val config: KafkaConfig,
val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
private var hwThreadInitialized = false
this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
val stateChangeLogger = KafkaController.stateChangeLogger
newGauge(
"LeaderCount",

Loading…
Cancel
Save