From 1067cc3422e6f2731ae57df9f01e84f2242a60be Mon Sep 17 00:00:00 2001
From: Rajini Sivaram
Date: Thu, 15 Feb 2018 17:36:44 +0000
Subject: [PATCH] KAFKA-6512: Discard references to buffers used for
 compression (#4570)

ProducerBatch retains references to MemoryRecordsBuilder and cannot be freed
until acks are received. Removing references to buffers used for compression
after records are built will enable these to be garbage collected sooner,
reducing the risk of OOM.

Reviewers: Ismael Juma, Jason Gustafson, Lothsahn
---
 .../record/KafkaLZ4BlockOutputStream.java     | 40 ++++++++++++-------
 .../common/record/MemoryRecordsBuilder.java   | 21 ++++++++----
 .../record/MemoryRecordsBuilderTest.java      | 34 ++++++++++++++++
 3 files changed, 73 insertions(+), 22 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index 8cfc37be826..591ab169364 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.record;

-import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;

@@ -34,7 +33,7 @@ import net.jpountz.xxhash.XXHashFactory;
  *
  * This class is not thread-safe.
  */
-public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+public final class KafkaLZ4BlockOutputStream extends OutputStream {

     public static final int MAGIC = 0x184D2204;
     public static final int LZ4_MAX_HEADER_LENGTH = 19;
@@ -52,9 +51,10 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
     private final boolean useBrokenFlagDescriptorChecksum;
     private final FLG flg;
     private final BD bd;
-    private final byte[] buffer;
-    private final byte[] compressedBuffer;
     private final int maxBlockSize;
+    private OutputStream out;
+    private byte[] buffer;
+    private byte[] compressedBuffer;
     private int bufferOffset;
     private boolean finished;

@@ -71,7 +71,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
      * @throws IOException
      */
     public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException {
-        super(out);
+        this.out = out;
         compressor = LZ4Factory.fastestInstance().fastCompressor();
         checksum = XXHashFactory.fastestInstance().hash32();
         this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
@@ -204,7 +204,6 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
     private void writeEndMark() throws IOException {
         ByteUtils.writeUnsignedIntLE(out, 0);
         // TODO implement content checksum, update flg.validate()
-        finished = true;
     }

     @Override
@@ -259,15 +258,26 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {

     @Override
     public void close() throws IOException {
-        if (!finished) {
-            // basically flush the buffer writing the last block
-            writeBlock();
-            // write the end block and finish the stream
-            writeEndMark();
-        }
-        if (out != null) {
-            out.close();
-            out = null;
+        try {
+            if (!finished) {
+                // basically flush the buffer writing the last block
+                writeBlock();
+                // write the end block
+                writeEndMark();
+            }
+        } finally {
+            try {
+                if (out != null) {
+                    try (OutputStream outStream = out) {
+                        outStream.flush();
+                    }
+                }
+            } finally {
+                out = null;
+                buffer = null;
+                compressedBuffer = null;
+                finished = true;
+            }
         }
     }

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index a9b57ac22df..6f6404fa2d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.ByteBufferOutputStream;

 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;

 import static org.apache.kafka.common.utils.Utils.wrapNullable;
@@ -38,11 +39,15 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
  */
 public class MemoryRecordsBuilder {
     private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
+        @Override
+        public void write(int b) throws IOException {
+            throw new IllegalStateException("MemoryRecordsBuilder is closed for record appends");
+        }
+    });

     private final TimestampType timestampType;
     private final CompressionType compressionType;
-    // Used to append records, may compress data on the fly
-    private final DataOutputStream appendStream;
     // Used to hold a reference to the underlying ByteBuffer so that we can write the record batch header and access
     // the written bytes. ByteBufferOutputStream allocates a new ByteBuffer if the existing one is not large enough,
     // so it's not safe to hold a direct reference to the underlying ByteBuffer.
@@ -60,7 +65,8 @@ public class MemoryRecordsBuilder {
     // from previous batches before appending any records.
     private float estimatedCompressionRatio = 1.0F;

-    private boolean appendStreamIsClosed = false;
+    // Used to append records, may compress data on the fly
+    private DataOutputStream appendStream;
     private boolean isTransactional;
     private long producerId;
     private short producerEpoch;
@@ -265,12 +271,13 @@ public class MemoryRecordsBuilder {
      * possible to update the RecordBatch header.
      */
     public void closeForRecordAppends() {
-        if (!appendStreamIsClosed) {
+        if (appendStream != CLOSED_STREAM) {
             try {
                 appendStream.close();
-                appendStreamIsClosed = true;
             } catch (IOException e) {
                 throw new KafkaException(e);
+            } finally {
+                appendStream = CLOSED_STREAM;
             }
         }
     }
@@ -663,7 +670,7 @@ public class MemoryRecordsBuilder {
     }

     private void ensureOpenForRecordAppend() {
-        if (appendStreamIsClosed)
+        if (appendStream == CLOSED_STREAM)
             throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed for record appends");
     }

@@ -738,7 +745,7 @@ public class MemoryRecordsBuilder {
     public boolean isFull() {
         // note that the write limit is respected only after the first record is added which ensures we can always
         // create non-empty batches (this is used to disable batching when the producer's batch size is set to 0).
-        return appendStreamIsClosed || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
+        return appendStream == CLOSED_STREAM || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
     }

     /**
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index c713d17f615..a90fb299c52 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Random;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -644,6 +645,39 @@ public class MemoryRecordsBuilderTest {
         return values;
     }

+    @Test
+    public void testBuffersDereferencedOnClose() {
+        Runtime runtime = Runtime.getRuntime();
+        int payloadLen = 1024 * 1024;
+        ByteBuffer buffer = ByteBuffer.allocate(payloadLen * 2);
+        byte[] key = new byte[0];
+        byte[] value = new byte[payloadLen];
+        new Random().nextBytes(value); // Use random payload so that compressed buffer is large
+        List<MemoryRecordsBuilder> builders = new ArrayList<>(100);
+        long startMem = 0;
+        long memUsed = 0;
+        int iterations = 0;
+        while (iterations++ < 100) {
+            buffer.rewind();
+            MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
+                    TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID,
+                    RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
+                    RecordBatch.NO_PARTITION_LEADER_EPOCH, 0);
+            builder.append(1L, new byte[0], value);
+            builder.build();
+            builders.add(builder);
+
+            System.gc();
+            memUsed = runtime.totalMemory() - runtime.freeMemory() - startMem;
+            // Ignore memory usage during initialization
+            if (iterations == 2)
+                startMem = memUsed;
+            else if (iterations > 2 && memUsed < (iterations - 2) * 1024)
+                break;
+        }
+        assertTrue("Memory usage too high: " + memUsed, iterations < 100);
+    }
+
     private void verifyRecordsProcessingStats(RecordsProcessingStats processingStats, int numRecords, int numRecordsConverted,
                                               long finalBytes, long preConvertedBytes) {
         assertNotNull("Records processing info is null", processingStats);