
KAFKA-9203: Revert "MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)" (#7769)

This reverts commit 90043d5f as it caused a regression in some cases:

> Caused by: java.io.IOException: Stream frame descriptor corrupted
>         at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132)
>         at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78)
>         at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110)

I will investigate the root cause afterwards, but I want to get the safe fix into 2.4.0.
The reporter of KAFKA-9203 has verified that reverting this change
makes the problem go away.
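
For context, the restored workarounds all branch on whether the input ByteBuffer is array-backed: array-backed buffers are routed through the byte[] overloads (applying arrayOffset() manually), while direct buffers keep using the ByteBuffer overloads, which the restored comments indicate are unaffected. A minimal JDK-only sketch of that distinction (illustrative, not Kafka or lz4-java code):

    import java.nio.ByteBuffer;

    public class BufferKinds {
        public static void main(String[] args) {
            ByteBuffer heap = ByteBuffer.allocate(16);         // heap buffer, backed by a byte[]
            ByteBuffer direct = ByteBuffer.allocateDirect(16); // direct buffer, no accessible byte[]
            System.out.println(heap.hasArray());   // true  -> workaround uses the array()-based overloads
            System.out.println(direct.hasArray()); // false -> falls back to the ByteBuffer overloads
        }
    }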

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Author: Ismael Juma (committed via GitHub)
Commit: 5cddf9860b
1 changed file: clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java (33 changed lines)
@@ -77,6 +77,11 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
         this.bufferSupplier = bufferSupplier;
         readHeader();
         decompressionBuffer = bufferSupplier.get(maxBlockSize);
+        if (!decompressionBuffer.hasArray() || decompressionBuffer.arrayOffset() != 0) {
+            // require array backed decompression buffer with zero offset
+            // to simplify workaround for https://github.com/lz4/lz4-java/pull/65
+            throw new RuntimeException("decompression buffer must have backing array with zero array offset");
+        }
         finished = false;
     }
@@ -126,7 +131,10 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
         int len = in.position() - in.reset().position();
-        int hash = CHECKSUM.hash(in, in.position(), len, 0);
+        int hash = in.hasArray() ?
+            // workaround for https://github.com/lz4/lz4-java/pull/65
+            CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), len, 0) :
+            CHECKSUM.hash(in, in.position(), len, 0);
         in.position(in.position() + len);
         if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
             throw new IOException(DESCRIPTOR_HASH_MISMATCH);
@@ -164,8 +172,22 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
         if (compressed) {
             try {
-                final int bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0,
-                    maxBlockSize);
+                // workaround for https://github.com/lz4/lz4-java/pull/65
+                final int bufferSize;
+                if (in.hasArray()) {
+                    bufferSize = DECOMPRESSOR.decompress(
+                        in.array(),
+                        in.position() + in.arrayOffset(),
+                        blockSize,
+                        decompressionBuffer.array(),
+                        0,
+                        maxBlockSize
+                    );
+                } else {
+                    // decompressionBuffer has zero arrayOffset, so we don't need to worry about
+                    // https://github.com/lz4/lz4-java/pull/65
+                    bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0, maxBlockSize);
+                }
                 decompressionBuffer.position(0);
                 decompressionBuffer.limit(bufferSize);
                 decompressedBuffer = decompressionBuffer;
@@ -179,7 +201,10 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
             // verify checksum
             if (flg.isBlockChecksumSet()) {
-                int hash = CHECKSUM.hash(in, in.position(), blockSize, 0);
+                // workaround for https://github.com/lz4/lz4-java/pull/65
+                int hash = in.hasArray() ?
+                    CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), blockSize, 0) :
+                    CHECKSUM.hash(in, in.position(), blockSize, 0);
                 in.position(in.position() + blockSize);
                 if (hash != in.getInt()) {
                     throw new IOException(BLOCK_HASH_MISMATCH);
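
The arrayOffset() arithmetic in the restored calls is the heart of the workaround: a heap buffer created by slice() or duplicate() reports positions relative to the start of the slice, while its index into the shared backing array starts at a nonzero offset. A minimal JDK-only sketch (illustrative, not Kafka code):

    import java.nio.ByteBuffer;

    public class SliceOffset {
        public static void main(String[] args) {
            ByteBuffer whole = ByteBuffer.allocate(16);
            whole.position(4);
            ByteBuffer slice = whole.slice();        // slice positions start at 0
            System.out.println(slice.position());    // 0
            System.out.println(slice.arrayOffset()); // 4: offset into the shared backing array
            // Array-based APIs must therefore index at
            // slice.arrayOffset() + slice.position(), which is exactly what the
            // restored CHECKSUM.hash(...) and DECOMPRESSOR.decompress(...) calls do.
        }
    }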
