KAFKA-6512: Discard references to buffers used for compression (#4570)

ProducerBatch retains a reference to its MemoryRecordsBuilder and cannot be freed until acks are received. Removing the references to the buffers used for compression once records are built enables those buffers to be garbage collected sooner, reducing the risk of OOM.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>, Lothsahn <Lothsahn@gmail.com>
Author: Rajini Sivaram (committed via GitHub)
Commit: 1067cc3422
  1. clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java (40 changes)
  2. clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java (21 changes)
  3. clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java (34 changes)
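
To make the lifecycle issue in the commit message concrete, here is a minimal sketch of the pattern the change applies. The class and field names are hypothetical, not the Kafka code itself: an object that must stay alive until acks arrive drops its references to compression-only buffers as soon as its records are built, so the GC can reclaim those buffers immediately.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for a batch that the producer retains until acks arrive.
class BatchSketch {
    // May hold large internal buffers while records are appended.
    private OutputStream appendStream = new ByteArrayOutputStream();
    // Hypothetical scratch array standing in for the compression buffers.
    private byte[] compressionScratch = new byte[1024 * 1024];

    void append(byte[] record) throws IOException {
        appendStream.write(record);
    }

    // Called once the records are built; the batch object itself lives on.
    void closeForAppends() throws IOException {
        try {
            appendStream.close();
        } finally {
            // The key step in this commit: drop the references so the GC can
            // reclaim the buffers even though the batch is still referenced.
            appendStream = null;
            compressionScratch = null;
        }
    }
}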

clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java (40 changes)

@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.record;
 
-import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -34,7 +33,7 @@ import net.jpountz.xxhash.XXHashFactory;
  *
  * This class is not thread-safe.
  */
-public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+public final class KafkaLZ4BlockOutputStream extends OutputStream {
     public static final int MAGIC = 0x184D2204;
     public static final int LZ4_MAX_HEADER_LENGTH = 19;
@@ -52,9 +51,10 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
     private final boolean useBrokenFlagDescriptorChecksum;
     private final FLG flg;
     private final BD bd;
-    private final byte[] buffer;
-    private final byte[] compressedBuffer;
     private final int maxBlockSize;
+    private OutputStream out;
+    private byte[] buffer;
+    private byte[] compressedBuffer;
     private int bufferOffset;
     private boolean finished;
@@ -71,7 +71,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
      * @throws IOException
      */
     public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException {
-        super(out);
+        this.out = out;
         compressor = LZ4Factory.fastestInstance().fastCompressor();
         checksum = XXHashFactory.fastestInstance().hash32();
         this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
@@ -204,7 +204,6 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
     private void writeEndMark() throws IOException {
         ByteUtils.writeUnsignedIntLE(out, 0);
         // TODO implement content checksum, update flg.validate()
-        finished = true;
     }
 
     @Override
@@ -259,15 +258,26 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
     @Override
     public void close() throws IOException {
-        if (!finished) {
-            // basically flush the buffer writing the last block
-            writeBlock();
-            // write the end block and finish the stream
-            writeEndMark();
-        }
-        if (out != null) {
-            out.close();
-            out = null;
+        try {
+            if (!finished) {
+                // basically flush the buffer writing the last block
+                writeBlock();
+                // write the end block
+                writeEndMark();
+            }
+        } finally {
+            try {
+                if (out != null) {
+                    try (OutputStream outStream = out) {
+                        outStream.flush();
+                    }
+                }
+            } finally {
+                out = null;
+                buffer = null;
+                compressedBuffer = null;
+                finished = true;
+            }
         }
     }
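
A note on the reworked close() above: the nested try/finally blocks write the last block and end mark first, then flush and close the delegate stream (the try-with-resources closes it even if flush() throws), and in all cases drop the references to out, buffer, and compressedBuffer and set finished, which also makes a second close() a cheap no-op. A small usage sketch, assuming only the constructor shown in this diff; the sink and block-size argument are illustrative:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class Lz4CloseSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Block-size code 4 selects 64 KB blocks in the LZ4 frame format.
        KafkaLZ4BlockOutputStream out = new KafkaLZ4BlockOutputStream(sink, 4, false, false);
        out.write(new byte[] {1, 2, 3});
        out.close();  // flushes the last block, writes the end mark, nulls the buffers
        out.close();  // safe no-op: finished is true and out is null
    }
}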

clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java (21 changes)

@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import static org.apache.kafka.common.utils.Utils.wrapNullable;
@@ -38,11 +39,15 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
  */
 public class MemoryRecordsBuilder {
     private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
+        @Override
+        public void write(int b) throws IOException {
+            throw new IllegalStateException("MemoryRecordsBuilder is closed for record appends");
+        }
+    });
 
     private final TimestampType timestampType;
     private final CompressionType compressionType;
-    // Used to append records, may compress data on the fly
-    private final DataOutputStream appendStream;
     // Used to hold a reference to the underlying ByteBuffer so that we can write the record batch header and access
     // the written bytes. ByteBufferOutputStream allocates a new ByteBuffer if the existing one is not large enough,
     // so it's not safe to hold a direct reference to the underlying ByteBuffer.
@@ -60,7 +65,8 @@ public class MemoryRecordsBuilder {
     // from previous batches before appending any records.
     private float estimatedCompressionRatio = 1.0F;
 
-    private boolean appendStreamIsClosed = false;
+    // Used to append records, may compress data on the fly
+    private DataOutputStream appendStream;
     private boolean isTransactional;
     private long producerId;
     private short producerEpoch;
@@ -265,12 +271,13 @@
      * possible to update the RecordBatch header.
      */
     public void closeForRecordAppends() {
-        if (!appendStreamIsClosed) {
+        if (appendStream != CLOSED_STREAM) {
             try {
                 appendStream.close();
-                appendStreamIsClosed = true;
             } catch (IOException e) {
                 throw new KafkaException(e);
+            } finally {
+                appendStream = CLOSED_STREAM;
             }
         }
     }
@@ -663,7 +670,7 @@
     }
 
     private void ensureOpenForRecordAppend() {
-        if (appendStreamIsClosed)
+        if (appendStream == CLOSED_STREAM)
             throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed for record appends");
     }
@@ -738,7 +745,7 @@
     public boolean isFull() {
         // note that the write limit is respected only after the first record is added which ensures we can always
         // create non-empty batches (this is used to disable batching when the producer's batch size is set to 0).
-        return appendStreamIsClosed || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
+        return appendStream == CLOSED_STREAM || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
     }
 
     /**
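
The CLOSED_STREAM sentinel introduced above does double duty: it marks the builder as closed for appends, replacing the appendStreamIsClosed flag, and it releases the builder's only reference to the real append stream, whose compression buffers can then be collected while the builder itself waits for acks. A stripped-down sketch of the same idea, with hypothetical names:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class SentinelSketch {
    // Shared sentinel: rejects any write and holds no buffers.
    private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
        @Override
        public void write(int b) {
            throw new IllegalStateException("closed for appends");
        }
    });

    private DataOutputStream appendStream = new DataOutputStream(new ByteArrayOutputStream());

    void append(byte[] record) throws IOException {
        appendStream.write(record);  // throws IllegalStateException once closed
    }

    void closeForAppends() throws IOException {
        if (appendStream != CLOSED_STREAM) {
            try {
                appendStream.close();
            } finally {
                // Swapping in the sentinel frees the real stream for GC and
                // needs no separate boolean to track the closed state.
                appendStream = CLOSED_STREAM;
            }
        }
    }
}

Because the sentinel is a single static instance, every closed builder shares it: closing adds no per-builder state, and the checks in isFull() and ensureOpenForRecordAppend() reduce to one reference comparison.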

clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java (34 changes)

@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -644,6 +645,39 @@ public class MemoryRecordsBuilderTest {
         return values;
     }
 
+    @Test
+    public void testBuffersDereferencedOnClose() {
+        Runtime runtime = Runtime.getRuntime();
+        int payloadLen = 1024 * 1024;
+        ByteBuffer buffer = ByteBuffer.allocate(payloadLen * 2);
+        byte[] key = new byte[0];
+        byte[] value = new byte[payloadLen];
+        new Random().nextBytes(value); // Use random payload so that compressed buffer is large
+        List<MemoryRecordsBuilder> builders = new ArrayList<>(100);
+        long startMem = 0;
+        long memUsed = 0;
+        int iterations = 0;
+        while (iterations++ < 100) {
+            buffer.rewind();
+            MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
+                    TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID,
+                    RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
+                    RecordBatch.NO_PARTITION_LEADER_EPOCH, 0);
+            builder.append(1L, new byte[0], value);
+            builder.build();
+            builders.add(builder);
+            System.gc();
+            memUsed = runtime.totalMemory() - runtime.freeMemory() - startMem;
+            // Ignore memory usage during initialization
+            if (iterations == 2)
+                startMem = memUsed;
+            else if (iterations > 2 && memUsed < (iterations - 2) * 1024)
+                break;
+        }
+        assertTrue("Memory usage too high: " + memUsed, iterations < 100);
+    }
+
     private void verifyRecordsProcessingStats(RecordsProcessingStats processingStats, int numRecords,
                                               int numRecordsConverted, long finalBytes, long preConvertedBytes) {
         assertNotNull("Records processing info is null", processingStats);
