
MINOR: Fix exception handling in case of file record truncation during write

In case of file record truncation during write, improper type usage
(`AtomicInteger` in place of `int`) caused an `IllegalFormatConversionException`
to be thrown instead of the intended `KafkaException`.
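For illustration only (not part of the patch): the JDK's `%d` conversion accepts boxed integral types such as `Integer` and `Long`, but not `AtomicInteger`, so formatting the counter object itself fails with `IllegalFormatConversionException` before the intended `KafkaException` can even be constructed. A minimal standalone sketch:

    import java.util.concurrent.atomic.AtomicInteger;

    public class FormatConversionSketch {
        public static void main(String[] args) {
            AtomicInteger size = new AtomicInteger(42);
            try {
                // Fails: %d cannot format an AtomicInteger
                String.format("old size %d", size);
            } catch (java.util.IllegalFormatConversionException e) {
                System.out.println("caught: " + e);
            }
            // Passing the unwrapped int value formats as expected
            System.out.println(String.format("old size %d", size.get()));
        }
    }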

Author: Kamil Szymanski <kamil.szymanski.dev@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2275 from kamilszymanski/file_record_truncation_during_write
Commit 8b84d14c6f, authored by Kamil Szymanski, committed by Ismael Juma
1 changed file, 9 lines changed: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -233,11 +233,14 @@ public class FileRecords extends AbstractRecords implements Closeable {
     @Override
     public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException {
         long newSize = Math.min(channel.size(), end) - start;
-        if (newSize < size.get())
-            throw new KafkaException(String.format("Size of FileRecords %s has been truncated during write: old size %d, new size %d", file.getAbsolutePath(), size, newSize));
+        int oldSize = sizeInBytes();
+        if (newSize < oldSize)
+            throw new KafkaException(String.format(
+                    "Size of FileRecords %s has been truncated during write: old size %d, new size %d",
+                    file.getAbsolutePath(), oldSize, newSize));

         long position = start + offset;
-        long count = Math.min(length, size.get());
+        int count = Math.min(length, oldSize);
         final long bytesTransferred;
         if (destChannel instanceof TransportLayer) {
             TransportLayer tl = (TransportLayer) destChannel;
