From 695a8d43eb2a58920f3f00ae4dbce30b10adb42c Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Wed, 29 Aug 2012 14:52:39 +0000 Subject: [PATCH] Message size not checked at the server (patch v3); patched by Swapnil Ghike; reviewed by Jun Rao; KAFKA-469 git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1378590 13f79535-47bb-0310-9956-ffa450edef68 --- core/src/main/scala/kafka/log/Log.scala | 1 + .../scala/kafka/server/KafkaRequestHandlers.scala | 12 ++++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 0d362485a69..545069972fb 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -255,6 +255,7 @@ private[log] class Log(val dir: File, val time: Time, val maxSize: Long, val max } } + /** * Read from the log file at the given offset */ diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala b/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala index 26b1aaa74d6..e537afb453f 100644 --- a/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala @@ -22,7 +22,7 @@ import kafka.log._ import kafka.network._ import kafka.message._ import kafka.api._ -import kafka.common.ErrorMapping +import kafka.common.{MessageSizeTooLargeException, ErrorMapping} import java.util.concurrent.atomic.AtomicLong import kafka.utils._ @@ -73,11 +73,15 @@ private[kafka] class KafkaRequestHandlers(val logManager: LogManager) extends Lo BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(request.messages.sizeInBytes) } catch { - case e => - error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e) + case e: MessageSizeTooLargeException => + warn(e.getMessage() + " on " + request.topic + ":" + partition) + BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest + BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest + case t => + error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, t) BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest - throw e + throw t } }