diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0d362485a69..545069972fb 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -255,6 +255,7 @@ private[log] class Log(val dir: File, val time: Time, val maxSize: Long, val max
     }
   }
 
+
   /**
    * Read from the log file at the given offset
    */
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala b/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
index 26b1aaa74d6..e537afb453f 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
@@ -22,7 +22,7 @@ import kafka.log._
 import kafka.network._
 import kafka.message._
 import kafka.api._
-import kafka.common.ErrorMapping
+import kafka.common.{MessageSizeTooLargeException, ErrorMapping}
 import java.util.concurrent.atomic.AtomicLong
 import kafka.utils._
 
@@ -73,11 +73,15 @@ private[kafka] class KafkaRequestHandlers(val logManager: LogManager) extends Lo
         BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(request.messages.sizeInBytes)
       }
       catch {
-        case e =>
-          error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
+        case e: MessageSizeTooLargeException =>
+          warn(e.getMessage() + " on " + request.topic + ":" + partition)
+          BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
+          BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
+        case t =>
+          error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, t)
           BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
           BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
-          throw e
+          throw t
       }
 
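
For context, the behavioral change in KafkaRequestHandlers.scala is the split of the old catch-all clause into two cases: an oversized message is an expected client error, so it is now logged at WARN and swallowed after the failure stats are recorded, while every other throwable still gets an ERROR log with the stack trace and is rethrown. Below is a minimal, self-contained Scala sketch of that pattern, not the Kafka code itself; the exception class, the recordFailedProduceRequest helper, and the log methods are hypothetical stand-ins, and the general case is written with an explicit Throwable type for clarity.

// Sketch of the catch-clause split introduced by this patch (hypothetical names).
class MessageSizeTooLargeException(msg: String) extends RuntimeException(msg)

object ProducePathSketch {
  private def warn(msg: String): Unit = println("WARN  " + msg)
  private def error(msg: String, t: Throwable): Unit = println("ERROR " + msg + ": " + t)
  private var failedProduceRequests = 0
  private def recordFailedProduceRequest(): Unit = failedProduceRequests += 1

  def handleProduce(topic: String, partition: Int)(append: => Unit): Unit = {
    try {
      append
    } catch {
      case e: MessageSizeTooLargeException =>
        // Expected client error: record the failure and warn, but do not rethrow.
        warn(e.getMessage + " on " + topic + ":" + partition)
        recordFailedProduceRequest()
      case t: Throwable =>
        // Anything else is unexpected: record, log with stack trace, and propagate.
        error("Error processing ProduceRequest on " + topic + ":" + partition, t)
        recordFailedProduceRequest()
        throw t
    }
  }

  def main(args: Array[String]): Unit = {
    // An oversized message now yields a WARN and a failure count, with no rethrow.
    handleProduce("test", 0) {
      throw new MessageSizeTooLargeException("message size exceeds the configured maximum")
    }
  }
}

Note that the specific case must precede the general one: Scala matches catch clauses top to bottom, so placing the Throwable case first would shadow the MessageSizeTooLargeException handler.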