From 8b84d14c6fb338e9e6a6d41fe0a6298a63cfc5c0 Mon Sep 17 00:00:00 2001 From: Kamil Szymanski Date: Tue, 20 Dec 2016 00:30:36 +0000 Subject: [PATCH] MINOR: Fix exception handling in case of file record truncation during write In case of file record truncation during write due to improper types usage (`AtomicInteger` in place of `int`) `IllegalFormatConversionException` would be thrown instead of `KafkaException` Author: Kamil Szymanski Reviewers: Ismael Juma Closes #2275 from kamilszymanski/file_record_truncation_during_write --- .../java/org/apache/kafka/common/record/FileRecords.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 52f31038a47..050711d5317 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -233,11 +233,14 @@ public class FileRecords extends AbstractRecords implements Closeable { @Override public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException { long newSize = Math.min(channel.size(), end) - start; - if (newSize < size.get()) - throw new KafkaException(String.format("Size of FileRecords %s has been truncated during write: old size %d, new size %d", file.getAbsolutePath(), size, newSize)); + int oldSize = sizeInBytes(); + if (newSize < oldSize) + throw new KafkaException(String.format( + "Size of FileRecords %s has been truncated during write: old size %d, new size %d", + file.getAbsolutePath(), oldSize, newSize)); long position = start + offset; - long count = Math.min(length, size.get()); + int count = Math.min(length, oldSize); final long bytesTransferred; if (destChannel instanceof TransportLayer) { TransportLayer tl = (TransportLayer) destChannel;