
HOTFIX: Set `baseOffset` correctly in `RecordAccumulator`

The bug meant that the base offset was set to the batch size instead of 0, so the broker would always recompress batches.
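
For illustration only (this sketch is not part of the patch), the snippet below builds a small batch with the same `MemoryRecords.builder(buffer, magic, compression, timestampType, baseOffset)` overload that `RecordAccumulator` calls, passing `0L` as the base offset the way the fix does. The class name `BaseOffsetDemo`, the buffer size, the GZIP compression choice, and the record contents are arbitrary choices for the example; the builder overload, `RecordBatch.CURRENT_MAGIC_VALUE`, and `baseOffset()` are the APIs touched by this change.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.MutableRecordBatch;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.TimestampType;

    public class BaseOffsetDemo {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // Same builder overload RecordAccumulator uses; the final argument is the
            // batch's base offset. The buggy code passed the batch size here, the fix
            // passes 0L.
            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
                    CompressionType.GZIP, TimestampType.CREATE_TIME, 0L);
            builder.append(0L, "key".getBytes(), "value".getBytes());
            MemoryRecords records = builder.build();
            for (MutableRecordBatch batch : records.batches())
                System.out.println("baseOffset = " + batch.baseOffset()); // prints 0 with the fix
        }
    }

With the buggy call, `baseOffset()` would instead report the configured batch size, which is why the broker ended up recompressing every batch it received.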

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #2794 from ijuma/fix-records-builder-construction
Merge ref: pull/2174/merge
Ismael Juma authored 8 years ago, committed by Jun Rao
commit f54b61909d
  1. clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (2 changed lines)
  2. clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java (21 changed lines)
  3. clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java (20 changed lines)

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (2 changed lines)

@@ -233,7 +233,7 @@ public final class RecordAccumulator {
             throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
                     "support the required message format (v2). The broker must be version 0.11 or later.");
         }
-        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, this.batchSize);
+        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
     }

     /**

clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java (21 changed lines)

@@ -278,15 +278,6 @@ public class MemoryRecords extends AbstractRecords {
         return new MemoryRecords(buffer);
     }

-    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
-                                               CompressionType compressionType,
-                                               TimestampType timestampType,
-                                               int writeLimit) {
-        return new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, timestampType, 0L,
-                System.currentTimeMillis(), RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
-                RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, writeLimit);
-    }
-
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
                                                CompressionType compressionType,
                                                TimestampType timestampType,
@@ -299,7 +290,10 @@ public class MemoryRecords extends AbstractRecords {
                                                CompressionType compressionType,
                                                TimestampType timestampType,
                                                long baseOffset) {
-        return builder(buffer, magic, compressionType, timestampType, baseOffset, System.currentTimeMillis());
+        long logAppendTime = RecordBatch.NO_TIMESTAMP;
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
+            logAppendTime = System.currentTimeMillis();
+        return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime);
     }

     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
@@ -323,7 +317,7 @@ public class MemoryRecords extends AbstractRecords {
                                                int baseSequence) {
         return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset,
                 logAppendTime, pid, epoch, baseSequence, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH,
-                buffer.capacity());
+                buffer.remaining());
     }

     public static MemoryRecords withRecords(CompressionType compressionType, SimpleRecord... records) {
@@ -357,8 +351,11 @@ public class MemoryRecords extends AbstractRecords {
             return MemoryRecords.EMPTY;
         int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records));
         ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
+        long logAppendTime = RecordBatch.NO_TIMESTAMP;
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
+            logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = builder(buffer, magic, compressionType, timestampType, initialOffset,
-                System.currentTimeMillis(), pid, epoch, baseSequence);
+                logAppendTime, pid, epoch, baseSequence);
         for (SimpleRecord record : records)
             builder.append(record);
         return builder.build();

clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java (20 changed lines)

@@ -32,10 +32,12 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Test;
@@ -132,10 +134,26 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
+        byte[] value = new byte[2 * batchSize];
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
                 CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
-        accum.append(tp1, 0L, key, new byte[2 * batchSize], null, maxBlockTimeMs);
+        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+
+        Deque<ProducerBatch> batches = accum.batches().get(tp1);
+        assertEquals(1, batches.size());
+        ProducerBatch producerBatch = batches.peek();
+        List<MutableRecordBatch> recordBatches = TestUtils.toList(producerBatch.records().batches());
+        assertEquals(1, recordBatches.size());
+        MutableRecordBatch recordBatch = recordBatches.get(0);
+        assertEquals(0L, recordBatch.baseOffset());
+        List<Record> records = TestUtils.toList(recordBatch);
+        assertEquals(1, records.size());
+        Record record = records.get(0);
+        assertEquals(0L, record.offset());
+        assertEquals(ByteBuffer.wrap(key), record.key());
+        assertEquals(ByteBuffer.wrap(value), record.value());
+        assertEquals(0L, record.timestamp());
     }

     @Test
