From f54b61909d525547d65123c02bbd36d92ccee5da Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Mon, 3 Apr 2017 09:40:38 -0700
Subject: [PATCH] HOTFIX: Set `baseOffset` correctly in `RecordAccumulator`

The bug meant that the base offset passed to the records builder was the
batch size instead of 0, so the broker would always recompress batches.

Author: Ismael Juma

Reviewers: Jun Rao

Closes #2794 from ijuma/fix-records-builder-construction
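
To illustrate the bug concretely: the final argument of the
`MemoryRecords.builder` overload changed below is the base offset of the
batch, so passing `this.batchSize` stamped every producer batch with a
non-zero starting offset. A minimal sketch of the pre-fix behaviour (not
part of the patch; it assumes the 0.11-era `clients` record classes that
appear in this diff and a hypothetical batch size of 16384):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.SimpleRecord;
    import org.apache.kafka.common.record.TimestampType;

    public class BaseOffsetBugSketch {
        public static void main(String[] args) {
            int batchSize = 16384; // hypothetical producer batch.size
            ByteBuffer buffer = ByteBuffer.allocate(batchSize);
            // Pre-fix call shape: the batch size was passed where the base
            // offset belongs (the final argument), so the batch claims to
            // start at offset 16384 rather than 0.
            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
                    RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
                    TimestampType.CREATE_TIME, batchSize); // fixed to 0L below
            builder.append(new SimpleRecord(System.currentTimeMillis(),
                    "key".getBytes(), "value".getBytes()));
            MemoryRecords records = builder.build();
            // Prints 16384 before the fix; 0 once 0L is passed instead.
            System.out.println(records.batches().iterator().next().baseOffset());
        }
    }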
---
 .../producer/internals/RecordAccumulator.java |  2 +-
 .../kafka/common/record/MemoryRecords.java    | 21 ++++++++-----------
 .../internals/RecordAccumulatorTest.java      | 20 +++++++++++++++++-
 3 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index e07d201a2e2..299356ea8a8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -233,7 +233,7 @@ public final class RecordAccumulator {
             throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
                 "support the required message format (v2). The broker must be version 0.11 or later.");
         }
-        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, this.batchSize);
+        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
     }

     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 8c4e7714feb..f810e394421 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -278,15 +278,6 @@ public class MemoryRecords extends AbstractRecords {
         return new MemoryRecords(buffer);
     }

-    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
-                                               CompressionType compressionType,
-                                               TimestampType timestampType,
-                                               int writeLimit) {
-        return new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, timestampType, 0L,
-                System.currentTimeMillis(), RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
-                RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, writeLimit);
-    }
-
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
                                                CompressionType compressionType,
                                                TimestampType timestampType,
@@ -299,7 +290,10 @@ public class MemoryRecords extends AbstractRecords {
                                                CompressionType compressionType,
                                                TimestampType timestampType,
                                                long baseOffset) {
-        return builder(buffer, magic, compressionType, timestampType, baseOffset, System.currentTimeMillis());
+        long logAppendTime = RecordBatch.NO_TIMESTAMP;
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
+            logAppendTime = System.currentTimeMillis();
+        return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime);
     }

     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
@@ -323,7 +317,7 @@ public class MemoryRecords extends AbstractRecords {
                                                int baseSequence) {
         return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset,
                 logAppendTime, pid, epoch, baseSequence, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH,
-                buffer.capacity());
+                buffer.remaining());
     }

     public static MemoryRecords withRecords(CompressionType compressionType, SimpleRecord... records) {
@@ -357,8 +351,11 @@ public class MemoryRecords extends AbstractRecords {
             return MemoryRecords.EMPTY;
         int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records));
         ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
+        long logAppendTime = RecordBatch.NO_TIMESTAMP;
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
+            logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = builder(buffer, magic, compressionType, timestampType, initialOffset,
-                System.currentTimeMillis(), pid, epoch, baseSequence);
+                logAppendTime, pid, epoch, baseSequence);
         for (SimpleRecord record : records)
             builder.append(record);
         return builder.build();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 9117e16881c..d152117034d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -32,10 +32,12 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestUtils;

 import org.junit.After;
 import org.junit.Test;
@@ -132,10 +134,26 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
+        byte[] value = new byte[2 * batchSize];
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
                 CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
-        accum.append(tp1, 0L, key, new byte[2 * batchSize], null, maxBlockTimeMs);
+        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+
+        Deque<ProducerBatch> batches = accum.batches().get(tp1);
+        assertEquals(1, batches.size());
+        ProducerBatch producerBatch = batches.peek();
+        List<MutableRecordBatch> recordBatches = TestUtils.toList(producerBatch.records().batches());
+        assertEquals(1, recordBatches.size());
+        MutableRecordBatch recordBatch = recordBatches.get(0);
+        assertEquals(0L, recordBatch.baseOffset());
+        List<Record> records = TestUtils.toList(recordBatch);
+        assertEquals(1, records.size());
+        Record record = records.get(0);
+        assertEquals(0L, record.offset());
+        assertEquals(ByteBuffer.wrap(key), record.key());
+        assertEquals(ByteBuffer.wrap(value), record.value());
+        assertEquals(0L, record.timestamp());
     }

     @Test
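
As a quick end-to-end check outside of JUnit, one can mirror the
assertions added in `testAppendLarge` above. This sketch is not part of
the patch; it assumes the same `clients` classes the test imports, and
that the `withRecords` overload without an explicit offset starts
batches at offset 0:

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MutableRecordBatch;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.SimpleRecord;

    public class VerifyBaseOffset {
        public static void main(String[] args) {
            // Build a batch the way MemoryRecords.withRecords does; after the
            // fix, producer batches take the same 0L base offset path.
            MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
                    new SimpleRecord(0L, "key".getBytes(), "value".getBytes()));
            for (MutableRecordBatch batch : records.batches()) {
                System.out.println("baseOffset = " + batch.baseOffset()); // expect 0
                for (Record record : batch)
                    System.out.println("offset = " + record.offset());    // expect 0
            }
        }
    }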