Browse Source

KAFKA-5456; Ensure producer handles old format large compressed messages

More specifically, fix the case where a compressed V0 or V1 message is
larger than the producer batch size.

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3356 from hachikuji/KAFKA-5456
pull/3356/merge
Jason Gustafson 8 years ago committed by Ismael Juma
parent
commit
f49697a279
  1. 4
      clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  2. 11
      clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
  3. 2
      clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  4. 18
      clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
  5. 6
      clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
  6. 21
      clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
  7. 52
      clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
  8. 8
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java

4
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@ -636,8 +636,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -636,8 +636,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.sizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
serializedKey, serializedValue, headers);
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);

11
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java

@ -104,7 +104,8 @@ public final class ProducerBatch { @@ -104,7 +104,8 @@ public final class ProducerBatch {
return null;
} else {
Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers));
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
@ -128,8 +129,8 @@ public final class ProducerBatch { @@ -128,8 +129,8 @@ public final class ProducerBatch {
} else {
// No need to get the CRC.
this.recordsBuilder.append(timestamp, key, value);
this.maxRecordSize = Math.max(this.maxRecordSize,
AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers));
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, thunk.future.checksumOrNull(),
key == null ? -1 : key.remaining(),
@ -252,8 +253,8 @@ public final class ProducerBatch { @@ -252,8 +253,8 @@ public final class ProducerBatch {
}
private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batchSize) {
int initialSize = Math.max(AbstractRecords.sizeInBytesUpperBound(magic(),
record.key(), record.value(), record.headers()), batchSize);
int initialSize = Math.max(AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), record.key(), record.value(), record.headers()), batchSize);
ByteBuffer buffer = ByteBuffer.allocate(initialSize);
// Note that we intentionally do not set producer state (producerId, epoch, sequence, and isTransactional)

2
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java

