From a1a878127980b5744f08d74d85732256da50b22d Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 30 Jul 2019 16:12:10 +0100 Subject: [PATCH] PayloadUtils improvements and tests --- .../messaging/rsocket/MetadataEncoder.java | 12 +- .../messaging/rsocket/PayloadUtils.java | 54 +++--- .../messaging/rsocket/PayloadUtilsTests.java | 175 ++++++++++++++++++ 3 files changed, 207 insertions(+), 34 deletions(-) create mode 100644 spring-messaging/src/test/java/org/springframework/messaging/rsocket/PayloadUtilsTests.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MetadataEncoder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MetadataEncoder.java index 2d8b3b2876..745e66d472 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MetadataEncoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MetadataEncoder.java @@ -24,14 +24,12 @@ import java.util.regex.Pattern; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.Unpooled; import io.rsocket.metadata.CompositeMetadataFlyweight; import org.springframework.core.ResolvableType; import org.springframework.core.codec.Encoder; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -165,18 +163,14 @@ final class MetadataEncoder { try { mergedMetadata.forEach((value, mimeType) -> { DataBuffer buffer = encodeEntry(value, mimeType); - CompositeMetadataFlyweight.encodeAndAddMetadata(composite, this.allocator, - mimeType.toString(), - buffer instanceof NettyDataBuffer ? - ((NettyDataBuffer) buffer).getNativeBuffer() : - Unpooled.wrappedBuffer(buffer.asByteBuffer())); + CompositeMetadataFlyweight.encodeAndAddMetadata( + composite, this.allocator, mimeType.toString(), PayloadUtils.asByteBuf(buffer)); }); if (bufferFactory() instanceof NettyDataBufferFactory) { return ((NettyDataBufferFactory) bufferFactory()).wrap(composite); } else { - DataBuffer buffer = bufferFactory().allocateBuffer(); - buffer.write(composite.nioBuffer()); + DataBuffer buffer = bufferFactory().wrap(composite.nioBuffer()); composite.release(); return buffer; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java index 7c24aa0ce4..43135504d8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java @@ -16,7 +16,10 @@ package org.springframework.messaging.rsocket; +import java.nio.ByteBuffer; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.rsocket.Payload; import io.rsocket.util.ByteBufPayload; import io.rsocket.util.DefaultPayload; @@ -64,41 +67,42 @@ public abstract class PayloadUtils { /** * Create a Payload from the given metadata and data. + *

If at least one is {@link NettyDataBuffer} then {@link ByteBufPayload} + * is created with either obtaining the underlying native {@link ByteBuf} + * or using {@link Unpooled#wrappedBuffer(ByteBuffer...)} if necessary. + * Otherwise, if both are {@link DefaultDataBuffer}, then + * {@link DefaultPayload} is created. * @param metadata the metadata part for the payload * @param data the data part for the payload - * @return the created Payload + * @return the created payload */ public static Payload createPayload(DataBuffer metadata, DataBuffer data) { - if (metadata instanceof NettyDataBuffer && data instanceof NettyDataBuffer) { - return ByteBufPayload.create( - ((NettyDataBuffer) data).getNativeBuffer(), - ((NettyDataBuffer) metadata).getNativeBuffer()); - } - else if (metadata instanceof DefaultDataBuffer && data instanceof DefaultDataBuffer) { - return DefaultPayload.create( - ((DefaultDataBuffer) data).getNativeBuffer(), - ((DefaultDataBuffer) metadata).getNativeBuffer()); - } - else { - return DefaultPayload.create(data.asByteBuffer(), metadata.asByteBuffer()); - } + return data instanceof NettyDataBuffer || metadata instanceof NettyDataBuffer ? + ByteBufPayload.create(asByteBuf(data), asByteBuf(metadata)) : + DefaultPayload.create(asByteBuffer(data), asByteBuffer(metadata)); } /** - * Create a Payload from the given data. + * Create a Payload with data only. The created payload is + * {@link ByteBufPayload} if the input is {@link NettyDataBuffer} or + * otherwise it is {@link DefaultPayload}. * @param data the data part for the payload - * @return the created Payload + * @return created payload */ public static Payload createPayload(DataBuffer data) { - if (data instanceof NettyDataBuffer) { - return ByteBufPayload.create(((NettyDataBuffer) data).getNativeBuffer()); - } - else if (data instanceof DefaultDataBuffer) { - return DefaultPayload.create(((DefaultDataBuffer) data).getNativeBuffer()); - } - else { - return DefaultPayload.create(data.asByteBuffer()); - } + return data instanceof NettyDataBuffer ? + ByteBufPayload.create(asByteBuf(data)) : DefaultPayload.create(asByteBuffer(data)); + } + + + static ByteBuf asByteBuf(DataBuffer buffer) { + return buffer instanceof NettyDataBuffer ? + ((NettyDataBuffer) buffer).getNativeBuffer() : Unpooled.wrappedBuffer(buffer.asByteBuffer()); + } + + private static ByteBuffer asByteBuffer(DataBuffer buffer) { + return buffer instanceof DefaultDataBuffer ? + ((DefaultDataBuffer) buffer).getNativeBuffer() : buffer.asByteBuffer(); } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/PayloadUtilsTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/PayloadUtilsTests.java new file mode 100644 index 0000000000..9e11a5485a --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/PayloadUtilsTests.java @@ -0,0 +1,175 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.messaging.rsocket; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; + +import io.netty.buffer.PooledByteBufAllocator; +import io.rsocket.Payload; +import io.rsocket.util.ByteBufPayload; +import io.rsocket.util.DefaultPayload; +import org.junit.After; +import org.junit.Test; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBuffer; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.core.io.buffer.NettyDataBuffer; +import org.springframework.core.io.buffer.support.DataBufferTestUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for {@link PayloadUtils}. + * @author Rossen Stoyanchev + * @since 5.2 + */ +public class PayloadUtilsTests { + + private LeakAwareNettyDataBufferFactory nettyBufferFactory = + new LeakAwareNettyDataBufferFactory(PooledByteBufAllocator.DEFAULT); + + private DefaultDataBufferFactory defaultBufferFactory = new DefaultDataBufferFactory(); + + + @After + public void tearDown() throws Exception { + this.nettyBufferFactory.checkForLeaks(Duration.ofSeconds(5)); + } + + + @Test + public void retainAndReleaseWithNettyFactory() { + Payload payload = ByteBufPayload.create("sample data"); + DataBuffer buffer = PayloadUtils.retainDataAndReleasePayload(payload, this.nettyBufferFactory); + try { + assertThat(buffer).isInstanceOf(NettyDataBuffer.class); + assertThat(((NettyDataBuffer) buffer).getNativeBuffer().refCnt()).isEqualTo(1); + assertThat(payload.refCnt()).isEqualTo(0); + } + finally { + DataBufferUtils.release(buffer); + } + } + + @Test + public void retainAndReleaseWithDefaultFactory() { + Payload payload = ByteBufPayload.create("sample data"); + DataBuffer buffer = PayloadUtils.retainDataAndReleasePayload(payload, this.defaultBufferFactory); + + assertThat(buffer).isInstanceOf(DefaultDataBuffer.class); + assertThat(payload.refCnt()).isEqualTo(0); + } + + @Test + public void createWithNettyBuffers() { + NettyDataBuffer data = createNettyDataBuffer("sample data"); + NettyDataBuffer metadata = createNettyDataBuffer("sample metadata"); + + Payload payload = PayloadUtils.createPayload(metadata, data); + try { + assertThat(payload).isInstanceOf(ByteBufPayload.class); + assertThat(payload.data()).isSameAs(data.getNativeBuffer()); + assertThat(payload.metadata()).isSameAs(metadata.getNativeBuffer()); + } + finally { + payload.release(); + } + } + + @Test + public void createWithDefaultBuffers() { + DataBuffer data = createDefaultDataBuffer("sample data"); + DataBuffer metadata = createDefaultDataBuffer("sample metadata"); + Payload payload = PayloadUtils.createPayload(metadata, data); + + assertThat(payload).isInstanceOf(DefaultPayload.class); + assertThat(payload.getDataUtf8()).isEqualTo(dataBufferToString(data)); + assertThat(payload.getMetadataUtf8()).isEqualTo(dataBufferToString(metadata)); + } + + @Test + public void createWithNettyAndDefaultBuffers() { + NettyDataBuffer data = createNettyDataBuffer("sample data"); + DefaultDataBuffer metadata = createDefaultDataBuffer("sample metadata"); + Payload payload = PayloadUtils.createPayload(metadata, data); + try { + assertThat(payload).isInstanceOf(ByteBufPayload.class); + assertThat(payload.data()).isSameAs(data.getNativeBuffer()); + assertThat(payload.getMetadataUtf8()).isEqualTo(dataBufferToString(metadata)); + } + finally { + payload.release(); + } + } + + @Test + public void createWithDefaultAndNettyBuffers() { + DefaultDataBuffer data = createDefaultDataBuffer("sample data"); + NettyDataBuffer metadata = createNettyDataBuffer("sample metadata"); + Payload payload = PayloadUtils.createPayload(metadata, data); + try { + assertThat(payload).isInstanceOf(ByteBufPayload.class); + assertThat(payload.getDataUtf8()).isEqualTo(dataBufferToString(data)); + assertThat(payload.metadata()).isSameAs(metadata.getNativeBuffer()); + } + finally { + payload.release(); + } + } + + @Test + public void createWithNettyBuffer() { + NettyDataBuffer data = createNettyDataBuffer("sample data"); + Payload payload = PayloadUtils.createPayload(data); + try { + assertThat(payload).isInstanceOf(ByteBufPayload.class); + assertThat(payload.data()).isSameAs(data.getNativeBuffer()); + } + finally { + payload.release(); + } + } + + @Test + public void createWithDefaultBuffer() { + DataBuffer data = createDefaultDataBuffer("sample data"); + Payload payload = PayloadUtils.createPayload(data); + + assertThat(payload).isInstanceOf(DefaultPayload.class); + assertThat(payload.getDataUtf8()).isEqualTo(dataBufferToString(data)); + } + + + private NettyDataBuffer createNettyDataBuffer(String content) { + NettyDataBuffer buffer = this.nettyBufferFactory.allocateBuffer(); + buffer.write(content, StandardCharsets.UTF_8); + return buffer; + } + + private DefaultDataBuffer createDefaultDataBuffer(String content) { + DefaultDataBuffer buffer = this.defaultBufferFactory.allocateBuffer(); + buffer.write(content, StandardCharsets.UTF_8); + return buffer; + } + + private String dataBufferToString(DataBuffer metadata) { + return DataBufferTestUtils.dumpString(metadata, StandardCharsets.UTF_8); + } + +}