Browse Source

Message size not checked at the server (patch v3); patched by Swapnil Ghike; reviewed by Jun Rao; KAFKA-469

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1378590 13f79535-47bb-0310-9956-ffa450edef68
pull/9/head
Jun Rao 12 years ago
parent
commit
695a8d43eb
  1. 1
      core/src/main/scala/kafka/log/Log.scala
  2. 12
      core/src/main/scala/kafka/server/KafkaRequestHandlers.scala

1
core/src/main/scala/kafka/log/Log.scala

@ -255,6 +255,7 @@ private[log] class Log(val dir: File, val time: Time, val maxSize: Long, val max @@ -255,6 +255,7 @@ private[log] class Log(val dir: File, val time: Time, val maxSize: Long, val max
}
}
/**
* Read from the log file at the given offset
*/

12
core/src/main/scala/kafka/server/KafkaRequestHandlers.scala

@ -22,7 +22,7 @@ import kafka.log._ @@ -22,7 +22,7 @@ import kafka.log._
import kafka.network._
import kafka.message._
import kafka.api._
import kafka.common.ErrorMapping
import kafka.common.{MessageSizeTooLargeException, ErrorMapping}
import java.util.concurrent.atomic.AtomicLong
import kafka.utils._
@ -73,11 +73,15 @@ private[kafka] class KafkaRequestHandlers(val logManager: LogManager) extends Lo @@ -73,11 +73,15 @@ private[kafka] class KafkaRequestHandlers(val logManager: LogManager) extends Lo
BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(request.messages.sizeInBytes)
}
catch {
case e =>
error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
case e: MessageSizeTooLargeException =>
warn(e.getMessage() + " on " + request.topic + ":" + partition)
BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
case t =>
error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, t)
BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
throw e
throw t
}
}

Loading…
Cancel
Save