@ -200,7 +200,7 @@ public final class RecordAccumulator { @@ -200,7 +200,7 @@ public final class RecordAccumulator {
// we don't have an in-progress record batch try to allocate a new batch
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.sizeInBytesUpperBound(maxUsableMagic, key, value, headers));
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {

18
clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java

@ -180,13 +180,23 @@ public abstract class AbstractRecords implements Records { @@ -180,13 +180,23 @@ public abstract class AbstractRecords implements Records {
return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
}
public static int sizeInBytesUpperBound(byte magic, byte[] key, byte[] value, Header[] headers) {
return sizeInBytesUpperBound(magic, Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
/**
* Get an upper bound estimate on the batch size needed to hold a record with the given fields. This is only
* an estimate because it does not take into account overhead from the compression algorithm.
*/
public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, byte[] key, byte[] value, Header[] headers) {
return estimateSizeInBytesUpperBound(magic, compressionType, Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
}
public static int sizeInBytesUpperBound(byte magic, ByteBuffer key, ByteBuffer value, Header[] headers) {
/**
* Get an upper bound estimate on the batch size needed to hold a record with the given fields. This is only
* an estimate because it does not take into account overhead from the compression algorithm.
*/
public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, ByteBuffer key, ByteBuffer value, Header[] headers) {
if (magic >= RecordBatch.MAGIC_VALUE_V2)
return DefaultRecordBatch.batchSizeUpperBound(key, value, headers);
return DefaultRecordBatch.estimateBatchSizeUpperBound(key, value, headers);
else if (compressionType != CompressionType.NONE)
return Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic) + LegacyRecord.recordSize(magic, key, value);
else
return Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
}

6
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java

@ -457,9 +457,11 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe @@ -457,9 +457,11 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
}
/**
* Get an upper bound on the size of a batch with only a single record using a given key and value.
* Get an upper bound on the size of a batch with only a single record using a given key and value. This
* is only an estimate because it does not take into account additional overhead from the compression
* algorithm used.
*/
static int batchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
}

21
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java

@ -677,28 +677,29 @@ public class MemoryRecordsBuilder { @@ -677,28 +677,29 @@ public class MemoryRecordsBuilder {
}
/**
* Check if we have room for a new record containing the given key/value pair
* Check if we have room for a new record containing the given key/value pair. If no records have been
* appended, then this returns true.
*/
public boolean hasRoomFor(long timestamp, byte[] key, byte[] value, Header[] headers) {
return hasRoomFor(timestamp, wrapNullable(key), wrapNullable(value), headers);
}
/**
* Check if we have room for a new record containing the given key/value pair
* Check if we have room for a new record containing the given key/value pair. If no records have been
* appended, then this returns true.
*
* Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
* accurate if compression is really used. When this happens, the following append may cause dynamic buffer
* accurate if compression is used. When this happens, the following append may cause dynamic buffer
* re-allocation in the underlying byte buffer stream.
*
* There is an exceptional case when appending a single message whose size is larger than the batch size, the
* capacity will be the message size which is larger than the write limit, i.e. the batch size. In this case
* the checking should be based on the capacity of the initialized buffer rather than the write limit in order
* to accept this single record.
*/
public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
if (isFull())
return false;
// We always allow at least one record to be appended (the ByteBufferOutputStream will grow as needed)
if (numRecords == 0)
return true;
final int recordSize;
if (magic < RecordBatch.MAGIC_VALUE_V2) {
recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
@ -709,9 +710,7 @@ public class MemoryRecordsBuilder { @@ -709,9 +710,7 @@ public class MemoryRecordsBuilder {
}
// Be conservative and not take compression of the new record into consideration.
return numRecords == 0 ?
bufferStream.remaining() >= recordSize :
this.writeLimit >= estimatedBytesWritten() + recordSize;
return this.writeLimit >= estimatedBytesWritten() + recordSize;
}
public boolean isClosed() {

52
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java

@ -138,11 +138,59 @@ public class RecordAccumulatorTest { @@ -138,11 +138,59 @@ public class RecordAccumulatorTest {
}
@Test
public void testAppendLarge() throws Exception {
public void testAppendLargeCompressed() throws Exception {
testAppendLarge(CompressionType.GZIP);
}
@Test
public void testAppendLargeNonCompressed() throws Exception {
testAppendLarge(CompressionType.NONE);
}
private void testAppendLarge(CompressionType compressionType) throws Exception {
int batchSize = 512;
byte[] value = new byte[2 * batchSize];
RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
compressionType, 0L, 100L, metrics, time, new ApiVersions(), null);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
Deque<ProducerBatch> batches = accum.batches().get(tp1);
assertEquals(1, batches.size());
ProducerBatch producerBatch = batches.peek();
List<MutableRecordBatch> recordBatches = TestUtils.toList(producerBatch.records().batches());
assertEquals(1, recordBatches.size());
MutableRecordBatch recordBatch = recordBatches.get(0);
assertEquals(0L, recordBatch.baseOffset());
List<Record> records = TestUtils.toList(recordBatch);
assertEquals(1, records.size());
Record record = records.get(0);
assertEquals(0L, record.offset());
assertEquals(ByteBuffer.wrap(key), record.key());
assertEquals(ByteBuffer.wrap(value), record.value());
assertEquals(0L, record.timestamp());
}
@Test
public void testAppendLargeOldMessageFormatCompressed() throws Exception {
testAppendLargeOldMessageFormat(CompressionType.GZIP);
}
@Test
public void testAppendLargeOldMessageFormatNonCompressed() throws Exception {
testAppendLargeOldMessageFormat(CompressionType.NONE);
}
private void testAppendLargeOldMessageFormat(CompressionType compressionType) throws Exception {
int batchSize = 512;
byte[] value = new byte[2 * batchSize];
ApiVersions apiVersions = new ApiVersions();
apiVersions.update(node1.idString(), NodeApiVersions.create(Collections.singleton(
new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
compressionType, 0L, 100L, metrics, time, apiVersions, null);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);

8
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java

@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
*/
package org.apache.kafka.jmh.record;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
@ -59,7 +58,7 @@ public class RecordBatchIterationBenchmark { @@ -59,7 +58,7 @@ public class RecordBatchIterationBenchmark {
}
@Param(value = {"LZ4", "SNAPPY", "NONE"})
private CompressionType type = CompressionType.NONE;
private CompressionType compressionType = CompressionType.NONE;
@Param(value = {"1", "2"})
private byte messageVersion = CURRENT_MAGIC_VALUE;
@ -98,11 +97,12 @@ public class RecordBatchIterationBenchmark { @@ -98,11 +97,12 @@ public class RecordBatchIterationBenchmark {
private ByteBuffer createBatch(int batchSize) {
byte[] value = new byte[messageSize];
final ByteBuffer buf = ByteBuffer.allocate(
AbstractRecords.sizeInBytesUpperBound(messageVersion, new byte[0], value, new Header[0]) * batchSize
AbstractRecords.estimateSizeInBytesUpperBound(messageVersion, compressionType, new byte[0], value,
Record.EMPTY_HEADERS) * batchSize
);
final MemoryRecordsBuilder builder =
MemoryRecords.builder(buf, messageVersion, type, TimestampType.CREATE_TIME, startingOffset);
MemoryRecords.builder(buf, messageVersion, compressionType, TimestampType.CREATE_TIME, startingOffset);
for (int i = 0; i < batchSize; ++i) {
switch (bytes) {

Loading…
Cancel
Save