Browse Source

KAFKA-9080: Revert the check added to validate non-compressed record batch does have continuous incremental offsets

#7167 added a check for non-incremental offsets in `assignOffsetsNonCompressed`, which is not applicable for message format V0 and V1. Therefore, I added a condition to disable the check if the record version precedes V2.

Author: Tu Tran <tu@confluent.io>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>

Closes #7628 from tuvtran/KAFKA-9080
pull/6329/merge
Tu Tran 5 years ago committed by Manikumar Reddy
parent
commit
f5a4519be9
  1. 11
      core/src/main/scala/kafka/log/LogValidator.scala

11
core/src/main/scala/kafka/log/LogValidator.scala

@ -236,7 +236,6 @@ private[kafka] object LogValidator extends Logging { @@ -236,7 +236,6 @@ private[kafka] object LogValidator extends Logging {
magic: Byte,
brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
var maxTimestamp = RecordBatch.NO_TIMESTAMP
val expectedInnerOffset = new LongRef(0)
var offsetOfMaxTimestamp = -1L
val initialOffset = offsetCounter.value
@ -251,16 +250,6 @@ private[kafka] object LogValidator extends Logging { @@ -251,16 +250,6 @@ private[kafka] object LogValidator extends Logging {
for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
val expectedOffset = expectedInnerOffset.getAndIncrement()
// inner records offset should always be continuous
if (record.offset != expectedOffset) {
brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
throw new RecordValidationException(
new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."),
List(new RecordError(batchIndex)))
}
val offset = offsetCounter.getAndIncrement()
if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) {
maxBatchTimestamp = record.timestamp

Loading…
Cancel
Save