Browse Source

Expose JMX operation to set logger level dynamically; patched by Jun Rao; reviewed by Jay Kreps; KAFKA-429

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1377172 13f79535-47bb-0310-9956-ffa450edef68
pull/9/head
Jun Rao 12 years ago
parent
commit
749665e336
  1. 5
      core/src/main/scala/kafka/Kafka.scala
  2. 2
      core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
  3. 98
      core/src/main/scala/kafka/utils/Log4jController.scala
  4. 5
      core/src/main/scala/kafka/utils/Logging.scala
  5. 4
      core/src/main/scala/kafka/utils/Utils.scala

5
core/src/main/scala/kafka/Kafka.scala

@ -19,14 +19,11 @@ package kafka @@ -19,14 +19,11 @@ package kafka
import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
import utils.{Utils, Logging}
import org.apache.log4j.jmx.LoggerDynamicMBean
object Kafka extends Logging {
def main(args: Array[String]): Unit = {
val kafkaLog4jMBeanName = "kafka:type=kafka.KafkaLog4j"
import org.apache.log4j.Logger
Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName)
if (args.length != 1) {
println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName()))

2
core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala

@ -61,7 +61,7 @@ private[kafka] class DefaultEventHandler[T](val config: ProducerConfig, @@ -61,7 +61,7 @@ private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
sent = true
}
catch {
case e => warn("Error sending messages, %d attempts remaining".format(attemptsRemaining))
case e => warn("Error sending messages, %d attempts remaining".format(attemptsRemaining), e)
if (attemptsRemaining == 0)
throw e
}

98
core/src/main/scala/kafka/utils/Log4jController.scala

@ -0,0 +1,98 @@ @@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import org.apache.log4j.{Logger, Level, LogManager}
import java.util
object Log4jController {
private val controller = new Log4jController
Utils.registerMBean(controller, "kafka:type=kafka.Log4jController")
}
/**
* An MBean that allows the user to dynamically alter log4j levels at runtime.
* The companion object contains the singleton instance of this class and
* registers the MBean. The [[kafka.utils.Logging]] trait forces initialization
* of the companion object.
*/
private class Log4jController extends Log4jControllerMBean {
def getLoggers = {
val lst = new util.ArrayList[String]()
lst.add("root=" + existingLogger("root").getLevel.toString)
val loggers = LogManager.getCurrentLoggers
while (loggers.hasMoreElements) {
val logger = loggers.nextElement().asInstanceOf[Logger]
if (logger != null) {
val level = if (logger != null) logger.getLevel else null
lst.add("%s=%s".format(logger.getName, if (level != null) level.toString else "null"))
}
}
lst
}
private def newLogger(loggerName: String) =
if (loggerName == "root")
LogManager.getRootLogger
else LogManager.getLogger(loggerName)
private def existingLogger(loggerName: String) =
if (loggerName == "root")
LogManager.getRootLogger
else LogManager.exists(loggerName)
def getLogLevel(loggerName: String) = {
val log = existingLogger(loggerName)
if (log != null) {
val level = log.getLevel
if (level != null)
log.getLevel.toString
else "Null log level."
}
else "No such logger."
}
def setLogLevel(loggerName: String, level: String) = {
val log = newLogger(loggerName)
if (!loggerName.trim.isEmpty && !level.trim.isEmpty && log != null) {
log.setLevel(Level.toLevel(level.toUpperCase))
true
}
else false
}
}
private trait Log4jControllerMBean {
def getLoggers: java.util.List[String]
def getLogLevel(logger: String): String
def setLogLevel(logger: String, level: String): Boolean
}

5
core/src/main/scala/kafka/utils/Logging.scala

@ -24,7 +24,10 @@ trait Logging { @@ -24,7 +24,10 @@ trait Logging {
lazy val logger = Logger.getLogger(loggerName)
protected var logIdent = ""
// Force initialization to register Log4jControllerMBean
private val log4jController = Log4jController
private def msgWithLogIdent(msg: String) = "%s%s".format(logIdent, msg)
def trace(msg: => String): Unit = {

4
core/src/main/scala/kafka/utils/Utils.scala

@ -409,7 +409,7 @@ object Utils extends Logging { @@ -409,7 +409,7 @@ object Utils extends Logging {
* instead it just returns false indicating the registration failed.
* @param mbean The object to register as an mbean
* @param name The name to register this mbean with
* @returns true if the registration succeeded
* @return true if the registration succeeded
*/
def registerMBean(mbean: Object, name: String): Boolean = {
try {
@ -445,7 +445,7 @@ object Utils extends Logging { @@ -445,7 +445,7 @@ object Utils extends Logging {
/**
* Read an unsigned integer from the current position in the buffer,
* incrementing the position by 4 bytes
* @param The buffer to read from
* @param buffer The buffer to read from
* @return The integer read, as a long to avoid signedness
*/
def getUnsignedInt(buffer: ByteBuffer): Long =

Loading…
Cancel
Save