Browse Source

KAFKA-5353; baseTimestamp should always have a create timestamp

This makes the case where we build the records from scratch consistent
with the case where update the batch header "in place". Thanks to
edenhill who found the issue while testing librdkafka.

The reason our tests don’t catch this is that we rely on the maxTimestamp
to compute the record level timestamps if log append time is used.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3177 from ijuma/set-base-sequence-for-log-append-time
pull/2259/merge
Ismael Juma 8 years ago
parent
commit
647afeff6a
  1. 8
      clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
  2. 8
      clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
  3. 8
      clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
  4. 6
      clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
  5. 35
      core/src/test/scala/unit/kafka/log/LogValidatorTest.scala

8
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java

@ -131,7 +131,13 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe @@ -131,7 +131,13 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
+ ", computed crc = " + computeChecksum() + ")");
}
private long baseTimestamp() {
/**
* Get the timestamp of the first record in this batch. It is always the create time of the record even if the
* timestamp type of the batch is log append time.
*
* @return The base timestamp
*/
public long baseTimestamp() {
return buffer.getLong(BASE_TIMESTAMP_OFFSET);
}

8
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java

@ -331,15 +331,11 @@ public class MemoryRecordsBuilder { @@ -331,15 +331,11 @@ public class MemoryRecordsBuilder {
int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
int offsetDelta = (int) (lastOffset - baseOffset);
final long baseTimestamp;
final long maxTimestamp;
if (timestampType == TimestampType.LOG_APPEND_TIME) {
baseTimestamp = logAppendTime;
if (timestampType == TimestampType.LOG_APPEND_TIME)
maxTimestamp = logAppendTime;
} else {
baseTimestamp = this.baseTimestamp;
else
maxTimestamp = this.maxTimestamp;
}
DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,

8
clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java

@ -32,8 +32,12 @@ public interface MutableRecordBatch extends RecordBatch { @@ -32,8 +32,12 @@ public interface MutableRecordBatch extends RecordBatch {
/**
* Set the max timestamp for this batch. When using log append time, this effectively overrides the individual
* timestamps of all the records contained in the batch. Note that this typically requires re-computation
* of the batch's CRC.
* timestamps of all the records contained in the batch. To avoid recompression, the record fields are not updated
* by this method, but clients ignore them if the timestamp time is log append time. Note that baseTimestamp is not
* updated by this method.
*
* This typically requires re-computation of the batch's CRC.
*
* @param timestampType The timestamp type
* @param maxTimestamp The maximum timestamp
*/

6
clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java

@ -80,8 +80,10 @@ public interface RecordBatch extends Iterable<Record> { @@ -80,8 +80,10 @@ public interface RecordBatch extends Iterable<Record> {
long checksum();
/**
* Get the timestamp of this record batch. This is the max timestamp among all records contained in this batch.
* This value is updated during compaction.
* Get the max timestamp or log append time of this record batch.
*
* If the timestamp type is create time, this is the max timestamp among all records contained in this batch and
* the value is updated during compaction.
*
* @return The max timestamp
*/

35
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala

@ -38,7 +38,7 @@ class LogValidatorTest { @@ -38,7 +38,7 @@ class LogValidatorTest {
private def checkLogAppendTimeNonCompressed(magic: Byte) {
val now = System.currentTimeMillis()
// The timestamps should be overwritten
val records = createRecords(magicValue = magic, timestamp = 0L, codec = CompressionType.NONE)
val records = createRecords(magicValue = magic, timestamp = 1234L, codec = CompressionType.NONE)
val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(0),
now = now,
@ -52,7 +52,7 @@ class LogValidatorTest { @@ -52,7 +52,7 @@ class LogValidatorTest {
isFromClient = true)
val validatedRecords = validatedResults.validatedRecords
assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size)
validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch))
validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, 1234L, batch))
assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
@ -86,7 +86,7 @@ class LogValidatorTest { @@ -86,7 +86,7 @@ class LogValidatorTest {
val validatedRecords = validatedResults.validatedRecords
assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size)
validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch))
validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, -1, batch))
assertTrue("MessageSet should still valid", validatedRecords.batches.iterator.next().isValid)
assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}",
@ -107,7 +107,7 @@ class LogValidatorTest { @@ -107,7 +107,7 @@ class LogValidatorTest {
private def checkLogAppendTimeWithoutRecompression(magic: Byte) {
val now = System.currentTimeMillis()
// The timestamps should be overwritten
val records = createRecords(magicValue = magic, timestamp = 0L, codec = CompressionType.GZIP)
val records = createRecords(magicValue = magic, timestamp = 1234L, codec = CompressionType.GZIP)
val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
records,
offsetCounter = new LongRef(0),
@ -124,7 +124,7 @@ class LogValidatorTest { @@ -124,7 +124,7 @@ class LogValidatorTest {
assertEquals("message set size should not change", records.records.asScala.size,
validatedRecords.records.asScala.size)
validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch))
validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, 1234L, batch))
assertTrue("MessageSet should still valid", validatedRecords.batches.iterator.next().isValid)
assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}",
@ -176,6 +176,7 @@ class LogValidatorTest { @@ -176,6 +176,7 @@ class LogValidatorTest {
for (batch <- validatedRecords.batches.asScala) {
assertTrue(batch.isValid)
assertEquals(batch.timestampType, TimestampType.CREATE_TIME)
maybeCheckBaseTimestamp(timestampSeq(0), batch)
assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max)
assertEquals(producerEpoch, batch.producerEpoch)
assertEquals(producerId, batch.producerId)
@ -237,6 +238,7 @@ class LogValidatorTest { @@ -237,6 +238,7 @@ class LogValidatorTest {
for (batch <- validatedRecords.batches.asScala) {
assertTrue(batch.isValid)
assertEquals(batch.timestampType, TimestampType.CREATE_TIME)
maybeCheckBaseTimestamp(timestampSeq(0), batch)
assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max)
assertEquals(producerEpoch, batch.producerEpoch)
assertEquals(producerId, batch.producerId)
@ -280,6 +282,7 @@ class LogValidatorTest { @@ -280,6 +282,7 @@ class LogValidatorTest {
for (batch <- validatedRecords.batches.asScala) {
assertTrue(batch.isValid)
maybeCheckBaseTimestamp(RecordBatch.NO_TIMESTAMP, batch)
assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp)
assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch)
@ -316,6 +319,7 @@ class LogValidatorTest { @@ -316,6 +319,7 @@ class LogValidatorTest {
for (batch <- validatedRecords.batches.asScala) {
assertTrue(batch.isValid)
maybeCheckBaseTimestamp(timestamp, batch)
assertEquals(timestamp, batch.maxTimestamp)
assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch)
@ -367,6 +371,7 @@ class LogValidatorTest { @@ -367,6 +371,7 @@ class LogValidatorTest {
for (batch <- validatedRecords.batches.asScala) {
assertTrue(batch.isValid)
assertEquals(batch.timestampType, TimestampType.CREATE_TIME)
maybeCheckBaseTimestamp(timestampSeq(0), batch)
assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max)
assertEquals(producerEpoch, batch.producerEpoch)
assertEquals(producerId, batch.producerId)
@ -945,13 +950,25 @@ class LogValidatorTest { @@ -945,13 +950,25 @@ class LogValidatorTest {
builder.build()
}
def validateLogAppendTime(now: Long, batch: RecordBatch) {
def maybeCheckBaseTimestamp(expected: Long, batch: RecordBatch): Unit = {
batch match {
case b: DefaultRecordBatch =>
assertEquals(s"Unexpected base timestamp of batch $batch", expected, b.baseTimestamp)
case _ => // no-op
}
}
/**
* expectedLogAppendTime is only checked if batch.magic is V2 or higher
*/
def validateLogAppendTime(expectedLogAppendTime: Long, expectedBaseTimestamp: Long, batch: RecordBatch) {
assertTrue(batch.isValid)
assertTrue(batch.timestampType() == TimestampType.LOG_APPEND_TIME)
assertEquals(s"Timestamp of message $batch should be $now", now, batch.maxTimestamp)
assertTrue(batch.timestampType == TimestampType.LOG_APPEND_TIME)
assertEquals(s"Unexpected max timestamp of batch $batch", expectedLogAppendTime, batch.maxTimestamp)
maybeCheckBaseTimestamp(expectedBaseTimestamp, batch)
for (record <- batch.asScala) {
assertTrue(record.isValid)
assertEquals(s"Timestamp of message $record should be $now", now, record.timestamp)
assertEquals(s"Unexpected timestamp of record $record", expectedLogAppendTime, record.timestamp)
}
}

Loading…
Cancel
Save