From 5cddf9860bfb6271dae3a5286051a23a12c13dfe Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Tue, 3 Dec 2019 08:26:17 -0800
Subject: [PATCH] KAFKA-9203: Revert "MINOR: Remove workarounds for lz4-java
 bug affecting byte buffers (#6679)" (#7769)

This reverts commit 90043d5f as it caused a regression in some cases:

> Caused by: java.io.IOException: Stream frame descriptor corrupted
>   at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132)
>   at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78)
>   at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110)

I will investigate why afterwards, but I want to get the safe fix into
2.4.0. The reporter of KAFKA-9203 has verified that reverting this change
makes the problem go away.

Reviewers: Manikumar Reddy
---
 .../record/KafkaLZ4BlockInputStream.java      | 33 ++++++++++++++++---
 1 file changed, 29 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index 9a378333e64..850b1e96e55 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -77,6 +77,11 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
         this.bufferSupplier = bufferSupplier;
         readHeader();
         decompressionBuffer = bufferSupplier.get(maxBlockSize);
+        if (!decompressionBuffer.hasArray() || decompressionBuffer.arrayOffset() != 0) {
+            // require array backed decompression buffer with zero offset
+            // to simplify workaround for https://github.com/lz4/lz4-java/pull/65
+            throw new RuntimeException("decompression buffer must have backing array with zero array offset");
+        }
         finished = false;
     }
 
@@ -126,7 +131,10 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
 
         int len = in.position() - in.reset().position();
-        int hash = CHECKSUM.hash(in, in.position(), len, 0);
+        int hash = in.hasArray() ?
+            // workaround for https://github.com/lz4/lz4-java/pull/65
+            CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), len, 0) :
+            CHECKSUM.hash(in, in.position(), len, 0);
         in.position(in.position() + len);
         if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
             throw new IOException(DESCRIPTOR_HASH_MISMATCH);
         }
@@ -164,8 +172,22 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
 
         if (compressed) {
             try {
-                final int bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0,
-                    maxBlockSize);
+                // workaround for https://github.com/lz4/lz4-java/pull/65
+                final int bufferSize;
+                if (in.hasArray()) {
+                    bufferSize = DECOMPRESSOR.decompress(
+                        in.array(),
+                        in.position() + in.arrayOffset(),
+                        blockSize,
+                        decompressionBuffer.array(),
+                        0,
+                        maxBlockSize
+                    );
+                } else {
+                    // decompressionBuffer has zero arrayOffset, so we don't need to worry about
+                    // https://github.com/lz4/lz4-java/pull/65
+                    bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0, maxBlockSize);
+                }
                 decompressionBuffer.position(0);
                 decompressionBuffer.limit(bufferSize);
                 decompressedBuffer = decompressionBuffer;
@@ -179,7 +201,10 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
 
         // verify checksum
         if (flg.isBlockChecksumSet()) {
-            int hash = CHECKSUM.hash(in, in.position(), blockSize, 0);
+            // workaround for https://github.com/lz4/lz4-java/pull/65
+            int hash = in.hasArray() ?
+                CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), blockSize, 0) :
+                CHECKSUM.hash(in, in.position(), blockSize, 0);
             in.position(in.position() + blockSize);
             if (hash != in.getInt()) {
                 throw new IOException(BLOCK_HASH_MISMATCH);
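
A note on the underlying lz4-java issue for readers of this patch: the
ByteBuffer overloads of XXHash32.hash() and LZ4SafeDecompressor.decompress()
in affected lz4-java versions did not correctly account for a buffer's
arrayOffset(), which is non-zero for slices of heap buffers (see
https://github.com/lz4/lz4-java/pull/65). Below is a minimal standalone
sketch of that hazard and of the workaround pattern this patch reinstates.
It is not part of the patch; the class name and buffer sizes are made up
for illustration, and it assumes only the lz4-java calls that
KafkaLZ4BlockInputStream itself uses.

    import java.nio.ByteBuffer;

    import net.jpountz.xxhash.XXHash32;
    import net.jpountz.xxhash.XXHashFactory;

    public class Lz4ArrayOffsetSketch {
        public static void main(String[] args) {
            XXHash32 checksum = XXHashFactory.fastestInstance().hash32();

            // Build a buffer whose arrayOffset() is non-zero: a slice taken
            // at position 8 of a heap buffer shares the original backing array.
            byte[] backing = new byte[32];
            for (int i = 0; i < backing.length; i++)
                backing[i] = (byte) i;
            ByteBuffer whole = ByteBuffer.wrap(backing);
            whole.position(8);
            ByteBuffer in = whole.slice(); // position() == 0, arrayOffset() == 8

            int len = 16;

            // The pattern the patch reinstates: when the buffer is array-backed,
            // hash the backing array and apply arrayOffset() manually instead
            // of trusting the ByteBuffer overload.
            int viaArray = in.hasArray() ?
                checksum.hash(in.array(), in.arrayOffset() + in.position(), len, 0) :
                checksum.hash(in, in.position(), len, 0);

            // The overload that affected lz4-java versions got wrong for
            // buffers with a non-zero arrayOffset().
            int viaBuffer = checksum.hash(in, in.position(), len, 0);

            System.out.println(viaArray == viaBuffer
                ? "lz4-java honours arrayOffset() here"
                : "mismatch: ByteBuffer overload ignored arrayOffset()");
        }
    }

The decompression path in readBlock() follows the same pattern with
DECOMPRESSOR, which is also why the constructor now insists on an
array-backed decompressionBuffer with zero arrayOffset(): the array-based
fallback can then safely write to decompressionBuffer.array() at offset 0.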