From f49697a2796540452a21d4a29ab879ba04214046 Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Fri, 16 Jun 2017 20:36:13 +0100
Subject: [PATCH] KAFKA-5456; Ensure producer handles old format large compressed messages

More specifically, fix the case where a compressed V0 or V1 message is larger
than the producer batch size.

Author: Jason Gustafson

Reviewers: Apurva Mehta, Ismael Juma

Closes #3356 from hachikuji/KAFKA-5456
---
 .../kafka/clients/producer/KafkaProducer.java |  4 +-
 .../producer/internals/ProducerBatch.java     | 11 ++--
 .../producer/internals/RecordAccumulator.java |  2 +-
 .../kafka/common/record/AbstractRecords.java  | 18 +++++--
 .../common/record/DefaultRecordBatch.java     |  6 ++-
 .../common/record/MemoryRecordsBuilder.java   | 21 ++++----
 .../internals/RecordAccumulatorTest.java      | 52 ++++++++++++++++++-
 .../record/RecordBatchIterationBenchmark.java |  8 +--
 8 files changed, 91 insertions(+), 31 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 5be20e25d4b..f482d5bcb26 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -636,8 +636,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             setReadOnly(record.headers());
             Header[] headers = record.headers().toArray();
 
-            int serializedSize = AbstractRecords.sizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
-                    serializedKey, serializedValue, headers);
+            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
+                    compressionType, serializedKey, serializedValue, headers);
             ensureValidRecordSize(serializedSize);
             long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 76e2aaae137..fcdda8d2882 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -104,7 +104,8 @@ public final class ProducerBatch {
             return null;
         } else {
             Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
-            this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers));
+            this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
+                    recordsBuilder.compressionType(), key, value, headers));
             this.lastAppendTime = now;
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                    timestamp, checksum,
@@ -128,8 +129,8 @@ public final class ProducerBatch {
         } else {
             // No need to get the CRC.
             this.recordsBuilder.append(timestamp, key, value);
-            this.maxRecordSize = Math.max(this.maxRecordSize,
-                    AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers));
+            this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
+                    recordsBuilder.compressionType(), key, value, headers));
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                    timestamp, thunk.future.checksumOrNull(),
                                                                    key == null ? -1 : key.remaining(),
@@ -252,8 +253,8 @@ public final class ProducerBatch {
     }
 
     private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batchSize) {
-        int initialSize = Math.max(AbstractRecords.sizeInBytesUpperBound(magic(),
-                record.key(), record.value(), record.headers()), batchSize);
+        int initialSize = Math.max(AbstractRecords.estimateSizeInBytesUpperBound(magic(),
+                recordsBuilder.compressionType(), record.key(), record.value(), record.headers()), batchSize);
         ByteBuffer buffer = ByteBuffer.allocate(initialSize);
 
         // Note that we intentionally do not set producer state (producerId, epoch, sequence, and isTransactional)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 10c68d832b1..ea6624e7588 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -200,7 +200,7 @@ public final class RecordAccumulator {
 
             // we don't have an in-progress record batch try to allocate a new batch
             byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
-            int size = Math.max(this.batchSize, AbstractRecords.sizeInBytesUpperBound(maxUsableMagic, key, value, headers));
+            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
             log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
             buffer = free.allocate(size, maxTimeToBlock);
             synchronized (dq) {
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 04d70714a7f..9ba2048d72f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -180,13 +180,23 @@ public abstract class AbstractRecords implements Records {
         return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
     }
 
-    public static int sizeInBytesUpperBound(byte magic, byte[] key, byte[] value, Header[] headers) {
-        return sizeInBytesUpperBound(magic, Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
+    /**
+     * Get an upper bound estimate on the batch size needed to hold a record with the given fields. This is only
+     * an estimate because it does not take into account overhead from the compression algorithm.
+     */
+    public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, byte[] key, byte[] value, Header[] headers) {
+        return estimateSizeInBytesUpperBound(magic, compressionType, Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
     }
 
-    public static int sizeInBytesUpperBound(byte magic, ByteBuffer key, ByteBuffer value, Header[] headers) {
+    /**
+     * Get an upper bound estimate on the batch size needed to hold a record with the given fields. This is only
+     * an estimate because it does not take into account overhead from the compression algorithm.
+     */
+    public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, ByteBuffer key, ByteBuffer value, Header[] headers) {
         if (magic >= RecordBatch.MAGIC_VALUE_V2)
-            return DefaultRecordBatch.batchSizeUpperBound(key, value, headers);
+            return DefaultRecordBatch.estimateBatchSizeUpperBound(key, value, headers);
+        else if (compressionType != CompressionType.NONE)
+            return Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic) + LegacyRecord.recordSize(magic, key, value);
         else
             return Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index ab0813aae83..353eb6a1cf5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -457,9 +457,11 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     }
 
     /**
-     * Get an upper bound on the size of a batch with only a single record using a given key and value.
+     * Get an upper bound on the size of a batch with only a single record using a given key and value. This
+     * is only an estimate because it does not take into account additional overhead from the compression
+     * algorithm used.
      */
-    static int batchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
+    static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
         return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 0d0a0e28396..9676c83dd08 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -677,28 +677,29 @@ public class MemoryRecordsBuilder {
     }
 
     /**
-     * Check if we have room for a new record containing the given key/value pair
+     * Check if we have room for a new record containing the given key/value pair. If no records have been
+     * appended, then this returns true.
      */
     public boolean hasRoomFor(long timestamp, byte[] key, byte[] value, Header[] headers) {
         return hasRoomFor(timestamp, wrapNullable(key), wrapNullable(value), headers);
     }
 
     /**
-     * Check if we have room for a new record containing the given key/value pair
+     * Check if we have room for a new record containing the given key/value pair. If no records have been
+     * appended, then this returns true.
      *
     * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
-     * accurate if compression is really used. When this happens, the following append may cause dynamic buffer
+     * accurate if compression is used. When this happens, the following append may cause dynamic buffer
      * re-allocation in the underlying byte buffer stream.
-     *
-     * There is an exceptional case when appending a single message whose size is larger than the batch size, the
-     * capacity will be the message size which is larger than the write limit, i.e. the batch size. In this case
-     * the checking should be based on the capacity of the initialized buffer rather than the write limit in order
-     * to accept this single record.
      */
     public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
         if (isFull())
             return false;
 
+        // We always allow at least one record to be appended (the ByteBufferOutputStream will grow as needed)
+        if (numRecords == 0)
+            return true;
+
         final int recordSize;
         if (magic < RecordBatch.MAGIC_VALUE_V2) {
             recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
@@ -709,9 +710,7 @@
         }
 
         // Be conservative and not take compression of the new record into consideration.
-        return numRecords == 0 ?
-                bufferStream.remaining() >= recordSize :
-                this.writeLimit >= estimatedBytesWritten() + recordSize;
+        return this.writeLimit >= estimatedBytesWritten() + recordSize;
     }
 
     public boolean isClosed() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index ed6a0d17706..5bfbdf0aa7f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -138,11 +138,59 @@ public class RecordAccumulatorTest {
     }
 
     @Test
-    public void testAppendLarge() throws Exception {
+    public void testAppendLargeCompressed() throws Exception {
+        testAppendLarge(CompressionType.GZIP);
+    }
+
+    @Test
+    public void testAppendLargeNonCompressed() throws Exception {
+        testAppendLarge(CompressionType.NONE);
+    }
+
+    private void testAppendLarge(CompressionType compressionType) throws Exception {
         int batchSize = 512;
         byte[] value = new byte[2 * batchSize];
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
+                compressionType, 0L, 100L, metrics, time, new ApiVersions(), null);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+
+        Deque<ProducerBatch> batches = accum.batches().get(tp1);
+        assertEquals(1, batches.size());
+        ProducerBatch producerBatch = batches.peek();
+        List<MutableRecordBatch> recordBatches = TestUtils.toList(producerBatch.records().batches());
+        assertEquals(1, recordBatches.size());
+        MutableRecordBatch recordBatch = recordBatches.get(0);
+        assertEquals(0L, recordBatch.baseOffset());
+        List<Record> records = TestUtils.toList(recordBatch);
+        assertEquals(1, records.size());
+        Record record = records.get(0);
+        assertEquals(0L, record.offset());
+        assertEquals(ByteBuffer.wrap(key), record.key());
+        assertEquals(ByteBuffer.wrap(value), record.value());
+        assertEquals(0L, record.timestamp());
+    }
+
+    @Test
+    public void testAppendLargeOldMessageFormatCompressed() throws Exception {
+        testAppendLargeOldMessageFormat(CompressionType.GZIP);
+    }
+
+    @Test
+    public void testAppendLargeOldMessageFormatNonCompressed() throws Exception {
+        testAppendLargeOldMessageFormat(CompressionType.NONE);
+    }
+
+    private void testAppendLargeOldMessageFormat(CompressionType compressionType) throws Exception {
+        int batchSize = 512;
+        byte[] value = new byte[2 * batchSize];
+
+        ApiVersions apiVersions = new ApiVersions();
+        apiVersions.update(node1.idString(), NodeApiVersions.create(Collections.singleton(
+                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
+
+        RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
+                compressionType, 0L, 100L, metrics, time, apiVersions, null);
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
index cfb72e61c62..75fec048174 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.jmh.record;
 
-import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.BufferSupplier;
 import org.apache.kafka.common.record.CompressionType;
@@ -59,7 +58,7 @@ public class RecordBatchIterationBenchmark {
     }
 
     @Param(value = {"LZ4", "SNAPPY", "NONE"})
-    private CompressionType type = CompressionType.NONE;
+    private CompressionType compressionType = CompressionType.NONE;
 
     @Param(value = {"1", "2"})
     private byte messageVersion = CURRENT_MAGIC_VALUE;
@@ -98,11 +97,12 @@ public class RecordBatchIterationBenchmark {
 
     private ByteBuffer createBatch(int batchSize) {
         byte[] value = new byte[messageSize];
         final ByteBuffer buf = ByteBuffer.allocate(
-            AbstractRecords.sizeInBytesUpperBound(messageVersion, new byte[0], value, new Header[0]) * batchSize
+            AbstractRecords.estimateSizeInBytesUpperBound(messageVersion, compressionType, new byte[0], value,
+                    Record.EMPTY_HEADERS) * batchSize
         );
 
         final MemoryRecordsBuilder builder =
-            MemoryRecords.builder(buf, messageVersion, type, TimestampType.CREATE_TIME, startingOffset);
+            MemoryRecords.builder(buf, messageVersion, compressionType, TimestampType.CREATE_TIME, startingOffset);
         for (int i = 0; i < batchSize; ++i) {
             switch (bytes) {
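
Not part of the patch: a minimal reproduction sketch of the scenario described in the commit message, mirroring the new accumulator tests from the producer's public API. It assumes a locally reachable broker (localhost:9092) that only supports the old message format (so the producer falls back to magic v0/v1), a deliberately small batch.size, and a hypothetical topic name "legacy-topic". Before this fix, a gzip-compressed record larger than batch.size could fail to be appended to a producer batch; with the fix, the compression-aware size estimate and the "always admit the first record" rule in hasRoomFor let the send complete.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class LargeCompressedRecordRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("compression.type", "gzip");               // compressed, old-format path on an old broker
        props.put("batch.size", "512");                       // deliberately smaller than the record below
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // A single record roughly twice the configured batch.size, as in the new tests above.
            byte[] value = new byte[2 * 512];
            RecordMetadata metadata = producer.send(new ProducerRecord<>("legacy-topic", value)).get();
            System.out.println("Appended at offset " + metadata.offset());
        }
    }
}
```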