
MINOR: Ensure exception messages include partition/segment info when possible (#4907)

Reviewers: Anna Povzner <anna@confluent.io>, Ismael Juma <ismael@juma.me.uk>
pull/4920/head
Authored by Jason Gustafson 7 years ago, committed by GitHub
commit f467c9c243
  1. clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java (6 changed lines)
  2. clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java (12 changed lines)
  3. clients/src/main/java/org/apache/kafka/common/record/FileRecords.java (16 changed lines)
  4. clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java (15 changed lines)
  5. core/src/main/scala/kafka/log/Log.scala (30 changed lines)
  6. core/src/main/scala/kafka/log/LogCleaner.scala (8 changed lines)
  7. core/src/main/scala/kafka/log/LogSegment.scala (2 changed lines)

clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java

@@ -46,9 +46,11 @@ class ByteBufferLogInputStream implements LogInputStream<MutableRecordBatch> {
        int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
        // V0 has the smallest overhead, stricter checking is done later
        if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
-           throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
+           throw new CorruptRecordException(String.format("Record size %d is less than the minimum record overhead (%d)",
+               recordSize, LegacyRecord.RECORD_OVERHEAD_V0));
        if (recordSize > maxMessageSize)
-           throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
+           throw new CorruptRecordException(String.format("Record size %d exceeds the largest allowable message size (%d).",
+               recordSize, maxMessageSize));
        int batchSize = recordSize + LOG_OVERHEAD;
        if (remaining < batchSize)
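
For context, a minimal standalone sketch of the check above and the kind of message a caller now sees. RECORD_OVERHEAD_V0 here is a local stand-in for LegacyRecord.RECORD_OVERHEAD_V0 (not part of the public API), and the class and method names are illustrative only:

    import org.apache.kafka.common.errors.CorruptRecordException;

    public class RecordSizeCheckSketch {
        // Local stand-in for LegacyRecord.RECORD_OVERHEAD_V0 (smallest legal legacy record).
        private static final int RECORD_OVERHEAD_V0 = 14;

        static void checkRecordSize(int recordSize, int maxMessageSize) {
            // Same shape as the patched checks: the offending size is now part of the message.
            if (recordSize < RECORD_OVERHEAD_V0)
                throw new CorruptRecordException(String.format(
                    "Record size %d is less than the minimum record overhead (%d)", recordSize, RECORD_OVERHEAD_V0));
            if (recordSize > maxMessageSize)
                throw new CorruptRecordException(String.format(
                    "Record size %d exceeds the largest allowable message size (%d).", recordSize, maxMessageSize));
        }

        public static void main(String[] args) {
            try {
                checkRecordSize(3, 1000000);
            } catch (CorruptRecordException e) {
                System.out.println(e.getMessage()); // "Record size 3 is less than the minimum record overhead (14)"
            }
        }
    }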

clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java

@@ -40,25 +40,26 @@ import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
 public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelRecordBatch> {
     private int position;
     private final int end;
-    private final FileChannel channel;
+    private final FileRecords fileRecords;
     private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC);

     /**
      * Create a new log input stream over the FileChannel
-     * @param channel Underlying FileChannel
+     * @param records Underlying FileRecords instance
      * @param start Position in the file channel to start from
      * @param end Position in the file channel not to read past
      */
-    FileLogInputStream(FileChannel channel,
+    FileLogInputStream(FileRecords records,
                        int start,
                        int end) {
-        this.channel = channel;
+        this.fileRecords = records;
         this.position = start;
         this.end = end;
     }

     @Override
     public FileChannelRecordBatch nextBatch() throws IOException {
+        FileChannel channel = fileRecords.channel();
         if (position + HEADER_SIZE_UP_TO_MAGIC >= end)
             return null;

@@ -71,7 +72,8 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
         // V0 has the smallest overhead, stricter checking is done later
         if (size < LegacyRecord.RECORD_OVERHEAD_V0)
-            throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", LegacyRecord.RECORD_OVERHEAD_V0));
+            throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " +
+                "overhead (%d) in file %s.", size, LegacyRecord.RECORD_OVERHEAD_V0, fileRecords.file()));
         if (position + LOG_OVERHEAD + size > end)
             return null;
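
The constructor is package-private, so outside code reaches it through FileRecords. A minimal sketch (the class name and segment path below are hypothetical) of how the file name now surfaces when a batch header is corrupt or truncated:

    import java.io.File;
    import java.io.IOException;

    import org.apache.kafka.common.errors.CorruptRecordException;
    import org.apache.kafka.common.record.FileRecords;
    import org.apache.kafka.common.record.RecordBatch;

    public class BatchScanSketch {
        public static void main(String[] args) throws IOException {
            // Hypothetical segment file; any .log segment would do.
            File segmentFile = new File("/tmp/my-topic-0/00000000000000000000.log");
            try (FileRecords records = FileRecords.open(segmentFile)) {
                // batches() builds a FileLogInputStream from the FileRecords itself (not just its
                // channel), so a bad batch header can be reported together with the file name.
                for (RecordBatch batch : records.batches())
                    System.out.println("batch at offset " + batch.baseOffset());
            } catch (CorruptRecordException e) {
                // e.g. "Found record size ... smaller than minimum record overhead (...) in file ...00000000000000000000.log."
                System.err.println(e.getMessage());
            }
        }
    }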

clients/src/main/java/org/apache/kafka/common/record/FileRecords.java

@@ -132,9 +132,9 @@ public class FileRecords extends AbstractRecords implements Closeable {
      */
     public FileRecords read(int position, int size) throws IOException {
         if (position < 0)
-            throw new IllegalArgumentException("Invalid position: " + position);
+            throw new IllegalArgumentException("Invalid position: " + position + " in read from " + file);
         if (size < 0)
-            throw new IllegalArgumentException("Invalid size: " + size);
+            throw new IllegalArgumentException("Invalid size: " + size + " in read from " + file);

         int end = this.start + position + size;
         // handle integer overflow or if end is beyond the end of the file

@@ -228,7 +228,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
     public int truncateTo(int targetSize) throws IOException {
         int originalSize = sizeInBytes();
         if (targetSize > originalSize || targetSize < 0)
-            throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
+            throw new KafkaException("Attempt to truncate log segment " + file + " to " + targetSize + " bytes failed, " +
                 " size of this log segment is " + originalSize + " bytes.");
         if (targetSize < (int) channel.size()) {
             channel.truncate(targetSize);

@@ -347,6 +347,14 @@ public class FileRecords extends AbstractRecords implements Closeable {
         return batches;
     }

+    @Override
+    public String toString() {
+        return "FileRecords(file= " + file +
+                ", start=" + start +
+                ", end=" + end +
+                ")";
+    }
+
     private Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
         return new Iterable<FileChannelRecordBatch>() {
             @Override

@@ -362,7 +370,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
                 end = this.end;
             else
                 end = this.sizeInBytes();
-            FileLogInputStream inputStream = new FileLogInputStream(channel, start, end);
+            FileLogInputStream inputStream = new FileLogInputStream(this, start, end);
             return new RecordBatchIterator<>(inputStream);
         }
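
A small sketch of the effect on callers: an out-of-range truncation now raises a KafkaException that names the backing file (the path and class name below are hypothetical), and the new toString() makes FileRecords instances readable when interpolated into other messages:

    import java.io.File;
    import java.io.IOException;

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.record.FileRecords;

    public class TruncateSketch {
        public static void main(String[] args) throws IOException {
            try (FileRecords records = FileRecords.open(new File("/tmp/demo-segment.log"))) {
                System.out.println(records); // e.g. FileRecords(file= /tmp/demo-segment.log, start=0, end=...)
                // Deliberately invalid: larger than the current size, so truncateTo must fail.
                records.truncateTo(records.sizeInBytes() + 1);
            } catch (KafkaException e) {
                // e.g. "Attempt to truncate log segment /tmp/demo-segment.log to ... bytes failed, ..."
                System.err.println(e.getMessage());
            }
        }
    }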

clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java

@@ -60,8 +60,7 @@ public class FileLogInputStreamTest {
         fileRecords.append(MemoryRecords.withRecords(magic, compression, new SimpleRecord("foo".getBytes())));
         fileRecords.flush();
-        FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
-            fileRecords.sizeInBytes());
+        FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());

         FileChannelRecordBatch batch = logInputStream.nextBatch();
         assertNotNull(batch);

@@ -90,8 +89,7 @@ public class FileLogInputStreamTest {
         fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecord));
         fileRecords.flush();
-        FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
-            fileRecords.sizeInBytes());
+        FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());

         FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
         assertGenericRecordBatchData(firstBatch, 0L, 3241324L, firstBatchRecord);

@@ -126,8 +124,7 @@ public class FileLogInputStreamTest {
         fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecords));
         fileRecords.flush();
-        FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
-            fileRecords.sizeInBytes());
+        FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());

         FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
         assertNoProducerData(firstBatch);

@@ -169,8 +166,7 @@ public class FileLogInputStreamTest {
             producerEpoch, baseSequence + firstBatchRecords.length, partitionLeaderEpoch, secondBatchRecords));
         fileRecords.flush();
-        FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
-            fileRecords.sizeInBytes());
+        FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());

         FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
         assertProducerData(firstBatch, producerId, producerEpoch, baseSequence, false, firstBatchRecords);

@@ -198,8 +194,7 @@ public class FileLogInputStreamTest {
         fileRecords.flush();
         fileRecords.truncateTo(fileRecords.sizeInBytes() - 13);
-        FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
-            fileRecords.sizeInBytes());
+        FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());

         FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
         assertNoProducerData(firstBatch);

core/src/main/scala/kafka/log/Log.scala

@@ -686,7 +686,8 @@ class Log(@volatile var dir: File,
            leaderEpoch,
            isFromClient)
        } catch {
-         case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
+         case e: IOException =>
+           throw new KafkaException(s"Error validating messages while appending to log $name", e)
        }
        validRecords = validateAndOffsetAssignResult.validatedRecords
        appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp

@@ -705,15 +706,16 @@ class Log(@volatile var dir: File,
              // to be consistent with pre-compression bytesRejectedRate recording
              brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
              brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
-             throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
-               .format(batch.sizeInBytes, config.maxMessageSize))
+             throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
+               s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
            }
          }
        }
      } else {
        // we are taking the offsets we are given
        if (!appendInfo.offsetsMonotonic || appendInfo.firstOrLastOffset < nextOffsetMetadata.messageOffset)
-         throw new IllegalArgumentException("Out of order offsets found in " + records.records.asScala.map(_.offset))
+         throw new IllegalArgumentException(s"Out of order offsets found in append to $topicPartition: " +
+           records.records.asScala.map(_.offset))
      }

      // update the epoch cache with the epoch stamped onto the message by the leader

@@ -724,8 +726,8 @@ class Log(@volatile var dir: File,
      // check messages set size may be exceed config.segmentSize
      if (validRecords.sizeInBytes > config.segmentSize) {
-       throw new RecordBatchTooLargeException("Message batch size is %d bytes which exceeds the maximum configured segment size of %d."
-         .format(validRecords.sizeInBytes, config.segmentSize))
+       throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
+         s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
      }

      // now that we have valid records, offsets assigned, and timestamps updated, we need to

@@ -887,7 +889,8 @@ class Log(@volatile var dir: File,
    for (batch <- records.batches.asScala) {
      // we only validate V2 and higher to avoid potential compatibility issues with older clients
      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
-       throw new InvalidRecordException(s"The baseOffset of the record batch should be 0, but it is ${batch.baseOffset}")
+       throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
+         s"be 0, but it is ${batch.baseOffset}")

      // update the first offset if on the first message. For magic versions older than 2, we use the last offset
      // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).

@@ -913,8 +916,8 @@ class Log(@volatile var dir: File,
      if (batchSize > config.maxMessageSize) {
        brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
        brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
-       throw new RecordTooLargeException(s"The record batch size is $batchSize bytes which exceeds the maximum configured " +
-         s"value of ${config.maxMessageSize}.")
+       throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
+         s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
      }

      // check the validity of the message by checking CRC

@@ -957,7 +960,8 @@ class Log(@volatile var dir: File,
  private def trimInvalidBytes(records: MemoryRecords, info: LogAppendInfo): MemoryRecords = {
    val validBytes = info.validBytes
    if (validBytes < 0)
-     throw new CorruptRecordException("Illegal length of message set " + validBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+     throw new CorruptRecordException(s"Cannot append record batch with illegal length $validBytes to " +
+       s"log for $topicPartition. A possible cause is a corrupted produce request.")
    if (validBytes == records.sizeInBytes) {
      records
    } else {

@@ -1011,7 +1015,8 @@ class Log(@volatile var dir: File,
      // return error on attempt to read beyond the log end offset or read below log start offset
      if (startOffset > next || segmentEntry == null || startOffset < logStartOffset)
-       throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next))
+       throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
+         s"but we only have log segments in the range $logStartOffset to $next.")

      // Do the read on the segment with a base offset less than the target offset
      // but if that segment doesn't contain any messages with an offset greater than that

@@ -1375,7 +1380,8 @@ class Log(@volatile var dir: File,
        preallocate = config.preallocate)
      val prev = addSegment(segment)
      if (prev != null)
-       throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
+       throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with " +
+         s"start offset $newOffset while it already exists.")

      // We need to update the segment base offset and append position data of the metadata when log rolls.
      // The next offset should not change.
      updateLogEndOffset(nextOffsetMetadata.messageOffset)
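
The Scala changes above all follow one pattern: interpolate the log's $topicPartition (plus the relevant sizes or offsets) into the exception text. A rough Java rendering of that pattern, using a hypothetical helper rather than the broker's actual append path; maxMessageSize plays the role of config.maxMessageSize:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.RecordTooLargeException;

    public class AppendCheckSketch {
        // Hypothetical helper mirroring the broker-side size check.
        static void checkBatchSize(TopicPartition tp, int batchSizeInBytes, int maxMessageSize) {
            if (batchSizeInBytes > maxMessageSize)
                throw new RecordTooLargeException("Message batch size is " + batchSizeInBytes +
                    " bytes in append to partition " + tp +
                    " which exceeds the maximum configured size of " + maxMessageSize + ".");
        }

        public static void main(String[] args) {
            try {
                checkBatchSize(new TopicPartition("my-topic", 0), 2000000, 1000000);
            } catch (RecordTooLargeException e) {
                // The log line alone now tells the operator which partition hit the limit.
                System.err.println(e.getMessage()); // "... in append to partition my-topic-0 ..."
            }
        }
    }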

core/src/main/scala/kafka/log/LogCleaner.scala

@@ -800,7 +800,13 @@ private[log] class Cleaner(val id: Int,
    while (position < segment.log.sizeInBytes) {
      checkDone(topicPartition)
      readBuffer.clear()
-     segment.log.readInto(readBuffer, position)
+     try {
+       segment.log.readInto(readBuffer, position)
+     } catch {
+       case e: Exception =>
+         throw new KafkaException(s"Failed to read from segment $segment of partition $topicPartition " +
+           "while loading offset map", e)
+     }
      val records = MemoryRecords.readableRecords(readBuffer)
      throttler.maybeThrottle(records.sizeInBytes)
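
The same wrapping idea, sketched in Java: the Scala hunk above operates on the cleaner's LogSegment, while here FileRecords stands in for segment.log, and the class name, method name, and file path are illustrative. Any failure while filling the read buffer is rethrown as a KafkaException that names the segment and partition instead of surfacing a bare low-level error:

    import java.io.File;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.FileRecords;

    public class OffsetMapReadSketch {
        // Illustrative stand-in for the body of the cleaner's read loop.
        static void readChunk(FileRecords log, ByteBuffer readBuffer, int position, TopicPartition tp) {
            try {
                log.readInto(readBuffer, position);
            } catch (IOException | RuntimeException e) {
                // Same message shape as the Scala change; FileRecords.toString() (added in this
                // commit) identifies the backing file.
                throw new KafkaException("Failed to read from segment " + log + " of partition " + tp +
                    " while loading offset map", e);
            }
        }

        public static void main(String[] args) throws IOException {
            try (FileRecords log = FileRecords.open(new File("/tmp/demo-segment.log"))) {
                readChunk(log, ByteBuffer.allocate(1024), 0, new TopicPartition("my-topic", 0));
            }
        }
    }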

core/src/main/scala/kafka/log/LogSegment.scala

@@ -205,7 +205,7 @@ class LogSegment private[log] (val log: FileRecords,
  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
           minOneMessage: Boolean = false): FetchDataInfo = {
    if (maxSize < 0)
-     throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
+     throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")

    val logSize = log.sizeInBytes // this may change, need to save a consistent copy
    val startOffsetAndSize = translateOffset(startOffset)
