From 0d0a0a2d0815caa7b27c23ce15bd98627da05a8a Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 2 Aug 2018 14:08:12 +0300 Subject: [PATCH] Polish --- .../core/io/buffer/DataBufferUtils.java | 58 ++++++++----------- .../core/io/buffer/DataBufferUtilsTests.java | 6 +- 2 files changed, 26 insertions(+), 38 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index a790dce793..d2e0d57a07 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -393,24 +393,16 @@ public abstract class DataBufferUtils { public static Flux takeUntilByteCount(Publisher publisher, long maxByteCount) { Assert.notNull(publisher, "Publisher must not be null"); Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number"); - AtomicLong byteCountDown = new AtomicLong(maxByteCount); - - return Flux.from(publisher). - takeWhile(dataBuffer -> { - int delta = -dataBuffer.readableByteCount(); - long currentCount = byteCountDown.getAndAdd(delta); - return currentCount >= 0; - }). - map(dataBuffer -> { - long currentCount = byteCountDown.get(); - if (currentCount >= 0) { - return dataBuffer; - } - else { - // last buffer - int size = (int) (currentCount + dataBuffer.readableByteCount()); - return dataBuffer.slice(0, size); - } + AtomicLong countDown = new AtomicLong(maxByteCount); + + return Flux.from(publisher) + .takeWhile(buffer -> { + int delta = -buffer.readableByteCount(); + return countDown.getAndAdd(delta) >= 0; + }) + .map(buffer -> { + long count = countDown.get(); + return count >= 0 ? buffer : buffer.slice(0, buffer.readableByteCount() + (int) count); }); } @@ -427,27 +419,23 @@ public abstract class DataBufferUtils { Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number"); AtomicLong byteCountDown = new AtomicLong(maxByteCount); - return Flux.from(publisher). - skipUntil(dataBuffer -> { - int delta = -dataBuffer.readableByteCount(); - long currentCount = byteCountDown.addAndGet(delta); - if (currentCount < 0) { - return true; - } - else { - DataBufferUtils.release(dataBuffer); + return Flux.from(publisher) + .skipUntil(buffer -> { + int delta = -buffer.readableByteCount(); + if (byteCountDown.addAndGet(delta) >= 0) { + DataBufferUtils.release(buffer); return false; } - }). - map(dataBuffer -> { - long currentCount = byteCountDown.get(); - // slice first buffer, then let others flow through - if (currentCount < 0) { - int skip = (int) (currentCount + dataBuffer.readableByteCount()); + return true; + }) + .map(buffer -> { + long count = byteCountDown.get(); + if (count < 0) { + int skipCount = buffer.readableByteCount() + (int) count; byteCountDown.set(0); - return dataBuffer.slice(skip, dataBuffer.readableByteCount() - skip); + return buffer.slice(skipCount, buffer.readableByteCount() - skipCount); } - return dataBuffer; + return buffer; }); } diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 21e52d8858..e6b1ff5538 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -225,7 +225,7 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { } @Test - public void takeUntilByteCount() throws Exception { + public void takeUntilByteCount() { DataBuffer foo = stringBuffer("foo"); DataBuffer bar = stringBuffer("bar"); DataBuffer baz = stringBuffer("baz"); @@ -242,7 +242,7 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { } @Test - public void skipUntilByteCount() throws Exception { + public void skipUntilByteCount() { DataBuffer foo = stringBuffer("foo"); DataBuffer bar = stringBuffer("bar"); DataBuffer baz = stringBuffer("baz"); @@ -257,7 +257,7 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { } @Test - public void skipUntilByteCountShouldSkipAll() throws Exception { + public void skipUntilByteCountShouldSkipAll() { DataBuffer foo = stringBuffer("foo"); DataBuffer bar = stringBuffer("bar"); DataBuffer baz = stringBuffer("baz");