diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index bdba860e13d..7a0e53063a0 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -131,7 +131,13 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe + ", computed crc = " + computeChecksum() + ")"); } - private long baseTimestamp() { + /** + * Get the timestamp of the first record in this batch. It is always the create time of the record even if the + * timestamp type of the batch is log append time. + * + * @return The base timestamp + */ + public long baseTimestamp() { return buffer.getLong(BASE_TIMESTAMP_OFFSET); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index aaca8515925..66560caa5d8 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -331,15 +331,11 @@ public class MemoryRecordsBuilder { int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD; int offsetDelta = (int) (lastOffset - baseOffset); - final long baseTimestamp; final long maxTimestamp; - if (timestampType == TimestampType.LOG_APPEND_TIME) { - baseTimestamp = logAppendTime; + if (timestampType == TimestampType.LOG_APPEND_TIME) maxTimestamp = logAppendTime; - } else { - baseTimestamp = this.baseTimestamp; + else maxTimestamp = this.maxTimestamp; - } DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType, baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java index 728b6eb86ec..80494694644 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java @@ -32,8 +32,12 @@ public interface MutableRecordBatch extends RecordBatch { /** * Set the max timestamp for this batch. When using log append time, this effectively overrides the individual - * timestamps of all the records contained in the batch. Note that this typically requires re-computation - * of the batch's CRC. + * timestamps of all the records contained in the batch. To avoid recompression, the record fields are not updated + * by this method, but clients ignore them if the timestamp time is log append time. Note that baseTimestamp is not + * updated by this method. + * + * This typically requires re-computation of the batch's CRC. + * * @param timestampType The timestamp type * @param maxTimestamp The maximum timestamp */ diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index ef773dade46..65a6a95fbe4 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -80,8 +80,10 @@ public interface RecordBatch extends Iterable { long checksum(); /** - * Get the timestamp of this record batch. This is the max timestamp among all records contained in this batch. - * This value is updated during compaction. + * Get the max timestamp or log append time of this record batch. + * + * If the timestamp type is create time, this is the max timestamp among all records contained in this batch and + * the value is updated during compaction. * * @return The max timestamp */ diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index f40745d5091..3ab9732903d 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -38,7 +38,7 @@ class LogValidatorTest { private def checkLogAppendTimeNonCompressed(magic: Byte) { val now = System.currentTimeMillis() // The timestamps should be overwritten - val records = createRecords(magicValue = magic, timestamp = 0L, codec = CompressionType.NONE) + val records = createRecords(magicValue = magic, timestamp = 1234L, codec = CompressionType.NONE) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(0), now = now, @@ -52,7 +52,7 @@ class LogValidatorTest { isFromClient = true) val validatedRecords = validatedResults.validatedRecords assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size) - validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch)) + validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, 1234L, batch)) assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp) assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp) assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged) @@ -86,7 +86,7 @@ class LogValidatorTest { val validatedRecords = validatedResults.validatedRecords assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size) - validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch)) + validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, -1, batch)) assertTrue("MessageSet should still valid", validatedRecords.batches.iterator.next().isValid) assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp) assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}", @@ -107,7 +107,7 @@ class LogValidatorTest { private def checkLogAppendTimeWithoutRecompression(magic: Byte) { val now = System.currentTimeMillis() // The timestamps should be overwritten - val records = createRecords(magicValue = magic, timestamp = 0L, codec = CompressionType.GZIP) + val records = createRecords(magicValue = magic, timestamp = 1234L, codec = CompressionType.GZIP) val validatedResults = LogValidator.validateMessagesAndAssignOffsets( records, offsetCounter = new LongRef(0), @@ -124,7 +124,7 @@ class LogValidatorTest { assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size) - validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch)) + validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, 1234L, batch)) assertTrue("MessageSet should still valid", validatedRecords.batches.iterator.next().isValid) assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp) assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}", @@ -176,6 +176,7 @@ class LogValidatorTest { for (batch <- validatedRecords.batches.asScala) { assertTrue(batch.isValid) assertEquals(batch.timestampType, TimestampType.CREATE_TIME) + maybeCheckBaseTimestamp(timestampSeq(0), batch) assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max) assertEquals(producerEpoch, batch.producerEpoch) assertEquals(producerId, batch.producerId) @@ -237,6 +238,7 @@ class LogValidatorTest { for (batch <- validatedRecords.batches.asScala) { assertTrue(batch.isValid) assertEquals(batch.timestampType, TimestampType.CREATE_TIME) + maybeCheckBaseTimestamp(timestampSeq(0), batch) assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max) assertEquals(producerEpoch, batch.producerEpoch) assertEquals(producerId, batch.producerId) @@ -280,6 +282,7 @@ class LogValidatorTest { for (batch <- validatedRecords.batches.asScala) { assertTrue(batch.isValid) + maybeCheckBaseTimestamp(RecordBatch.NO_TIMESTAMP, batch) assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp) assertEquals(TimestampType.CREATE_TIME, batch.timestampType) assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch) @@ -316,6 +319,7 @@ class LogValidatorTest { for (batch <- validatedRecords.batches.asScala) { assertTrue(batch.isValid) + maybeCheckBaseTimestamp(timestamp, batch) assertEquals(timestamp, batch.maxTimestamp) assertEquals(TimestampType.CREATE_TIME, batch.timestampType) assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch) @@ -367,6 +371,7 @@ class LogValidatorTest { for (batch <- validatedRecords.batches.asScala) { assertTrue(batch.isValid) assertEquals(batch.timestampType, TimestampType.CREATE_TIME) + maybeCheckBaseTimestamp(timestampSeq(0), batch) assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max) assertEquals(producerEpoch, batch.producerEpoch) assertEquals(producerId, batch.producerId) @@ -945,13 +950,25 @@ class LogValidatorTest { builder.build() } - def validateLogAppendTime(now: Long, batch: RecordBatch) { + def maybeCheckBaseTimestamp(expected: Long, batch: RecordBatch): Unit = { + batch match { + case b: DefaultRecordBatch => + assertEquals(s"Unexpected base timestamp of batch $batch", expected, b.baseTimestamp) + case _ => // no-op + } + } + + /** + * expectedLogAppendTime is only checked if batch.magic is V2 or higher + */ + def validateLogAppendTime(expectedLogAppendTime: Long, expectedBaseTimestamp: Long, batch: RecordBatch) { assertTrue(batch.isValid) - assertTrue(batch.timestampType() == TimestampType.LOG_APPEND_TIME) - assertEquals(s"Timestamp of message $batch should be $now", now, batch.maxTimestamp) + assertTrue(batch.timestampType == TimestampType.LOG_APPEND_TIME) + assertEquals(s"Unexpected max timestamp of batch $batch", expectedLogAppendTime, batch.maxTimestamp) + maybeCheckBaseTimestamp(expectedBaseTimestamp, batch) for (record <- batch.asScala) { assertTrue(record.isValid) - assertEquals(s"Timestamp of message $record should be $now", now, record.timestamp) + assertEquals(s"Unexpected timestamp of record $record", expectedLogAppendTime, record.timestamp) } }