From 55946bf319d1cbb4bc899689f1ef3d6649aca479 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 29 Jul 2019 21:13:39 +0100 Subject: [PATCH] Factor out MetadataEncoder from RSocketRequester To be re-used also for creating metadata for the setup payload. See: gh-23368 --- .../rsocket/DefaultMetadataExtractor.java | 5 +- .../rsocket/DefaultRSocketRequester.java | 116 ++------- .../messaging/rsocket/MetadataEncoder.java | 232 ++++++++++++++++++ .../DefaultMetadataExtractorTests.java | 60 ++--- .../rsocket/DefaultRSocketRequesterTests.java | 91 +------ .../rsocket/MetadataEncoderTests.java | 201 +++++++++++++++ 6 files changed, 475 insertions(+), 230 deletions(-) create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/rsocket/MetadataEncoder.java create mode 100644 spring-messaging/src/test/java/org/springframework/messaging/rsocket/MetadataEncoderTests.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultMetadataExtractor.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultMetadataExtractor.java index 90b76a3787..49462c3407 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultMetadataExtractor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultMetadataExtractor.java @@ -25,7 +25,7 @@ import java.util.Map; import java.util.function.BiConsumer; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.ByteBufAllocator; import io.rsocket.Payload; import io.rsocket.metadata.CompositeMetadata; @@ -179,8 +179,9 @@ public class DefaultMetadataExtractor implements MetadataExtractor { private static class EntryExtractor { + // We only need this to wrap ByteBufs private final static NettyDataBufferFactory bufferFactory = - new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT); + new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); private final Decoder decoder; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java index 8ce78b4cf3..d611f601bb 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java @@ -17,18 +17,10 @@ package org.springframework.messaging.rsocket; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.Unpooled; import io.rsocket.Payload; import io.rsocket.RSocket; -import io.rsocket.metadata.CompositeMetadataFlyweight; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -41,12 +33,9 @@ import org.springframework.core.codec.Encoder; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.NettyDataBuffer; -import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.MimeType; -import org.springframework.util.ObjectUtils; /** * Default implementation of {@link RSocketRequester}. @@ -56,9 +45,6 @@ import org.springframework.util.ObjectUtils; */ final class DefaultRSocketRequester implements RSocketRequester { - /** For route variable replacement. */ - private static final Pattern VARS_PATTERN = Pattern.compile("\\{([^/]+?)\\}"); - private static final Map EMPTY_HINTS = Collections.emptyMap(); @@ -107,30 +93,7 @@ final class DefaultRSocketRequester implements RSocketRequester { @Override public RequestSpec route(String route, Object... vars) { - Assert.notNull(route, "'route' is required"); - route = expand(route, vars); - return new DefaultRequestSpec(route, isCompositeMetadata() ? MetadataExtractor.ROUTING : null); - } - - private static String expand(String route, Object... vars) { - if (ObjectUtils.isEmpty(vars)) { - return route; - } - StringBuffer sb = new StringBuffer(); - int index = 0; - Matcher matcher = VARS_PATTERN.matcher(route); - while (matcher.find()) { - Assert.isTrue(index < vars.length, () -> "No value for variable '" + matcher.group(1) + "'"); - String value = vars[index].toString(); - value = value.contains(".") ? value.replaceAll("\\.", "%2E") : value; - matcher.appendReplacement(sb, value); - index++; - } - return sb.toString(); - } - - private boolean isCompositeMetadata() { - return metadataMimeType().equals(MetadataExtractor.COMPOSITE_METADATA); + return new DefaultRequestSpec(route, vars); } @Override @@ -150,22 +113,23 @@ final class DefaultRSocketRequester implements RSocketRequester { private class DefaultRequestSpec implements RequestSpec { - private final Map metadata = new LinkedHashMap<>(4); + private final MetadataEncoder metadataEncoder; - DefaultRequestSpec(Object metadata, @Nullable MimeType mimeType) { - mimeType = (mimeType == null && !isCompositeMetadata() ? metadataMimeType() : mimeType); - Assert.notNull(mimeType, "MimeType is required for composite metadata"); - metadata(metadata, mimeType); + public DefaultRequestSpec(String route, Object... vars) { + this.metadataEncoder = new MetadataEncoder(metadataMimeType(), strategies); + this.metadataEncoder.route(route, vars); } + public DefaultRequestSpec(Object metadata, @Nullable MimeType mimeType) { + this.metadataEncoder = new MetadataEncoder(metadataMimeType(), strategies); + this.metadataEncoder.metadata(metadata, mimeType); + } + + @Override public RequestSpec metadata(Object metadata, MimeType mimeType) { - Assert.notNull(metadata, "Metadata content is required"); - Assert.notNull(mimeType, "MimeType is required"); - Assert.isTrue(this.metadata.isEmpty() || isCompositeMetadata(), - "Composite metadata required for multiple metadata entries."); - this.metadata.put(metadata, mimeType); + this.metadataEncoder.metadata(metadata, mimeType); return this; } @@ -265,70 +229,18 @@ final class DefaultRSocketRequester implements RSocketRequester { private Payload firstPayload(DataBuffer data) { DataBuffer metadata; try { - metadata = getMetadata(); - return PayloadUtils.createPayload(metadata, data); + metadata = this.metadataEncoder.encode(); } catch (Throwable ex) { DataBufferUtils.release(data); throw ex; } + return PayloadUtils.createPayload(metadata, data); } private Mono emptyPayload() { return Mono.fromCallable(() -> firstPayload(emptyDataBuffer)); } - - private DataBuffer getMetadata() { - if (isCompositeMetadata()) { - CompositeByteBuf metadata = getAllocator().compositeBuffer(); - this.metadata.forEach((value, mimeType) -> { - DataBuffer dataBuffer = encodeMetadata(value, mimeType); - CompositeMetadataFlyweight.encodeAndAddMetadata(metadata, getAllocator(), mimeType.toString(), - dataBuffer instanceof NettyDataBuffer ? - ((NettyDataBuffer) dataBuffer).getNativeBuffer() : - Unpooled.wrappedBuffer(dataBuffer.asByteBuffer())); - }); - return asDataBuffer(metadata); - } - else { - Assert.isTrue(this.metadata.size() == 1, "Composite metadata required for multiple entries"); - Map.Entry entry = this.metadata.entrySet().iterator().next(); - if (!metadataMimeType().equals(entry.getValue())) { - throw new IllegalArgumentException( - "Connection configured for metadata mime type " + - "'" + metadataMimeType() + "', but actual is `" + this.metadata + "`"); - } - return encodeMetadata(entry.getKey(), entry.getValue()); - } - } - - @SuppressWarnings("unchecked") - private DataBuffer encodeMetadata(Object metadata, MimeType mimeType) { - if (metadata instanceof DataBuffer) { - return (DataBuffer) metadata; - } - ResolvableType type = ResolvableType.forInstance(metadata); - Encoder encoder = strategies.encoder(type, mimeType); - Assert.notNull(encoder, () -> "No encoder for metadata " + metadata + ", mimeType '" + mimeType + "'"); - return encoder.encodeValue((T) metadata, bufferFactory(), type, mimeType, EMPTY_HINTS); - } - - private ByteBufAllocator getAllocator() { - return bufferFactory() instanceof NettyDataBufferFactory ? - ((NettyDataBufferFactory) bufferFactory()).getByteBufAllocator() : - ByteBufAllocator.DEFAULT; - } - - private DataBuffer asDataBuffer(ByteBuf byteBuf) { - if (bufferFactory() instanceof NettyDataBufferFactory) { - return ((NettyDataBufferFactory) bufferFactory()).wrap(byteBuf); - } - else { - DataBuffer dataBuffer = bufferFactory().wrap(byteBuf.nioBuffer()); - byteBuf.release(); - return dataBuffer; - } - } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MetadataEncoder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MetadataEncoder.java new file mode 100644 index 0000000000..2d8b3b2876 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MetadataEncoder.java @@ -0,0 +1,232 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.messaging.rsocket; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.rsocket.metadata.CompositeMetadataFlyweight; + +import org.springframework.core.ResolvableType; +import org.springframework.core.codec.Encoder; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.NettyDataBuffer; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; +import org.springframework.util.MimeType; +import org.springframework.util.ObjectUtils; + +/** + * Helps to collect metadata values and mime types, and encode them. + * + * @author Rossen Stoyanchev + * @since 5.2 + */ +final class MetadataEncoder { + + /** For route variable replacement. */ + private static final Pattern VARS_PATTERN = Pattern.compile("\\{([^/]+?)\\}"); + + + private final MimeType metadataMimeType; + + private final RSocketStrategies strategies; + + private final boolean isComposite; + + private final ByteBufAllocator allocator; + + @Nullable + private String route; + + private final Map metadata = new LinkedHashMap<>(4); + + + MetadataEncoder(MimeType metadataMimeType, RSocketStrategies strategies) { + Assert.notNull(metadataMimeType, "'metadataMimeType' is required"); + Assert.notNull(strategies, "RSocketStrategies is required"); + this.metadataMimeType = metadataMimeType; + this.strategies = strategies; + this.isComposite = metadataMimeType.equals(MetadataExtractor.COMPOSITE_METADATA); + this.allocator = bufferFactory() instanceof NettyDataBufferFactory ? + ((NettyDataBufferFactory) bufferFactory()).getByteBufAllocator() : ByteBufAllocator.DEFAULT; + } + + + private DataBufferFactory bufferFactory() { + return this.strategies.dataBufferFactory(); + } + + + /** + * Set the route to a remote handler as described in + * {@link RSocketRequester#route(String, Object...)}. + */ + public MetadataEncoder route(String route, Object... routeVars) { + this.route = expand(route, routeVars); + assertMetadataEntryCount(); + return this; + } + + private static String expand(String route, Object... routeVars) { + if (ObjectUtils.isEmpty(routeVars)) { + return route; + } + StringBuffer sb = new StringBuffer(); + int index = 0; + Matcher matcher = VARS_PATTERN.matcher(route); + while (matcher.find()) { + Assert.isTrue(index < routeVars.length, () -> "No value for variable '" + matcher.group(1) + "'"); + String value = routeVars[index].toString(); + value = value.contains(".") ? value.replaceAll("\\.", "%2E") : value; + matcher.appendReplacement(sb, value); + index++; + } + return sb.toString(); + } + + private void assertMetadataEntryCount() { + if (!this.isComposite) { + int count = this.route != null ? this.metadata.size() + 1 : this.metadata.size(); + Assert.isTrue(count < 2, "Composite metadata required for multiple metadata entries."); + } + } + + /** + * Add a metadata entry. If called more than once or in addition to route, + * composite metadata must be in use. + */ + public MetadataEncoder metadata(Object metadata, @Nullable MimeType mimeType) { + if (this.isComposite) { + Assert.notNull(mimeType, "MimeType is required for composite metadata entries."); + } + else if (mimeType == null) { + mimeType = this.metadataMimeType; + } + else if (!this.metadataMimeType.equals(mimeType)) { + throw new IllegalArgumentException("Mime type is optional (may be null) " + + "but was provided and does not match the connection metadata mime type."); + } + this.metadata.put(metadata, mimeType); + assertMetadataEntryCount(); + return this; + } + + /** + * Add route and/or metadata, both optional. + */ + public MetadataEncoder metadataAndOrRoute(@Nullable Map metadata, + @Nullable String route, @Nullable Object[] vars) { + + if (route != null) { + this.route = expand(route, vars != null ? vars : new Object[0]); + } + if (!CollectionUtils.isEmpty(metadata)) { + for (Map.Entry entry : metadata.entrySet()) { + metadata(entry.getKey(), entry.getValue()); + } + } + assertMetadataEntryCount(); + return this; + } + + + /** + * Encode the collected metadata entries to a {@code DataBuffer}. + * @see PayloadUtils#createPayload(DataBuffer, DataBuffer) + */ + public DataBuffer encode() { + Map mergedMetadata = mergeRouteAndMetadata(); + if (this.isComposite) { + CompositeByteBuf composite = this.allocator.compositeBuffer(); + try { + mergedMetadata.forEach((value, mimeType) -> { + DataBuffer buffer = encodeEntry(value, mimeType); + CompositeMetadataFlyweight.encodeAndAddMetadata(composite, this.allocator, + mimeType.toString(), + buffer instanceof NettyDataBuffer ? + ((NettyDataBuffer) buffer).getNativeBuffer() : + Unpooled.wrappedBuffer(buffer.asByteBuffer())); + }); + if (bufferFactory() instanceof NettyDataBufferFactory) { + return ((NettyDataBufferFactory) bufferFactory()).wrap(composite); + } + else { + DataBuffer buffer = bufferFactory().allocateBuffer(); + buffer.write(composite.nioBuffer()); + composite.release(); + return buffer; + } + } + catch (Throwable ex) { + composite.release(); + throw ex; + } + } + else { + Assert.isTrue(mergedMetadata.size() == 1, "Composite metadata required for multiple entries"); + Map.Entry entry = mergedMetadata.entrySet().iterator().next(); + if (!this.metadataMimeType.equals(entry.getValue())) { + throw new IllegalArgumentException( + "Connection configured for metadata mime type " + + "'" + this.metadataMimeType + "', but actual is `" + mergedMetadata + "`"); + } + return encodeEntry(entry.getKey(), entry.getValue()); + } + } + + private Map mergeRouteAndMetadata() { + if (this.route == null) { + return this.metadata; + } + + MimeType routeMimeType = this.metadataMimeType.equals(MetadataExtractor.COMPOSITE_METADATA) ? + MetadataExtractor.ROUTING : this.metadataMimeType; + + Object routeValue = this.route; + if (routeMimeType.equals(MetadataExtractor.ROUTING)) { + // TODO: use rsocket-core API when available + routeValue = bufferFactory().wrap(this.route.getBytes(StandardCharsets.UTF_8)); + } + + Map result = new LinkedHashMap<>(); + result.put(routeValue, routeMimeType); + result.putAll(this.metadata); + return result; + } + + @SuppressWarnings("unchecked") + private DataBuffer encodeEntry(Object metadata, MimeType mimeType) { + if (metadata instanceof DataBuffer) { + return (DataBuffer) metadata; + } + ResolvableType type = ResolvableType.forInstance(metadata); + Encoder encoder = this.strategies.encoder(type, mimeType); + Assert.notNull(encoder, () -> "No encoder for metadata " + metadata + ", mimeType '" + mimeType + "'"); + return encoder.encodeValue((T) metadata, bufferFactory(), type, mimeType, Collections.emptyMap()); + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultMetadataExtractorTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultMetadataExtractorTests.java index 438087fa97..1e64d3e18a 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultMetadataExtractorTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultMetadataExtractorTests.java @@ -21,19 +21,15 @@ import java.util.Map; import io.netty.buffer.PooledByteBufAllocator; import io.rsocket.Payload; -import io.rsocket.RSocket; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.BDDMockito; -import reactor.core.publisher.Mono; import org.springframework.core.codec.ByteArrayDecoder; import org.springframework.core.codec.StringDecoder; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.util.Assert; -import org.springframework.util.MimeType; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; @@ -53,23 +49,13 @@ public class DefaultMetadataExtractorTests { private RSocketStrategies strategies; - private ArgumentCaptor captor; - - private RSocket rsocket; - private DefaultMetadataExtractor extractor; @Before public void setUp() { - this.strategies = RSocketStrategies.builder() - .dataBufferFactory(new LeakAwareNettyDataBufferFactory(PooledByteBufAllocator.DEFAULT)) - .build(); - - this.rsocket = BDDMockito.mock(RSocket.class); - this.captor = ArgumentCaptor.forClass(Payload.class); - BDDMockito.when(this.rsocket.fireAndForget(captor.capture())).thenReturn(Mono.empty()); - + DataBufferFactory bufferFactory = new LeakAwareNettyDataBufferFactory(PooledByteBufAllocator.DEFAULT); + this.strategies = RSocketStrategies.builder().dataBufferFactory(bufferFactory).build(); this.extractor = new DefaultMetadataExtractor(StringDecoder.allMimeTypes()); } @@ -82,14 +68,14 @@ public class DefaultMetadataExtractorTests { @Test public void compositeMetadataWithDefaultSettings() { - requester(COMPOSITE_METADATA).route("toA") + MetadataEncoder metadataEncoder = new MetadataEncoder(COMPOSITE_METADATA, this.strategies) + .route("toA") .metadata("text data", TEXT_PLAIN) .metadata("html data", TEXT_HTML) - .metadata("xml data", TEXT_XML) - .data("data") - .send().block(); + .metadata("xml data", TEXT_XML); - Payload payload = this.captor.getValue(); + DataBuffer metadata = metadataEncoder.encode(); + Payload payload = createPayload(metadata); Map result = this.extractor.extract(payload, COMPOSITE_METADATA); payload.release(); @@ -102,15 +88,14 @@ public class DefaultMetadataExtractorTests { this.extractor.metadataToExtract(TEXT_HTML, String.class, "html-entry"); this.extractor.metadataToExtract(TEXT_XML, String.class, "xml-entry"); - requester(COMPOSITE_METADATA).route("toA") + MetadataEncoder metadataEncoder = new MetadataEncoder(COMPOSITE_METADATA, this.strategies) + .route("toA") .metadata("text data", TEXT_PLAIN) .metadata("html data", TEXT_HTML) - .metadata("xml data", TEXT_XML) - .data("data") - .send() - .block(); + .metadata("xml data", TEXT_XML); - Payload payload = this.captor.getValue(); + DataBuffer metadata = metadataEncoder.encode(); + Payload payload = createPayload(metadata); Map result = this.extractor.extract(payload, COMPOSITE_METADATA); payload.release(); @@ -123,8 +108,9 @@ public class DefaultMetadataExtractorTests { @Test public void route() { - requester(ROUTING).route("toA").data("data").send().block(); - Payload payload = this.captor.getValue(); + MetadataEncoder metadataEncoder = new MetadataEncoder(ROUTING, this.strategies).route("toA"); + DataBuffer metadata = metadataEncoder.encode(); + Payload payload = createPayload(metadata); Map result = this.extractor.extract(payload, ROUTING); payload.release(); @@ -135,8 +121,9 @@ public class DefaultMetadataExtractorTests { public void routeAsText() { this.extractor.metadataToExtract(TEXT_PLAIN, String.class, ROUTE_KEY); - requester(TEXT_PLAIN).route("toA").data("data").send().block(); - Payload payload = this.captor.getValue(); + MetadataEncoder metadataEncoder = new MetadataEncoder(TEXT_PLAIN, this.strategies).route("toA"); + DataBuffer metadata = metadataEncoder.encode(); + Payload payload = createPayload(metadata); Map result = this.extractor.extract(payload, TEXT_PLAIN); payload.release(); @@ -152,8 +139,9 @@ public class DefaultMetadataExtractorTests { result.put("entry1", items[1]); }); - requester(TEXT_PLAIN).metadata("toA:text data", null).data("data").send().block(); - Payload payload = this.captor.getValue(); + MetadataEncoder encoder = new MetadataEncoder(TEXT_PLAIN, this.strategies).metadata("toA:text data", null); + DataBuffer metadata = encoder.encode(); + Payload payload = createPayload(metadata); Map result = this.extractor.extract(payload, TEXT_PLAIN); payload.release(); @@ -174,8 +162,8 @@ public class DefaultMetadataExtractorTests { } - private RSocketRequester requester(MimeType metadataMimeType) { - return RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, metadataMimeType, this.strategies); + private Payload createPayload(DataBuffer metadata) { + return PayloadUtils.createPayload(metadata, this.strategies.dataBufferFactory().allocateBuffer()); } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java index 2bb92f1612..a9f38f0054 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java @@ -19,7 +19,6 @@ package org.springframework.messaging.rsocket; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -29,7 +28,6 @@ import io.reactivex.Observable; import io.reactivex.Single; import io.rsocket.AbstractRSocket; import io.rsocket.Payload; -import io.rsocket.metadata.CompositeMetadata; import org.junit.Before; import org.junit.Test; import org.reactivestreams.Publisher; @@ -37,8 +35,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import org.springframework.core.codec.CharSequenceEncoder; -import org.springframework.core.codec.StringDecoder; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.messaging.rsocket.RSocketRequester.RequestSpec; @@ -47,7 +43,6 @@ import org.springframework.messaging.rsocket.RSocketRequester.ResponseSpec; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.springframework.util.MimeTypeUtils.TEXT_PLAIN; /** @@ -64,17 +59,13 @@ public class DefaultRSocketRequesterTests { private RSocketRequester requester; - private RSocketStrategies strategies; + private final RSocketStrategies strategies = RSocketStrategies.create(); private final DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory(); @Before public void setUp() { - this.strategies = RSocketStrategies.builder() - .decoder(StringDecoder.allMimeTypes()) - .encoder(CharSequenceEncoder.allMimeTypes()) - .build(); this.rsocket = new TestRSocket(); this.requester = RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, TEXT_PLAIN, this.strategies); } @@ -141,86 +132,6 @@ public class DefaultRSocketRequesterTests { } } - @Test - public void metadataCompositeWithRoute() { - - RSocketRequester requester = RSocketRequester.wrap( - this.rsocket, TEXT_PLAIN, MetadataExtractor.COMPOSITE_METADATA, this.strategies); - - requester.route("toA").data("bodyA").send().block(Duration.ofSeconds(5)); - - CompositeMetadata entries = new CompositeMetadata(this.rsocket.getSavedPayload().metadata(), false); - Iterator iterator = entries.iterator(); - - assertThat(iterator.hasNext()).isTrue(); - CompositeMetadata.Entry entry = iterator.next(); - assertThat(entry.getMimeType()).isEqualTo(MetadataExtractor.ROUTING.toString()); - assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA"); - - assertThat(iterator.hasNext()).isFalse(); - } - - @Test - public void metadataCompositeWithRouteAndTextEntry() { - - RSocketRequester requester = RSocketRequester.wrap( - this.rsocket, TEXT_PLAIN, MetadataExtractor.COMPOSITE_METADATA, this.strategies); - - requester.route("toA") - .metadata("My metadata", TEXT_PLAIN).data("bodyA") - .send() - .block(Duration.ofSeconds(5)); - - CompositeMetadata entries = new CompositeMetadata(this.rsocket.getSavedPayload().metadata(), false); - Iterator iterator = entries.iterator(); - - assertThat(iterator.hasNext()).isTrue(); - CompositeMetadata.Entry entry = iterator.next(); - assertThat(entry.getMimeType()).isEqualTo(MetadataExtractor.ROUTING.toString()); - assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA"); - - assertThat(iterator.hasNext()).isTrue(); - entry = iterator.next(); - assertThat(entry.getMimeType()).isEqualTo(TEXT_PLAIN.toString()); - assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("My metadata"); - - assertThat(iterator.hasNext()).isFalse(); - } - - @Test - public void metadataRouteAsText() { - RSocketRequester requester = RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, TEXT_PLAIN, this.strategies); - requester.route("toA").data("bodyA").send().block(Duration.ofSeconds(5)); - assertThat(this.rsocket.getSavedPayload().getMetadataUtf8()).isEqualTo("toA"); - } - - @Test - public void metadataAsText() { - RSocketRequester requester = RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, TEXT_PLAIN, this.strategies); - requester.metadata("toA", null).data("bodyA").send().block(Duration.ofSeconds(5)); - assertThat(this.rsocket.getSavedPayload().getMetadataUtf8()).isEqualTo("toA"); - } - - @Test - public void metadataMimeTypeMismatch() { - RSocketRequester requester = RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, TEXT_PLAIN, this.strategies); - assertThatThrownBy(() -> requester.metadata("toA", MetadataExtractor.ROUTING).data("bodyA").send().block()) - .hasMessageStartingWith("Connection configured for metadata mime type"); - } - - @Test - public void routeWithVars() { - RSocketRequester requester = RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, TEXT_PLAIN, this.strategies); - requester.route("a.{b}.{c}", "BBB", "C.C.C").data("body").send().block(); - assertThat(this.rsocket.getSavedPayload().getMetadataUtf8()).isEqualTo("a.BBB.C%2EC%2EC"); - } - - @Test - public void supportedMetadataMimeTypes() { - RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, MetadataExtractor.COMPOSITE_METADATA, this.strategies); - RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, MetadataExtractor.ROUTING, this.strategies); - } - @Test public void retrieveMono() { String value = "bodyA"; diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/MetadataEncoderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/MetadataEncoderTests.java new file mode 100644 index 0000000000..cf1635a3d6 --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/MetadataEncoderTests.java @@ -0,0 +1,201 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.messaging.rsocket; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.rsocket.metadata.CompositeMetadata; +import org.junit.Test; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.core.io.buffer.NettyDataBuffer; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.core.io.buffer.support.DataBufferTestUtils; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * + * @author Rossen Stoyanchev + * @since 5.2 + */ +public class MetadataEncoderTests { + + private final RSocketStrategies strategies = RSocketStrategies.create(); + + + @Test + public void compositeMetadataWithRoute() { + DataBuffer buffer = new MetadataEncoder(MetadataExtractor.COMPOSITE_METADATA, this.strategies) + .route("toA") + .encode(); + + CompositeMetadata entries = new CompositeMetadata(((NettyDataBuffer) buffer).getNativeBuffer(), false); + Iterator iterator = entries.iterator(); + + assertThat(iterator.hasNext()).isTrue(); + CompositeMetadata.Entry entry = iterator.next(); + assertThat(entry.getMimeType()).isEqualTo(MetadataExtractor.ROUTING.toString()); + assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA"); + + assertThat(iterator.hasNext()).isFalse(); + } + + @Test + public void compositeMetadataWithRouteAndText() { + + DataBuffer buffer = new MetadataEncoder(MetadataExtractor.COMPOSITE_METADATA, this.strategies) + .route("toA") + .metadata("My metadata", MimeTypeUtils.TEXT_PLAIN) + .encode(); + + CompositeMetadata entries = new CompositeMetadata(((NettyDataBuffer) buffer).getNativeBuffer(), false); + Iterator iterator = entries.iterator(); + + assertThat(iterator.hasNext()).isTrue(); + CompositeMetadata.Entry entry = iterator.next(); + assertThat(entry.getMimeType()).isEqualTo(MetadataExtractor.ROUTING.toString()); + assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA"); + + assertThat(iterator.hasNext()).isTrue(); + entry = iterator.next(); + assertThat(entry.getMimeType()).isEqualTo(MimeTypeUtils.TEXT_PLAIN.toString()); + assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("My metadata"); + + assertThat(iterator.hasNext()).isFalse(); + } + + @Test + public void routeWithRoutingMimeType() { + DataBuffer buffer = + new MetadataEncoder(MetadataExtractor.ROUTING, this.strategies) + .route("toA") + .encode(); + + assertThat(dumpString(buffer)).isEqualTo("toA"); + } + + @Test + public void routeWithTextPlainMimeType() { + DataBuffer buffer = + new MetadataEncoder(MimeTypeUtils.TEXT_PLAIN, this.strategies) + .route("toA") + .encode(); + + assertThat(dumpString(buffer)).isEqualTo("toA"); + } + + @Test + public void routeWithVars() { + DataBuffer buffer = + new MetadataEncoder(MimeTypeUtils.TEXT_PLAIN, this.strategies) + .route("a.{b}.{c}", "BBB", "C.C.C") + .encode(); + + assertThat(dumpString(buffer)).isEqualTo("a.BBB.C%2EC%2EC"); + } + + @Test + public void metadataWithTextPlainMimeType() { + DataBuffer buffer = + new MetadataEncoder(MimeTypeUtils.TEXT_PLAIN, this.strategies) + .metadata("toA", null) + .encode(); + + assertThat(dumpString(buffer)).isEqualTo("toA"); + } + + @Test + public void compositeRequiredForMultipleEntries() { + + // Route, metadata + MetadataEncoder encoder1 = new MetadataEncoder(MimeTypeUtils.TEXT_PLAIN, this.strategies); + encoder1.route("toA"); + + assertThatThrownBy(() -> encoder1.metadata("My metadata", MimeTypeUtils.TEXT_PLAIN)) + .hasMessage("Composite metadata required for multiple metadata entries."); + + // Metadata, route + MetadataEncoder encoder2 = new MetadataEncoder(MimeTypeUtils.TEXT_PLAIN, this.strategies); + encoder2.metadata("My metadata", MimeTypeUtils.TEXT_PLAIN); + + assertThatThrownBy(() -> encoder2.route("toA")) + .hasMessage("Composite metadata required for multiple metadata entries."); + + // Route and metadata + MetadataEncoder encoder3 = new MetadataEncoder(MimeTypeUtils.TEXT_PLAIN, this.strategies); + Map metadata = Collections.singletonMap("My metadata", MimeTypeUtils.TEXT_PLAIN); + + assertThatThrownBy(() -> encoder3.metadataAndOrRoute(metadata, "toA", new Object[0])) + .hasMessage("Composite metadata required for multiple metadata entries."); + } + + @Test + public void mimeTypeRequiredForCompositeEntries() { + MetadataEncoder encoder = new MetadataEncoder(MetadataExtractor.COMPOSITE_METADATA, this.strategies); + + assertThatThrownBy(() -> encoder.metadata("toA", null)) + .hasMessage("MimeType is required for composite metadata entries."); + } + + @Test + public void mimeTypeDoesNotMatchConnectionMetadataMimeType() { + MetadataEncoder encoder = new MetadataEncoder(MimeTypeUtils.TEXT_PLAIN, this.strategies); + + assertThatThrownBy(() -> encoder.metadata("toA", MimeTypeUtils.APPLICATION_JSON)) + .hasMessage("Mime type is optional (may be null) " + + "but was provided and does not match the connection metadata mime type."); + } + + @Test + public void defaultDataBufferFactory() { + DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory(); + RSocketStrategies strategies = RSocketStrategies.builder().dataBufferFactory(bufferFactory).build(); + + DataBuffer buffer = new MetadataEncoder(MetadataExtractor.COMPOSITE_METADATA, strategies) + .route("toA") + .encode(); + + ByteBuf byteBuf = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT) + .wrap(buffer.asByteBuffer()) + .getNativeBuffer(); + + CompositeMetadata entries = new CompositeMetadata(byteBuf, false); + Iterator iterator = entries.iterator(); + + assertThat(iterator.hasNext()).isTrue(); + CompositeMetadata.Entry entry = iterator.next(); + assertThat(entry.getMimeType()).isEqualTo(MetadataExtractor.ROUTING.toString()); + assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA"); + + assertThat(iterator.hasNext()).isFalse(); + } + + + private String dumpString(DataBuffer buffer) { + return DataBufferTestUtils.dumpString(buffer, StandardCharsets.UTF_8); + } + +}