From 78e18b575c6b4724afe6107371d70c46fb57d3d9 Mon Sep 17 00:00:00 2001
From: Chia-Ping Tsai
Date: Sun, 17 May 2020 01:05:17 +0800
Subject: [PATCH] KAFKA-9617 Replica Fetcher can mark partition as failed when
 max.message.bytes is changed (#8659)

Skip the record size check if the record has already been accepted by the leader.

Reviewers: Guozhang Wang
---
 core/src/main/scala/kafka/log/Log.scala      | 22 ++++++++++++++--------
 .../test/scala/unit/kafka/log/LogTest.scala  | 16 ++++++++++++++
 2 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 14e414c6277..d046368fb26 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1010,7 +1010,7 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
+    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false)
   }
 
   /**
@@ -1025,7 +1025,9 @@ class Log(@volatile private var _dir: File,
       origin = AppendOrigin.Replication,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       assignOffsets = false,
-      leaderEpoch = -1)
+      leaderEpoch = -1,
+      // disable the validation of record size since the record is already accepted by the leader.
+      ignoreRecordSize = true)
   }
 
   /**
@@ -1039,6 +1041,7 @@ class Log(@volatile private var _dir: File,
    * @param interBrokerProtocolVersion Inter-broker message protocol version
    * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
    * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
+   * @param ignoreRecordSize true to skip validation of record size.
    * @throws KafkaStorageException If the append fails due to an I/O error.
    * @throws OffsetsOutOfOrderException If out of order offsets found in 'records'
    * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset
@@ -1048,9 +1051,10 @@ class Log(@volatile private var _dir: File,
                      origin: AppendOrigin,
                      interBrokerProtocolVersion: ApiVersion,
                      assignOffsets: Boolean,
-                     leaderEpoch: Int): LogAppendInfo = {
+                     leaderEpoch: Int,
+                     ignoreRecordSize: Boolean): LogAppendInfo = {
     maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
-      val appendInfo = analyzeAndValidateRecords(records, origin)
+      val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize)
 
       // return if we have no valid messages or if this is a duplicate of the last appended entry
       if (appendInfo.shallowCount == 0)
@@ -1097,7 +1101,7 @@ class Log(@volatile private var _dir: File,
 
           // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
           // format conversion)
-          if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
+          if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) {
             for (batch <- validRecords.batches.asScala) {
               if (batch.sizeInBytes > config.maxMessageSize) {
                 // we record the original message set size instead of the trimmed size
@@ -1321,7 +1325,7 @@ class Log(@volatile private var _dir: File,
    * Validate the following:
    * <ol>
    * <li> each message matches its CRC
-   * <li> each message size is valid
+   * <li> each message size is valid (if ignoreRecordSize is false)
    * <li> that the sequence numbers of the incoming record batches are consistent with the existing state and with each other.
    * </ol>
    *
@@ -1335,7 +1339,9 @@ class Log(@volatile private var _dir: File,
    * <li> Whether any compression codec is used (if many are used, then the last one is given)
    * </ol>
    */
-  private def analyzeAndValidateRecords(records: MemoryRecords, origin: AppendOrigin): LogAppendInfo = {
+  private def analyzeAndValidateRecords(records: MemoryRecords,
+                                        origin: AppendOrigin,
+                                        ignoreRecordSize: Boolean): LogAppendInfo = {
     var shallowMessageCount = 0
     var validBytesCount = 0
     var firstOffset: Option[Long] = None
@@ -1375,7 +1381,7 @@ class Log(@volatile private var _dir: File,
 
       // Check if the message sizes are valid.
      val batchSize = batch.sizeInBytes
-      if (batchSize > config.maxMessageSize) {
+      if (!ignoreRecordSize && batchSize > config.maxMessageSize) {
         brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
         brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
         throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 41dc7e90e9b..94238d89ccb 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2078,6 +2078,22 @@ class LogTest {
       case _: RecordTooLargeException => // this is good
     }
   }
+
+  @Test
+  def testMessageSizeCheckInAppendAsFollower(): Unit = {
+    val first = MemoryRecords.withRecords(0, CompressionType.NONE, 0,
+      new SimpleRecord("You".getBytes), new SimpleRecord("bethe".getBytes))
+    val second = MemoryRecords.withRecords(5, CompressionType.NONE, 0,
+      new SimpleRecord("change (I need more bytes)... blah blah blah.".getBytes),
+      new SimpleRecord("More padding boo hoo".getBytes))
+
+    val log = createLog(logDir, LogTest.createLogConfig(maxMessageBytes = second.sizeInBytes - 1))
+
+    log.appendAsFollower(first)
+    // the second record is larger than the limit but appendAsFollower does not validate the size.
+    log.appendAsFollower(second)
+  }
+
   /**
    * Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly.
    */
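For illustration only, not part of the commit: the intent of the change is easiest to see by contrasting the leader and follower append paths side by side. The sketch below follows the style of the LogTest cases above and assumes the same test helpers (createLog, logDir, LogTest.createLogConfig) and JUnit's fail; the test name testLeaderStillEnforcesMaxMessageBytes is hypothetical. The leader path is still expected to reject an oversized batch with RecordTooLargeException, while appendAsFollower now accepts it because the leader has already validated the batch.

  @Test
  def testLeaderStillEnforcesMaxMessageBytes(): Unit = {
    val oversized = MemoryRecords.withRecords(0, CompressionType.NONE, 0,
      new SimpleRecord("a batch that exceeds max.message.bytes".getBytes))
    // configure max.message.bytes one byte below the batch size, so the batch is oversized
    val log = createLog(logDir, LogTest.createLogConfig(maxMessageBytes = oversized.sizeInBytes - 1))

    try {
      // the leader path still validates the batch size against max.message.bytes
      log.appendAsLeader(oversized, leaderEpoch = 0)
      fail("appendAsLeader should reject a batch larger than max.message.bytes")
    } catch {
      case _: RecordTooLargeException => // this is good
    }

    // the follower path skips the size check, since the leader already accepted this batch
    log.appendAsFollower(oversized)
  }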