Browse Source

MINOR: use addExact to avoid overflow and some cleanup (#12660)

What changes in this PR:
1. Use addExact to avoid overflow in BatchAccumulator#bytesNeeded. We did use addExact in bytesNeededForRecords method, but forgot that when returning the result.
2. javadoc improvement

Reviewers: Jason Gustafson <jason@confluent.io>
pull/12527/head
Luke Chen 2 years ago committed by GitHub
parent
commit
bf7ddf73af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
  2. 4
      raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
  3. 8
      raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java

4
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java

@ -107,6 +107,7 @@ public class BatchAccumulator<T> implements Closeable { @@ -107,6 +107,7 @@ public class BatchAccumulator<T> implements Closeable {
* @throws NotLeaderException if the epoch is less than the leader epoch
* @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
* @throws BufferAllocationException if we failed to allocate memory for the records
* @throws IllegalStateException if we tried to append new records after the batch has been built
*/
public long append(int epoch, List<T> records) {
return append(epoch, records, false);
@ -127,6 +128,7 @@ public class BatchAccumulator<T> implements Closeable { @@ -127,6 +128,7 @@ public class BatchAccumulator<T> implements Closeable {
* @throws NotLeaderException if the epoch is less than the leader epoch
* @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
* @throws BufferAllocationException if we failed to allocate memory for the records
* @throws IllegalStateException if we tried to append new records after the batch has been built
*/
public long appendAtomic(int epoch, List<T> records) {
return append(epoch, records, true);
@ -260,7 +262,7 @@ public class BatchAccumulator<T> implements Closeable { @@ -260,7 +262,7 @@ public class BatchAccumulator<T> implements Closeable {
/**
* Append a {@link LeaderChangeMessage} record to the batch
*
* @param LeaderChangeMessage The message to append
* @param leaderChangeMessage The message to append
* @param currentTimestamp The current time in milliseconds
* @throws IllegalStateException on failure to allocate a buffer for the record
*/

4
raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java

@ -142,7 +142,7 @@ public class BatchBuilder<T> { @@ -142,7 +142,7 @@ public class BatchBuilder<T> {
);
if (!isOpenForAppends) {
return OptionalInt.of(batchHeaderSizeInBytes() + bytesNeeded);
return OptionalInt.of(Math.addExact(batchHeaderSizeInBytes(), bytesNeeded));
}
int approxUnusedSizeInBytes = maxBytes - approximateSizeInBytes();
@ -157,7 +157,7 @@ public class BatchBuilder<T> { @@ -157,7 +157,7 @@ public class BatchBuilder<T> {
}
}
return OptionalInt.of(batchHeaderSizeInBytes() + bytesNeeded);
return OptionalInt.of(Math.addExact(batchHeaderSizeInBytes(), bytesNeeded));
}
private int flushedSizeInBytes() {

8
raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java

@ -54,12 +54,12 @@ public class BatchMemoryPool implements MemoryPool { @@ -54,12 +54,12 @@ public class BatchMemoryPool implements MemoryPool {
}
/**
* Allocate a byte buffer in this pool.
* Allocate a byte buffer with {@code batchSize} in this pool.
*
* This method should always succeed and never return null. The sizeBytes parameter must be less than
* the batchSize used in the constructor.
*
* @param sizeBytes is not used to determine the size of the byte buffer
* @param sizeBytes is used to determine if the requested size is exceeding the batchSize
* @throws IllegalArgumentException if sizeBytes is greater than batchSize
*/
@Override
@ -96,9 +96,9 @@ public class BatchMemoryPool implements MemoryPool { @@ -96,9 +96,9 @@ public class BatchMemoryPool implements MemoryPool {
try {
previouslyAllocated.clear();
if (previouslyAllocated.limit() != batchSize) {
if (previouslyAllocated.capacity() != batchSize) {
throw new IllegalArgumentException("Released buffer with unexpected size "
+ previouslyAllocated.limit());
+ previouslyAllocated.capacity());
}
// Free the buffer if the number of pooled buffers is already the maximum number of batches.

Loading…
Cancel
Save