diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 3ac2e7bdad3..367cf516fc3 100644 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -19,14 +19,11 @@ package kafka import server.{KafkaConfig, KafkaServerStartable, KafkaServer} import utils.{Utils, Logging} -import org.apache.log4j.jmx.LoggerDynamicMBean + object Kafka extends Logging { def main(args: Array[String]): Unit = { - val kafkaLog4jMBeanName = "kafka:type=kafka.KafkaLog4j" - import org.apache.log4j.Logger - Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName) if (args.length != 1) { println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName())) diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index f72eed18147..8d6664fd32d 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -61,7 +61,7 @@ private[kafka] class DefaultEventHandler[T](val config: ProducerConfig, sent = true } catch { - case e => warn("Error sending messages, %d attempts remaining".format(attemptsRemaining)) + case e => warn("Error sending messages, %d attempts remaining".format(attemptsRemaining), e) if (attemptsRemaining == 0) throw e } diff --git a/core/src/main/scala/kafka/utils/Log4jController.scala b/core/src/main/scala/kafka/utils/Log4jController.scala new file mode 100644 index 00000000000..a015c8109f0 --- /dev/null +++ b/core/src/main/scala/kafka/utils/Log4jController.scala @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + + +import org.apache.log4j.{Logger, Level, LogManager} +import java.util + + +object Log4jController { + + private val controller = new Log4jController + + Utils.registerMBean(controller, "kafka:type=kafka.Log4jController") + +} + + +/** + * An MBean that allows the user to dynamically alter log4j levels at runtime. + * The companion object contains the singleton instance of this class and + * registers the MBean. The [[kafka.utils.Logging]] trait forces initialization + * of the companion object. + */ +private class Log4jController extends Log4jControllerMBean { + + def getLoggers = { + val lst = new util.ArrayList[String]() + lst.add("root=" + existingLogger("root").getLevel.toString) + val loggers = LogManager.getCurrentLoggers + while (loggers.hasMoreElements) { + val logger = loggers.nextElement().asInstanceOf[Logger] + if (logger != null) { + val level = if (logger != null) logger.getLevel else null + lst.add("%s=%s".format(logger.getName, if (level != null) level.toString else "null")) + } + } + lst + } + + + private def newLogger(loggerName: String) = + if (loggerName == "root") + LogManager.getRootLogger + else LogManager.getLogger(loggerName) + + + private def existingLogger(loggerName: String) = + if (loggerName == "root") + LogManager.getRootLogger + else LogManager.exists(loggerName) + + + def getLogLevel(loggerName: String) = { + val log = existingLogger(loggerName) + if (log != null) { + val level = log.getLevel + if (level != null) + log.getLevel.toString + else "Null log level." + } + else "No such logger." + } + + + def setLogLevel(loggerName: String, level: String) = { + val log = newLogger(loggerName) + if (!loggerName.trim.isEmpty && !level.trim.isEmpty && log != null) { + log.setLevel(Level.toLevel(level.toUpperCase)) + true + } + else false + } + +} + + +private trait Log4jControllerMBean { + def getLoggers: java.util.List[String] + def getLogLevel(logger: String): String + def setLogLevel(logger: String, level: String): Boolean +} + diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala index 6e05eb46168..3f69e547fef 100644 --- a/core/src/main/scala/kafka/utils/Logging.scala +++ b/core/src/main/scala/kafka/utils/Logging.scala @@ -24,7 +24,10 @@ trait Logging { lazy val logger = Logger.getLogger(loggerName) protected var logIdent = "" - + + // Force initialization to register Log4jControllerMBean + private val log4jController = Log4jController + private def msgWithLogIdent(msg: String) = "%s%s".format(logIdent, msg) def trace(msg: => String): Unit = { diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 2c7dc9c3403..63076219dfe 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -409,7 +409,7 @@ object Utils extends Logging { * instead it just returns false indicating the registration failed. * @param mbean The object to register as an mbean * @param name The name to register this mbean with - * @returns true if the registration succeeded + * @return true if the registration succeeded */ def registerMBean(mbean: Object, name: String): Boolean = { try { @@ -445,7 +445,7 @@ object Utils extends Logging { /** * Read an unsigned integer from the current position in the buffer, * incrementing the position by 4 bytes - * @param The buffer to read from + * @param buffer The buffer to read from * @return The integer read, as a long to avoid signedness */ def getUnsignedInt(buffer: ByteBuffer): Long =