Browse Source

KAFKA-8722; Ensure record CRC is always verified when appending to log (#7124)

We found that when record.offset is not equal to the offset we are expecting, kafka will set the variable inPlaceAssignment to false. When inPlaceAssignment is false, data will not be verified. This patch adds the missing CRC verification.

Reviewers: Jason Gustafson <jason@confluent.io>
pull/13879/head
lordcheng10 5 years ago committed by Jason Gustafson
parent
commit
8007211cc9
  1. 4
      core/src/main/scala/kafka/log/LogValidator.scala

4
core/src/main/scala/kafka/log/LogValidator.scala

@ -166,6 +166,7 @@ private[kafka] object LogValidator { @@ -166,6 +166,7 @@ private[kafka] object LogValidator {
records.deepEntries(true, BufferSupplier.NO_CACHING).asScala.foreach { logEntry =>
val record = logEntry.record
record.ensureValid()
validateKey(record, compactedTopic)
if (record.magic > Record.MAGIC_VALUE_V0 && messageFormatVersion > Record.MAGIC_VALUE_V0) {
@ -201,9 +202,6 @@ private[kafka] object LogValidator { @@ -201,9 +202,6 @@ private[kafka] object LogValidator {
shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
messageSizeMaybeChanged = true)
} else {
// ensure the inner messages are valid
validatedRecords.foreach(_.ensureValid)
// we can update the wrapper message only and write the compressed payload as is
val entry = records.shallowEntries.iterator.next()
val offset = offsetCounter.addAndGet(validatedRecords.size) - 1

Loading…
Cancel
Save