diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java index 8cfc37be826..591ab169364 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.record; -import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -34,7 +33,7 @@ import net.jpountz.xxhash.XXHashFactory; * * This class is not thread-safe. */ -public final class KafkaLZ4BlockOutputStream extends FilterOutputStream { +public final class KafkaLZ4BlockOutputStream extends OutputStream { public static final int MAGIC = 0x184D2204; public static final int LZ4_MAX_HEADER_LENGTH = 19; @@ -52,9 +51,10 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream { private final boolean useBrokenFlagDescriptorChecksum; private final FLG flg; private final BD bd; - private final byte[] buffer; - private final byte[] compressedBuffer; private final int maxBlockSize; + private OutputStream out; + private byte[] buffer; + private byte[] compressedBuffer; private int bufferOffset; private boolean finished; @@ -71,7 +71,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream { * @throws IOException */ public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException { - super(out); + this.out = out; compressor = LZ4Factory.fastestInstance().fastCompressor(); checksum = XXHashFactory.fastestInstance().hash32(); this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum; @@ -204,7 +204,6 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream { private void writeEndMark() throws IOException { ByteUtils.writeUnsignedIntLE(out, 0); // TODO implement content checksum, update flg.validate() - finished = true; } @Override @@ -259,15 +258,26 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream { @Override public void close() throws IOException { - if (!finished) { - // basically flush the buffer writing the last block - writeBlock(); - // write the end block and finish the stream - writeEndMark(); - } - if (out != null) { - out.close(); - out = null; + try { + if (!finished) { + // basically flush the buffer writing the last block + writeBlock(); + // write the end block + writeEndMark(); + } + } finally { + try { + if (out != null) { + try (OutputStream outStream = out) { + outStream.flush(); + } + } + } finally { + out = null; + buffer = null; + compressedBuffer = null; + finished = true; + } } } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index a9b57ac22df..6f6404fa2d9 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.ByteBufferOutputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; import static org.apache.kafka.common.utils.Utils.wrapNullable; @@ -38,11 +39,15 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable; */ public class MemoryRecordsBuilder { private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f; + private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() { + @Override + public void write(int b) throws IOException { + throw new IllegalStateException("MemoryRecordsBuilder is closed for record appends"); + } + }); private final TimestampType timestampType; private final CompressionType compressionType; - // Used to append records, may compress data on the fly - private final DataOutputStream appendStream; // Used to hold a reference to the underlying ByteBuffer so that we can write the record batch header and access // the written bytes. ByteBufferOutputStream allocates a new ByteBuffer if the existing one is not large enough, // so it's not safe to hold a direct reference to the underlying ByteBuffer. @@ -60,7 +65,8 @@ public class MemoryRecordsBuilder { // from previous batches before appending any records. private float estimatedCompressionRatio = 1.0F; - private boolean appendStreamIsClosed = false; + // Used to append records, may compress data on the fly + private DataOutputStream appendStream; private boolean isTransactional; private long producerId; private short producerEpoch; @@ -265,12 +271,13 @@ public class MemoryRecordsBuilder { * possible to update the RecordBatch header. */ public void closeForRecordAppends() { - if (!appendStreamIsClosed) { + if (appendStream != CLOSED_STREAM) { try { appendStream.close(); - appendStreamIsClosed = true; } catch (IOException e) { throw new KafkaException(e); + } finally { + appendStream = CLOSED_STREAM; } } } @@ -663,7 +670,7 @@ public class MemoryRecordsBuilder { } private void ensureOpenForRecordAppend() { - if (appendStreamIsClosed) + if (appendStream == CLOSED_STREAM) throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed for record appends"); } @@ -738,7 +745,7 @@ public class MemoryRecordsBuilder { public boolean isFull() { // note that the write limit is respected only after the first record is added which ensures we can always // create non-empty batches (this is used to disable batching when the producer's batch size is set to 0). - return appendStreamIsClosed || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten()); + return appendStream == CLOSED_STREAM || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten()); } /** diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index c713d17f615..a90fb299c52 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Random; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -644,6 +645,39 @@ public class MemoryRecordsBuilderTest { return values; } + @Test + public void testBuffersDereferencedOnClose() { + Runtime runtime = Runtime.getRuntime(); + int payloadLen = 1024 * 1024; + ByteBuffer buffer = ByteBuffer.allocate(payloadLen * 2); + byte[] key = new byte[0]; + byte[] value = new byte[payloadLen]; + new Random().nextBytes(value); // Use random payload so that compressed buffer is large + List builders = new ArrayList<>(100); + long startMem = 0; + long memUsed = 0; + int iterations = 0; + while (iterations++ < 100) { + buffer.rewind(); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType, + TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, 0); + builder.append(1L, new byte[0], value); + builder.build(); + builders.add(builder); + + System.gc(); + memUsed = runtime.totalMemory() - runtime.freeMemory() - startMem; + // Ignore memory usage during initialization + if (iterations == 2) + startMem = memUsed; + else if (iterations > 2 && memUsed < (iterations - 2) * 1024) + break; + } + assertTrue("Memory usage too high: " + memUsed, iterations < 100); + } + private void verifyRecordsProcessingStats(RecordsProcessingStats processingStats, int numRecords, int numRecordsConverted, long finalBytes, long preConvertedBytes) { assertNotNull("Records processing info is null", processingStats);