diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index c4dda087af1..8fc04171bbf 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.Time
 
 import scala.collection.{Seq, mutable}
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 
 private[kafka] object LogValidator extends Logging {
 
@@ -147,14 +148,15 @@ private[kafka] object LogValidator extends Logging {
       throw new UnsupportedForMessageFormatException(s"Idempotent records cannot be used with magic version $toMagic")
   }
 
+  /**
+   * This method returns an Option object that potentially holds the reason why a record was rejected
+   */
   private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record, batchIndex: Int, now: Long,
                              timestampType: TimestampType, timestampDiffMaxMs: Long, compactedTopic: Boolean,
-                             brokerTopicStats: BrokerTopicStats): Unit = {
+                             brokerTopicStats: BrokerTopicStats): Option[String] = {
     if (!record.hasMagic(batch.magic)) {
       brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
-      throw new RecordValidationException(
-        new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition."),
-        List(new RecordError(batchIndex)))
+      return Some(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition.")
     }
 
     // verify the record-level CRC only if this is one of the deep entries of a compressed message
@@ -171,7 +173,10 @@ private[kafka] object LogValidator extends Logging {
       }
     }
 
-    validateKey(record, batchIndex, topicPartition, compactedTopic, brokerTopicStats)
+    val result = validateKey(record, batchIndex, topicPartition, compactedTopic, brokerTopicStats)
+    if (result.isDefined)
+      return result
+
     validateTimestamp(batch, record, batchIndex, now, timestampType, timestampDiffMaxMs)
   }
 
@@ -205,10 +210,16 @@ private[kafka] object LogValidator extends Logging {
 
     for (batch <- records.batches.asScala) {
       validateBatch(topicPartition, firstBatch, batch, isFromClient, toMagicValue, brokerTopicStats)
 
+      val recordErrors = ListBuffer[RecordError]()
       for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
-        validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
+        validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) match {
+          case Some(errorMessage) => recordErrors += new RecordError(batchIndex, errorMessage)
+          case None =>
+        }
         builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
       }
+
+      processRecordErrors(recordErrors)
     }
 
     val convertedRecords = builder.build()
@@ -247,8 +258,12 @@ private[kafka] object LogValidator extends Logging {
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
       var offsetOfMaxBatchTimestamp = -1L
 
+      val recordErrors = ListBuffer[RecordError]()
       for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
-        validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
+        validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) match {
+          case Some(errorMessage) => recordErrors += new RecordError(batchIndex, errorMessage)
+          case None =>
+        }
         val offset = offsetCounter.getAndIncrement()
         if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) {
@@ -257,6 +272,8 @@ private[kafka] object LogValidator extends Logging {
         }
       }
 
+      processRecordErrors(recordErrors)
+
       if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) {
         maxTimestamp = maxBatchTimestamp
         offsetOfMaxTimestamp = offsetOfMaxBatchTimestamp
@@ -354,13 +371,18 @@ private[kafka] object LogValidator extends Logging {
       batch.streamingIterator(BufferSupplier.NO_CACHING)
 
     try {
+      val recordErrors = ListBuffer[RecordError]()
       for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
         if (sourceCodec != NoCompressionCodec && record.isCompressed)
-          throw new RecordValidationException(
-            new InvalidRecordException(s"Compressed outer record should not have an inner record with a compression attribute set: $record"),
-            List(new RecordError(batchIndex)))
-
-        validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
+          recordErrors += new RecordError(
+            batchIndex,
+            s"Compressed outer record should not have an inner record with a compression attribute set: $record"
+          )
+
+        validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) match {
+          case Some(errorMessage) => recordErrors += new RecordError(batchIndex, errorMessage)
+          case None =>
+        }
 
         uncompressedSizeInBytes += record.sizeInBytes()
 
         if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
@@ -368,9 +390,10 @@ private[kafka] object LogValidator extends Logging {
           val expectedOffset = expectedInnerOffset.getAndIncrement()
           if (record.offset != expectedOffset) {
             brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
-            throw new RecordValidationException(
-              new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."),
-              List(new RecordError(batchIndex)))
+            recordErrors += new RecordError(
+              batchIndex,
+              s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."
+            )
           }
 
           if (record.timestamp > maxTimestamp)
             maxTimestamp = record.timestamp
@@ -378,6 +401,7 @@ private[kafka] object LogValidator extends Logging {
 
         validatedRecords += record
       }
+      processRecordErrors(recordErrors)
     } finally {
       recordsIterator.close()
     }
@@ -465,37 +489,60 @@ private[kafka] object LogValidator extends Logging {
       recordConversionStats = recordConversionStats)
   }
 
-  private def validateKey(record: Record, batchIndex: Int, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats) {
+  /**
+   * This method returns an Option object that potentially holds the reason a record was rejected for
+   * having no key in a compacted topic
+   */
+  private def validateKey(record: Record, batchIndex: Int, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats): Option[String] = {
     if (compactedTopic && !record.hasKey) {
       brokerTopicStats.allTopicsStats.noKeyCompactedTopicRecordsPerSec.mark()
-      throw new RecordValidationException(
-        new InvalidRecordException(s"Compacted topic cannot accept message without key in topic partition $topicPartition."),
-        List(new RecordError(batchIndex)))
+      return Some(s"Compacted topic cannot accept message without key in topic partition $topicPartition.")
     }
+
+    None
   }
 
   /**
    * This method validates the timestamps of a message.
    * If the message is using create time, this method checks if it is within acceptable range.
+   * If a record has an invalid timestamp, or its timestamp falls outside the acceptable range, this
+   * method returns an Option holding the error message.
+   *
+   * The decision to have this function return an Option is based on KIP-467
    */
   private def validateTimestamp(batch: RecordBatch,
                                 record: Record,
                                 batchIndex: Int,
                                 now: Long,
                                 timestampType: TimestampType,
-                                timestampDiffMaxMs: Long): Unit = {
+                                timestampDiffMaxMs: Long): Option[String] = {
     if (timestampType == TimestampType.CREATE_TIME
       && record.timestamp != RecordBatch.NO_TIMESTAMP
       && math.abs(record.timestamp - now) > timestampDiffMaxMs)
-      throw new RecordValidationException(
-        new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " +
-          s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]"),
-        List(new RecordError(batchIndex)))
+      return Some(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " +
+        s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]")
+
     if (batch.timestampType == TimestampType.LOG_APPEND_TIME)
-      throw new RecordValidationException(
-        new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
-          s"timestamp type to LogAppendTime."),
-        List(new RecordError(batchIndex)))
+      return Some(s"Invalid timestamp type in message $record. Producer should not set " +
+        s"timestamp type to LogAppendTime.")
+
+    None
+  }
+
+  private def processRecordErrors(recordErrors: ListBuffer[RecordError]): Unit = {
+    if (recordErrors.nonEmpty) {
+      // if any of the record errors are timestamp-related, surface the failure
+      // as an InvalidTimestampException
+      if (recordErrors.exists(re => re.message.contains("Invalid timestamp") || re.message.contains("The timestamp"))) {
+        throw new RecordValidationException(
+          new InvalidTimestampException("One or more records have been rejected due to invalid timestamp"),
+          recordErrors.toList)
+      } else {
+        throw new RecordValidationException(
+          new InvalidRecordException("One or more records have been rejected"),
+          recordErrors.toList)
+      }
+    }
   }
 
   case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords,
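The core of this change is an accumulate-then-throw pattern: validators return Option[String] instead of throwing on the first failure, and processRecordErrors raises one exception carrying every per-record error. Below is a minimal, self-contained sketch of the pattern; RecordError, RecordValidationException, and both validators are simplified stand-ins for the Kafka classes, not the real API.

// Hypothetical, simplified stand-ins for the Kafka types used above.
import scala.collection.mutable.ListBuffer

final case class RecordError(batchIndex: Int, message: String)

final class RecordValidationException(val invalidException: RuntimeException,
                                      val recordErrors: List[RecordError])
  extends RuntimeException(invalidException)

object ValidationSketch {
  // Each validator reports a failure as Some(reason) instead of throwing, so a
  // single pass can collect every bad record in the batch.
  def validateKey(key: Option[String], compactedTopic: Boolean): Option[String] =
    if (compactedTopic && key.isEmpty)
      Some("Compacted topic cannot accept message without key.")
    else
      None

  def validateTimestamp(timestamp: Long, now: Long, maxDiffMs: Long): Option[String] =
    if (math.abs(timestamp - now) > maxDiffMs)
      Some(s"The timestamp should be within [${now - maxDiffMs}, ${now + maxDiffMs}]")
    else
      None

  def validateAll(records: Seq[(Option[String], Long)], now: Long, maxDiffMs: Long): Unit = {
    val recordErrors = ListBuffer[RecordError]()
    for (((key, timestamp), batchIndex) <- records.zipWithIndex) {
      validateKey(key, compactedTopic = true).foreach(msg => recordErrors += RecordError(batchIndex, msg))
      validateTimestamp(timestamp, now, maxDiffMs).foreach(msg => recordErrors += RecordError(batchIndex, msg))
    }
    // one aggregated exception instead of failing on the first bad record
    if (recordErrors.nonEmpty)
      throw new RecordValidationException(
        new RuntimeException("One or more records have been rejected"),
        recordErrors.toList)
  }
}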
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 570e252dc7d..5bd87daf9f5 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -31,7 +31,7 @@ import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
@@ -1875,24 +1875,31 @@ class LogTest {
     val logConfig = LogTest.createLogConfig(cleanupPolicy = LogConfig.Compact)
     val log = createLog(logDir, logConfig)
 
-    try {
+    val errorMsgPrefix = "Compacted topic cannot accept message without key"
+
+    var e = intercept[RecordValidationException] {
       log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0)
-      fail("Compacted topics cannot accept a message without a key.")
-    } catch {
-      case _: RecordValidationException => // this is good
     }
-    try {
+    assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
+    assertEquals(1, e.recordErrors.size)
+    assertEquals(0, e.recordErrors.head.batchIndex)
+    assertTrue(e.recordErrors.head.message.startsWith(errorMsgPrefix))
+
+    e = intercept[RecordValidationException] {
       log.appendAsLeader(messageSetWithOneUnkeyedMessage, leaderEpoch = 0)
-      fail("Compacted topics cannot accept a message without a key.")
-    } catch {
-      case _: RecordValidationException => // this is good
     }
-    try {
+    assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
+    assertEquals(1, e.recordErrors.size)
+    assertEquals(0, e.recordErrors.head.batchIndex)
+    assertTrue(e.recordErrors.head.message.startsWith(errorMsgPrefix))
+
+    e = intercept[RecordValidationException] {
       log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, leaderEpoch = 0)
-      fail("Compacted topics cannot accept a message without a key.")
-    } catch {
-      case _: RecordValidationException => // this is good
     }
+    assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
+    assertEquals(1, e.recordErrors.size)
+    assertEquals(1, e.recordErrors.head.batchIndex) // batch index is 1
+    assertTrue(e.recordErrors.head.message.startsWith(errorMsgPrefix))
 
     // check if metric for NoKeyCompactedTopicRecordsPerSec is logged
     assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec}")), 1)
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 923ae918521..dccb4d2d029 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -1276,9 +1276,7 @@ class LogValidatorTest {
 
     assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
     assertTrue(e.recordErrors.nonEmpty)
-    assertEquals(e.recordErrors.size, 1)
-    assertEquals(e.recordErrors.head.batchIndex, 0)
-    assertNull(e.recordErrors.head.message)
+    assertEquals(e.recordErrors.size, 3)
   }
 
   @Test
@@ -1289,11 +1287,43 @@ class LogValidatorTest {
         RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
     }
 
     assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
     assertTrue(e.recordErrors.nonEmpty)
-    assertEquals(e.recordErrors.size, 1)
-    assertEquals(e.recordErrors.head.batchIndex, 0)
-    assertNull(e.recordErrors.head.message)
+    // recordsWithInvalidInnerMagic creates 20 records
+    assertEquals(e.recordErrors.size, 20)
+    e.recordErrors.foreach(re => assertNotNull(re.message))
+  }
+
+  @Test
+  def testBatchWithInvalidRecordsAndInvalidTimestamp(): Unit = {
+    val records = (0 until 5).map(id =>
+      LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 0L, null, id.toString.getBytes())
+    )
+
+    val buffer = ByteBuffer.allocate(1024)
+    val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP,
+      TimestampType.CREATE_TIME, 0L)
+    var offset = 0
+
+    // we want to mix in a record with an invalid timestamp range
+    builder.appendUncheckedWithOffset(offset, LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1,
+      1200L, null, "timestamp".getBytes))
+    records.foreach { record =>
+      offset += 30
+      builder.appendUncheckedWithOffset(offset, record)
+    }
+    val invalidOffsetTimestampRecords = builder.build()
+
+    val e = intercept[RecordValidationException] {
+      validateMessages(invalidOffsetTimestampRecords,
+        RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
+    }
+    // when regular InvalidRecordException failures are mixed with InvalidTimestampException
+    // failures, InvalidTimestampException takes precedence
+    assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
+    assertTrue(e.recordErrors.nonEmpty)
+    assertEquals(6, e.recordErrors.size)
   }
 
   private def testBatchWithoutRecordsNotAllowed(sourceCodec: CompressionCodec, targetCodec: CompressionCodec): Unit = {
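For illustration, this is how a caller of the sketch shown earlier might surface all rejected records at once; the types are the hypothetical ValidationSketch ones, not Kafka's test API.

// Usage sketch: every offending record is reported, not just the first one.
object ValidationSketchUsage extends App {
  try {
    // record 0 has no key; record 1 has a timestamp far outside [990, 1010]
    ValidationSketch.validateAll(
      records = Seq((None, 1000L), (Some("key"), 5000L)),
      now = 1000L,
      maxDiffMs = 10L)
  } catch {
    case e: RecordValidationException =>
      // prints one line per rejected record, with its index and reason
      e.recordErrors.foreach(err => println(s"batch index ${err.batchIndex}: ${err.message}"))
  }
}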
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 3bc8d0aacb8..25404bb871c 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -27,6 +27,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.ProduceResponse.RecordError
 import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
 import org.junit.Assert._
 import org.junit.Test
@@ -96,10 +97,15 @@ class ProduceRequestTest extends BaseRequestTest {
     val (tp, partitionResponse) = produceResponse.responses.asScala.head
     assertEquals(topicPartition, tp)
     assertEquals(Errors.INVALID_TIMESTAMP, partitionResponse.error)
-    assertEquals(1, partitionResponse.recordErrors.size())
+    // the createRecords helper builds 3 records, all of which fail timestamp validation
+    assertEquals(3, partitionResponse.recordErrors.size())
     assertEquals(0, partitionResponse.recordErrors.get(0).batchIndex)
-    assertNull(partitionResponse.recordErrors.get(0).message)
-    assertNotNull(partitionResponse.errorMessage)
+    assertEquals(1, partitionResponse.recordErrors.get(1).batchIndex)
+    assertEquals(2, partitionResponse.recordErrors.get(2).batchIndex)
+    for (recordError <- partitionResponse.recordErrors.asScala) {
+      assertNotNull(recordError.message)
+    }
+    assertEquals("One or more records have been rejected due to invalid timestamp", partitionResponse.errorMessage)
  }
 
   @Test
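Finally, a sketch of the precedence rule these assertions rely on, mirroring the string matching in processRecordErrors above: when the accumulated errors mix timestamp and non-timestamp failures, the summary error is the timestamp-flavored one. RecordError is again the hypothetical stand-in from the first sketch.

// Precedence sketch: one timestamp failure decides the summary for the batch.
object PrecedenceSketch extends App {
  def summaryMessage(recordErrors: List[RecordError]): String =
    if (recordErrors.exists(re => re.message.contains("Invalid timestamp") || re.message.contains("The timestamp")))
      "One or more records have been rejected due to invalid timestamp"
    else
      "One or more records have been rejected"

  val mixed = List(
    RecordError(0, "The timestamp should be within [990, 1010]"),
    RecordError(1, "Compacted topic cannot accept message without key."))

  // the single timestamp failure wins even though a key failure is present
  assert(summaryMessage(mixed) == "One or more records have been rejected due to invalid timestamp")
}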