
KAFKA-8729: Change `PartitionResponse` to include all troubling records (#7612)

Background:
Currently, whenever a batch is dropped because of an InvalidRecordException or an InvalidTimestampException, only the culprit record appears in ProduceResponse.PartitionResponse.recordErrors. However, when users resend that batch with the rejected record removed, the remaining records are not guaranteed to be free of problems.
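To see why a single reported index is costly, here is an illustrative sketch that is not part of the patch: if the response names only the first bad index, a client that strips that one record and resends pays one produce round trip per bad record. firstBadIndex below is a hypothetical stand-in for that single-error response.

// Sketch only: with one error index per response, clearing k bad records
// takes k resends; reporting all offending indexes at once takes a single resend.
def retryUntilClean(batch: Vector[String], firstBadIndex: Vector[String] => Option[Int]): Vector[String] =
  firstBadIndex(batch) match {
    case Some(i) => retryUntilClean(batch.patch(i, Nil, 1), firstBadIndex) // one round trip each
    case None    => batch                                                  // finally accepted
  }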

Changes:
To address this issue, I changed the signatures of validateKey, validateRecord and validateTimestamp to return a Scala Option. Specifically, the Option holds the reason the current record in the iteration fails, and it is left to the callers (convertAndAssignOffsetsNonCompressed, assignOffsetsNonCompressed, validateMessagesAndAssignOffsetsCompressed) to gather all troubling records in one place. All of these records are then returned along with the PartitionResponse object, so if a batch contains more than one record error, users see exactly which records caused the failure. PartitionResponse.recordErrors is a list of RecordError objects, introduced by #7167, where batchIndex denotes the relative position of a record within its batch and message indicates the reason for the failure. A standalone sketch of the pattern follows.
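A minimal sketch of the accumulate-then-throw pattern this change introduces. RecordError, RecordValidationException and validateRecord here are simplified stand-ins for the Kafka classes, for illustration only:

import scala.collection.mutable.ListBuffer

// Simplified stand-ins for the Kafka classes (illustration, not the real API).
final case class RecordError(batchIndex: Int, message: String)
final class RecordValidationException(val recordErrors: List[RecordError])
  extends RuntimeException(recordErrors.map(_.message).mkString("; "))

// A per-record check now reports a failure as Option[String] instead of throwing.
def validateRecord(record: String): Option[String] =
  if (record.isEmpty) Some("empty record") else None

// The caller keeps iterating, gathers every failure, then throws once per batch.
def validateBatch(batch: Seq[String]): Unit = {
  val recordErrors = ListBuffer[RecordError]()
  for ((record, batchIndex) <- batch.zipWithIndex)
    validateRecord(record).foreach(msg => recordErrors += RecordError(batchIndex, msg))
  if (recordErrors.nonEmpty)
    throw new RecordValidationException(recordErrors.toList)
}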

Gotchas:
Things are particularly tricky when a batch has records rejected because of both InvalidRecordException and InvalidTimestampException. In this case, InvalidTimestampException takes precedence, so the Error field in PartitionResponse is encoded as INVALID_TIMESTAMP.
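Condensed to a sketch, reusing the hypothetical RecordError stand-in from above; the string matching mirrors the processRecordErrors heuristic in the diff below:

// Sketch of the precedence rule: any timestamp-related error message promotes
// the whole batch's error code from INVALID_RECORD to INVALID_TIMESTAMP.
def batchErrorCode(recordErrors: List[RecordError]): String =
  if (recordErrors.exists(re => re.message.contains("Invalid timestamp") || re.message.contains("The timestamp")))
    "INVALID_TIMESTAMP"
  else
    "INVALID_RECORD"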

Reviewers: Guozhang Wang <wangguoz@gmail.com>
Tu V. Tran, 5 years ago, committed by Guozhang Wang (commit 16f1ce12e4)
  1. core/src/main/scala/kafka/log/LogValidator.scala (99 changes)
  2. core/src/test/scala/unit/kafka/log/LogTest.scala (33 changes)
  3. core/src/test/scala/unit/kafka/log/LogValidatorTest.scala (42 changes)
  4. core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala (12 changes)

core/src/main/scala/kafka/log/LogValidator.scala (99 changes)

@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.Time
 import scala.collection.{Seq, mutable}
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer

 private[kafka] object LogValidator extends Logging {
@@ -147,14 +148,15 @@ private[kafka] object LogValidator extends Logging {
       throw new UnsupportedForMessageFormatException(s"Idempotent records cannot be used with magic version $toMagic")
   }

+  /**
+   * This method returns an Option object that potentially holds the reason why a record is rejected
+   */
   private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record, batchIndex: Int, now: Long,
                              timestampType: TimestampType, timestampDiffMaxMs: Long, compactedTopic: Boolean,
-                             brokerTopicStats: BrokerTopicStats): Unit = {
+                             brokerTopicStats: BrokerTopicStats): Option[String] = {
     if (!record.hasMagic(batch.magic)) {
       brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
-      throw new RecordValidationException(
-        new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition."),
-        List(new RecordError(batchIndex)))
+      return Some(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition.")
     }

     // verify the record-level CRC only if this is one of the deep entries of a compressed message
@@ -171,7 +173,10 @@ private[kafka] object LogValidator extends Logging {
       }
     }

-    validateKey(record, batchIndex, topicPartition, compactedTopic, brokerTopicStats)
+    val result = validateKey(record, batchIndex, topicPartition, compactedTopic, brokerTopicStats)
+    if (result.isDefined)
+      return result

     validateTimestamp(batch, record, batchIndex, now, timestampType, timestampDiffMaxMs)
   }
@@ -205,10 +210,16 @@ private[kafka] object LogValidator extends Logging {
     for (batch <- records.batches.asScala) {
       validateBatch(topicPartition, firstBatch, batch, isFromClient, toMagicValue, brokerTopicStats)

+      val recordErrors = ListBuffer[RecordError]()
       for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
-        validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
+        validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) match {
+          case Some(errorMessage) => recordErrors += new RecordError(batchIndex, errorMessage)
+          case None =>
+        }
         builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
       }
+      processRecordErrors(recordErrors)
     }

     val convertedRecords = builder.build()
@@ -247,8 +258,12 @@ private[kafka] object LogValidator extends Logging {
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
       var offsetOfMaxBatchTimestamp = -1L

+      val recordErrors = ListBuffer[RecordError]()
       for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
-        validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
+        validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) match {
+          case Some(errorMessage) => recordErrors += new RecordError(batchIndex, errorMessage)
+          case None =>
+        }

         val offset = offsetCounter.getAndIncrement()
         if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) {
@@ -257,6 +272,8 @@ private[kafka] object LogValidator extends Logging {
         }
       }

+      processRecordErrors(recordErrors)
+
       if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) {
         maxTimestamp = maxBatchTimestamp
         offsetOfMaxTimestamp = offsetOfMaxBatchTimestamp
@@ -354,13 +371,18 @@ private[kafka] object LogValidator extends Logging {
         batch.streamingIterator(BufferSupplier.NO_CACHING)

       try {
+        val recordErrors = ListBuffer[RecordError]()
         for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
           if (sourceCodec != NoCompressionCodec && record.isCompressed)
-            throw new RecordValidationException(
-              new InvalidRecordException(s"Compressed outer record should not have an inner record with a compression attribute set: $record"),
-              List(new RecordError(batchIndex)))
+            recordErrors += new RecordError(
+              batchIndex,
+              s"Compressed outer record should not have an inner record with a compression attribute set: $record"
+            )

-          validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
+          validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) match {
+            case Some(errorMessage) => recordErrors += new RecordError(batchIndex, errorMessage)
+            case None =>
+          }

           uncompressedSizeInBytes += record.sizeInBytes()
           if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
@@ -368,9 +390,10 @@ private[kafka] object LogValidator extends Logging {
             val expectedOffset = expectedInnerOffset.getAndIncrement()
             if (record.offset != expectedOffset) {
               brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
-              throw new RecordValidationException(
-                new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."),
-                List(new RecordError(batchIndex)))
+              recordErrors += new RecordError(
+                batchIndex,
+                s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."
+              )
             }

             if (record.timestamp > maxTimestamp)
               maxTimestamp = record.timestamp
@@ -378,6 +401,7 @@ private[kafka] object LogValidator extends Logging {
           validatedRecords += record
         }
+        processRecordErrors(recordErrors)
       } finally {
         recordsIterator.close()
       }
@@ -465,37 +489,60 @@ private[kafka] object LogValidator extends Logging {
       recordConversionStats = recordConversionStats)
   }

-  private def validateKey(record: Record, batchIndex: Int, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats) {
+  /**
+   * This method returns an Option object that potentially holds the message that a record is rejected because of
+   * having no key in a compacted topic
+   */
+  private def validateKey(record: Record, batchIndex: Int, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats): Option[String] = {
     if (compactedTopic && !record.hasKey) {
       brokerTopicStats.allTopicsStats.noKeyCompactedTopicRecordsPerSec.mark()
-      throw new RecordValidationException(
-        new InvalidRecordException(s"Compacted topic cannot accept message without key in topic partition $topicPartition."),
-        List(new RecordError(batchIndex)))
+      return Some(s"Compacted topic cannot accept message without key in topic partition $topicPartition.")
     }
+    None
   }

   /**
    * This method validates the timestamps of a message.
    * If the message is using create time, this method checks if it is within acceptable range.
+   * If a record has an invalid timestamp or falls outside the acceptable timestamp range, this method
+   * returns an Option object with the message.
+   *
+   * The decision to make this function return an Option object is based on KIP-467
    */
   private def validateTimestamp(batch: RecordBatch,
                                 record: Record,
                                 batchIndex: Int,
                                 now: Long,
                                 timestampType: TimestampType,
-                                timestampDiffMaxMs: Long): Unit = {
+                                timestampDiffMaxMs: Long): Option[String] = {
     if (timestampType == TimestampType.CREATE_TIME
       && record.timestamp != RecordBatch.NO_TIMESTAMP
       && math.abs(record.timestamp - now) > timestampDiffMaxMs)
-      throw new RecordValidationException(
-        new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " +
-          s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]"),
-        List(new RecordError(batchIndex)))
+      return Some(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " +
+        s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]")

     if (batch.timestampType == TimestampType.LOG_APPEND_TIME)
-      throw new RecordValidationException(
-        new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
-          s"timestamp type to LogAppendTime."),
-        List(new RecordError(batchIndex)))
+      return Some(s"Invalid timestamp type in message $record. Producer should not set " +
+        s"timestamp type to LogAppendTime.")
+    None
+  }
+
+  private def processRecordErrors(recordErrors: ListBuffer[RecordError]): Unit = {
+    if (recordErrors.nonEmpty) {
+      // if any RecordError is related to a timestamp, we set the exception to
+      // InvalidTimestampException
+      if (recordErrors.exists(re => re.message.contains("Invalid timestamp") || re.message.contains("The timestamp"))) {
+        throw new RecordValidationException(
+          new InvalidTimestampException("One or more records have been rejected due to invalid timestamp"),
+          recordErrors.toList)
+      } else {
+        throw new RecordValidationException(
+          new InvalidRecordException("One or more records have been rejected"),
+          recordErrors.toList)
+      }
+    }
+  }

   case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords,

core/src/test/scala/unit/kafka/log/LogTest.scala (33 changes)

@@ -31,7 +31,7 @@ import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
@@ -1875,24 +1875,31 @@ class LogTest {
     val logConfig = LogTest.createLogConfig(cleanupPolicy = LogConfig.Compact)
     val log = createLog(logDir, logConfig)

-    try {
-      log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0)
-      fail("Compacted topics cannot accept a message without a key.")
-    } catch {
-      case _: RecordValidationException => // this is good
-    }
-    try {
-      log.appendAsLeader(messageSetWithOneUnkeyedMessage, leaderEpoch = 0)
-      fail("Compacted topics cannot accept a message without a key.")
-    } catch {
-      case _: RecordValidationException => // this is good
-    }
-    try {
-      log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, leaderEpoch = 0)
-      fail("Compacted topics cannot accept a message without a key.")
-    } catch {
-      case _: RecordValidationException => // this is good
-    }
+    val errorMsgPrefix = "Compacted topic cannot accept message without key"
+
+    var e = intercept[RecordValidationException] {
+      log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0)
+    }
+    assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
+    assertEquals(1, e.recordErrors.size)
+    assertEquals(0, e.recordErrors.head.batchIndex)
+    assertTrue(e.recordErrors.head.message.startsWith(errorMsgPrefix))
+
+    e = intercept[RecordValidationException] {
+      log.appendAsLeader(messageSetWithOneUnkeyedMessage, leaderEpoch = 0)
+    }
+    assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
+    assertEquals(1, e.recordErrors.size)
+    assertEquals(0, e.recordErrors.head.batchIndex)
+    assertTrue(e.recordErrors.head.message.startsWith(errorMsgPrefix))
+
+    e = intercept[RecordValidationException] {
+      log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, leaderEpoch = 0)
+    }
+    assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
+    assertEquals(1, e.recordErrors.size)
+    assertEquals(1, e.recordErrors.head.batchIndex) // batch index is 1
+    assertTrue(e.recordErrors.head.message.startsWith(errorMsgPrefix))

     // check if metric for NoKeyCompactedTopicRecordsPerSec is logged
     assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec}")), 1)

core/src/test/scala/unit/kafka/log/LogValidatorTest.scala (42 changes)

@@ -1276,9 +1276,7 @@ class LogValidatorTest {
     assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
     assertTrue(e.recordErrors.nonEmpty)
-    assertEquals(e.recordErrors.size, 1)
-    assertEquals(e.recordErrors.head.batchIndex, 0)
-    assertNull(e.recordErrors.head.message)
+    assertEquals(e.recordErrors.size, 3)
   }

   @Test
@@ -1289,11 +1287,43 @@ class LogValidatorTest {
         RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
     }
+    e.recordErrors.foreach(e => println(e.batchIndex + " " + e.message))
     assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
     assertTrue(e.recordErrors.nonEmpty)
-    assertEquals(e.recordErrors.size, 1)
-    assertEquals(e.recordErrors.head.batchIndex, 0)
-    assertNull(e.recordErrors.head.message)
+    // recordsWithInvalidInnerMagic creates 20 records
+    assertEquals(e.recordErrors.size, 20)
+    e.recordErrors.foreach(assertNotNull(_))
   }

+  @Test
+  def testBatchWithInvalidRecordsAndInvalidTimestamp(): Unit = {
+    val records = (0 until 5).map(id =>
+      LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 0L, null, id.toString.getBytes())
+    )
+
+    val buffer = ByteBuffer.allocate(1024)
+    val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP,
+      TimestampType.CREATE_TIME, 0L)
+    var offset = 0
+
+    // we want to mix in a record with an invalid timestamp range
+    builder.appendUncheckedWithOffset(offset, LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1,
+      1200L, null, "timestamp".getBytes))
+    records.foreach { record =>
+      offset += 30
+      builder.appendUncheckedWithOffset(offset, record)
+    }
+    val invalidOffsetTimestampRecords = builder.build()
+
+    val e = intercept[RecordValidationException] {
+      validateMessages(invalidOffsetTimestampRecords,
+        RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
+    }
+    // if there is a mix of both regular InvalidRecordException and InvalidTimestampException,
+    // InvalidTimestampException takes precedence
+    assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
+    assertTrue(e.recordErrors.nonEmpty)
+    assertEquals(6, e.recordErrors.size)
+  }
+
   private def testBatchWithoutRecordsNotAllowed(sourceCodec: CompressionCodec, targetCodec: CompressionCodec): Unit = {

core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala (12 changes)

@@ -27,6 +27,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.ProduceResponse.RecordError
 import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
 import org.junit.Assert._
 import org.junit.Test
@@ -96,10 +97,15 @@ class ProduceRequestTest extends BaseRequestTest {
     val (tp, partitionResponse) = produceResponse.responses.asScala.head
     assertEquals(topicPartition, tp)
     assertEquals(Errors.INVALID_TIMESTAMP, partitionResponse.error)
-    assertEquals(1, partitionResponse.recordErrors.size())
+    // there are 3 records with InvalidTimestampException created from the inner function createRecords
+    assertEquals(3, partitionResponse.recordErrors.size())
     assertEquals(0, partitionResponse.recordErrors.get(0).batchIndex)
-    assertNull(partitionResponse.recordErrors.get(0).message)
-    assertNotNull(partitionResponse.errorMessage)
+    assertEquals(1, partitionResponse.recordErrors.get(1).batchIndex)
+    assertEquals(2, partitionResponse.recordErrors.get(2).batchIndex)
+    for (recordError <- partitionResponse.recordErrors.asScala) {
+      assertNotNull(recordError.message)
+    }
+    assertEquals("One or more records have been rejected due to invalid timestamp", partitionResponse.errorMessage)
   }

   @Test
