From f5a4519be97881666d2d3e558ae6934fc2dad87c Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Sat, 2 Nov 2019 06:03:27 +0530 Subject: [PATCH] KAFKA-9080: Revert the check added to validate non-compressed record batch does have continuous incremental offsets #7167 added a check for non-incremental offsets in `assignOffsetsNonCompressed`, which is not applicable for message format V0 and V1. Therefore, I added a condition to disable the check if the record version precedes V2. Author: Tu Tran Reviewers: Manikumar Reddy Closes #7628 from tuvtran/KAFKA-9080 --- core/src/main/scala/kafka/log/LogValidator.scala | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 70bf3bf00da..c4dda087af1 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -236,7 +236,6 @@ private[kafka] object LogValidator extends Logging { magic: Byte, brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = { var maxTimestamp = RecordBatch.NO_TIMESTAMP - val expectedInnerOffset = new LongRef(0) var offsetOfMaxTimestamp = -1L val initialOffset = offsetCounter.value @@ -251,16 +250,6 @@ private[kafka] object LogValidator extends Logging { for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) - val expectedOffset = expectedInnerOffset.getAndIncrement() - - // inner records offset should always be continuous - if (record.offset != expectedOffset) { - brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark() - throw new RecordValidationException( - new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."), - List(new RecordError(batchIndex))) - } - val offset = offsetCounter.getAndIncrement() if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) { maxBatchTimestamp = record.timestamp