From a780cad12eb3d109b55b7040b98f0e027b47065f Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 23 Jul 2019 16:42:06 +0100 Subject: [PATCH] Updates to RSocket[Strategies|Requester] defaults 1. RSocketStrategies hooks in the basic codecs from spring-core by default. Now that we have support for composite metadata, it makes sense to have multiple codecs available. 2. RSocketStrategies is pre-configured with NettyDataBufferFactory. 3. DefaultRSocketRequesterBuilder configures RSocket with a frame decoder that matches the DataBufferFactory choice, i.e. ensuring consistency of zero copy vs default (copy) choice. 4. DefaultRSocketRequesterBuilder now tries to find a single non-basic decoder to select a default data MimeType (e.g. CBOR), or otherwise fall back on the first default decoder (e.g. String). See gh-23314 --- .../DefaultRSocketRequesterBuilder.java | 49 +++-- .../rsocket/DefaultRSocketStrategies.java | 46 ++++- .../messaging/rsocket/RSocketRequester.java | 33 ++- .../messaging/rsocket/RSocketStrategies.java | 53 ++--- .../DefaultRSocketRequesterBuilderTests.java | 190 +++++++++++++++--- 5 files changed, 286 insertions(+), 85 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java index 5a936283b2..247f66abee 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java @@ -18,9 +18,9 @@ package org.springframework.messaging.rsocket; import java.net.URI; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.function.Consumer; -import java.util.stream.Stream; import io.rsocket.RSocketFactory; import io.rsocket.frame.decoder.PayloadDecoder; @@ -29,6 +29,9 @@ import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.WebsocketClientTransport; import reactor.core.publisher.Mono; +import org.springframework.core.codec.Decoder; +import org.springframework.core.codec.StringDecoder; +import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.MimeType; @@ -110,7 +113,10 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { MimeType dataMimeType = getDataMimeType(rsocketStrategies); rsocketFactory.dataMimeType(dataMimeType.toString()); rsocketFactory.metadataMimeType(this.metadataMimeType.toString()); - rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY); + + if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) { + rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY); + } this.rsocketFactoryConfigurers.forEach(configurer -> { configurer.configureWithStrategies(rsocketStrategies); @@ -139,16 +145,35 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { if (this.dataMimeType != null) { return this.dataMimeType; } - return Stream - .concat( - strategies.encoders().stream() - .flatMap(encoder -> encoder.getEncodableMimeTypes().stream()), - strategies.decoders().stream() - .flatMap(encoder -> encoder.getDecodableMimeTypes().stream()) - ) - .filter(MimeType::isConcrete) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException("Failed to select data MimeType to use.")); + // Look for non-basic Decoder (e.g. CBOR, Protobuf) + MimeType selected = null; + List> decoders = strategies.decoders(); + for (Decoder candidate : decoders) { + if (!isCoreCodec(candidate) && !candidate.getDecodableMimeTypes().isEmpty()) { + Assert.state(selected == null, + () -> "Cannot select default data MimeType based on configured decoders: " + decoders); + selected = getMimeType(candidate); + } + } + if (selected != null) { + return selected; + } + // Fall back on 1st decoder (e.g. String) + for (Decoder decoder : decoders) { + if (!decoder.getDecodableMimeTypes().isEmpty()) { + return getMimeType(decoder); + } + } + throw new IllegalArgumentException("Failed to select data MimeType to use."); + } + + private static boolean isCoreCodec(Object codec) { + return codec.getClass().getPackage().equals(StringDecoder.class.getPackage()); + } + + private static MimeType getMimeType(Decoder decoder) { + MimeType mimeType = decoder.getDecodableMimeTypes().get(0); + return new MimeType(mimeType, Collections.emptyMap()); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java index ba72af8101..c16958d7e8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java @@ -22,11 +22,21 @@ import java.util.Collections; import java.util.List; import java.util.function.Consumer; +import io.netty.buffer.PooledByteBufAllocator; + import org.springframework.core.ReactiveAdapterRegistry; +import org.springframework.core.codec.ByteArrayDecoder; +import org.springframework.core.codec.ByteArrayEncoder; +import org.springframework.core.codec.ByteBufferDecoder; +import org.springframework.core.codec.ByteBufferEncoder; +import org.springframework.core.codec.CharSequenceEncoder; +import org.springframework.core.codec.DataBufferDecoder; +import org.springframework.core.codec.DataBufferEncoder; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; +import org.springframework.core.codec.StringDecoder; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -90,18 +100,33 @@ final class DefaultRSocketStrategies implements RSocketStrategies { private ReactiveAdapterRegistry adapterRegistry = ReactiveAdapterRegistry.getSharedInstance(); @Nullable - private DataBufferFactory dataBufferFactory; + private DataBufferFactory bufferFactory; + + + DefaultRSocketStrategiesBuilder() { + + // Order of decoders may be significant for default data MimeType + // selection in RSocketRequester.Builder - public DefaultRSocketStrategiesBuilder() { + this.decoders.add(StringDecoder.allMimeTypes()); + this.decoders.add(new ByteBufferDecoder()); + this.decoders.add(new ByteArrayDecoder()); + this.decoders.add(new DataBufferDecoder()); + + this.encoders.add(CharSequenceEncoder.allMimeTypes()); + this.encoders.add(new ByteBufferEncoder()); + this.encoders.add(new ByteArrayEncoder()); + this.encoders.add(new DataBufferEncoder()); } - public DefaultRSocketStrategiesBuilder(RSocketStrategies other) { + DefaultRSocketStrategiesBuilder(RSocketStrategies other) { this.encoders.addAll(other.encoders()); this.decoders.addAll(other.decoders()); this.adapterRegistry = other.reactiveAdapterRegistry(); - this.dataBufferFactory = other.dataBufferFactory(); + this.bufferFactory = other.dataBufferFactory(); } + @Override public Builder encoder(Encoder... encoders) { this.encoders.addAll(Arrays.asList(encoders)); @@ -135,14 +160,19 @@ final class DefaultRSocketStrategies implements RSocketStrategies { @Override public Builder dataBufferFactory(DataBufferFactory bufferFactory) { - this.dataBufferFactory = bufferFactory; + this.bufferFactory = bufferFactory; return this; } @Override public RSocketStrategies build() { - return new DefaultRSocketStrategies(this.encoders, this.decoders, this.adapterRegistry, - this.dataBufferFactory != null ? this.dataBufferFactory : new DefaultDataBufferFactory()); + return new DefaultRSocketStrategies( + this.encoders, this.decoders, this.adapterRegistry, initBufferFactory()); + } + + private DataBufferFactory initBufferFactory() { + return this.bufferFactory != null ? this.bufferFactory : + new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index 071e4d481a..59e9606e2d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -28,6 +28,7 @@ import reactor.core.publisher.Mono; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ReactiveAdapterRegistry; +import org.springframework.core.codec.Decoder; import org.springframework.lang.Nullable; import org.springframework.messaging.rsocket.annotation.support.AnnotationClientResponderConfigurer; import org.springframework.util.MimeType; @@ -125,32 +126,32 @@ public interface RSocketRequester { interface Builder { /** - * Configure the MimeType for payload data which is then specified - * on the {@code SETUP} frame and applies to the whole connection. - *

By default this is set to the first concrete mime type supported - * by the configured encoders and decoders. - * @param mimeType the data MimeType to use + * Configure the payload data MimeType to specify on the {@code SETUP} + * frame that applies to the whole connection. + *

If this is not set, the builder will try to select the mime type + * based on the presence of a single + * {@link RSocketStrategies.Builder#decoder(Decoder[]) non-default} + * {@code Decoder}, or the first default decoder otherwise + * (i.e. {@code String}) if no others are configured. */ RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType); /** - * Configure the MimeType for payload metadata which is then specified - * on the {@code SETUP} frame and applies to the whole connection. + * Configure the payload metadata MimeType to specify on the {@code SETUP} + * frame and applies to the whole connection. *

By default this is set to * {@code "message/x.rsocket.composite-metadata.v0"} in which case the * route, if provided, is encoded as a - * {@code "message/x.rsocket.routing.v0"} metadata entry, potentially - * with other metadata entries added too. If this is set to any other - * mime type, and a route is provided, it is assumed the mime type is - * for the route. - * @param mimeType the data MimeType to use + * {@code "message/x.rsocket.routing.v0"} composite metadata entry. + * For any other MimeType, it is assumed to be the MimeType for the + * route, if provided. */ RSocketRequester.Builder metadataMimeType(MimeType mimeType); /** - * Set the {@link RSocketStrategies} to use for access to encoders, - * decoders, and a factory for {@code DataBuffer's}. - * @param strategies the codecs strategies to use + * Set the {@link RSocketStrategies} to use. + *

By default this is set to {@code RSocketStrategies.builder().build()} + * but may be further customized via {@link #rsocketStrategies(Consumer)}. */ RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies); @@ -159,7 +160,6 @@ public interface RSocketRequester { *

By default this starts out with an empty builder, i.e. * {@link RSocketStrategies#builder()}, but the strategies can also be * set via {@link #rsocketStrategies(RSocketStrategies)}. - * @param configurer the configurer to apply */ RSocketRequester.Builder rsocketStrategies(Consumer configurer); @@ -172,7 +172,6 @@ public interface RSocketRequester { * {@code ClientRSocketFactory}. Use the shortcuts on this builder * instead since the created {@code RSocketRequester} needs to be aware * of those settings. - * @param configurer consumer to customize the factory * @see AnnotationClientResponderConfigurer */ RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java index 9e665a3bc7..8e6aa27f17 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java @@ -19,13 +19,17 @@ package org.springframework.messaging.rsocket; import java.util.List; import java.util.function.Consumer; -import io.netty.buffer.PooledByteBufAllocator; +import io.rsocket.Payload; +import io.rsocket.RSocketFactory.ClientRSocketFactory; +import io.rsocket.RSocketFactory.ServerRSocketFactory; import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.core.ResolvableType; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.MimeType; @@ -120,24 +124,28 @@ public interface RSocketStrategies { interface Builder { /** - * Add encoders to use for serializing Objects. - *

By default this is empty. + * Append to the list of encoders to use for serializing Objects to the + * data or metadata of a {@link Payload}. + *

By default this is initialized with encoders for {@code String}, + * {@code byte[]}, {@code ByteBuffer}, and {@code DataBuffer}. */ Builder encoder(Encoder... encoder); /** - * Access and manipulate the list of configured {@link #encoder encoders}. + * Apply the consumer to the list of configured encoders, immediately. */ Builder encoders(Consumer>> consumer); /** - * Add decoders for de-serializing Objects. - *

By default this is empty. + * Append to the list of decoders to use for de-serializing Objects from + * the data or metadata of a {@link Payload}. + *

By default this is initialized with decoders for {@code String}, + * {@code byte[]}, {@code ByteBuffer}, and {@code DataBuffer}. */ Builder decoder(Decoder... decoder); /** - * Access and manipulate the list of configured {@link #encoder decoders}. + * Apply the consumer to the list of configured decoders, immediately. */ Builder decoders(Consumer>> consumer); @@ -146,28 +154,23 @@ public interface RSocketStrategies { * to adapt to, and/or determine the semantics of a given * {@link org.reactivestreams.Publisher Publisher}. *

By default this {@link ReactiveAdapterRegistry#getSharedInstance()}. - * @param registry the registry to use */ Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry); /** - * Configure the DataBufferFactory to use for allocating buffers, for - * example when preparing requests or when responding. The choice here - * must be aligned with the frame decoder configured in - * {@link io.rsocket.RSocketFactory}. - *

By default this property is an instance of - * {@link org.springframework.core.io.buffer.DefaultDataBufferFactory - * DefaultDataBufferFactory} matching to the default frame decoder in - * {@link io.rsocket.RSocketFactory} which copies the payload. This - * comes at cost to performance but does not require reference counting - * and eliminates possibility for memory leaks. - *

To switch to a zero-copy strategy, - * configure RSocket - * accordingly, and then configure this property with an instance of - * {@link org.springframework.core.io.buffer.NettyDataBufferFactory - * NettyDataBufferFactory} with a pooled allocator such as - * {@link PooledByteBufAllocator#DEFAULT}. - * @param bufferFactory the DataBufferFactory to use + * Configure the DataBufferFactory to use for allocating buffers when + * preparing requests or creating responses. + *

By default this is set to {@link NettyDataBufferFactory} with + * pooled, allocated buffers for zero copy. RSocket must also be + * configured + * for zero copy. For client setup, {@link RSocketRequester.Builder} + * adapts automatically to the {@code DataBufferFactory} configured + * here, and sets the frame decoder in {@link ClientRSocketFactory + * ClientRSocketFactory} accordingly. For server setup, the + * {@link ServerRSocketFactory ServerRSocketFactory} must be configured + * accordingly too for zero copy. + *

If using {@link DefaultDataBufferFactory} instead, there is no + * need for related config changes in RSocket. */ Builder dataBufferFactory(DataBufferFactory bufferFactory); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java index 14d7d05ec7..bd497b5860 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java @@ -17,11 +17,16 @@ package org.springframework.messaging.rsocket; import java.lang.reflect.Field; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.function.Consumer; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; import io.rsocket.RSocketFactory; +import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.ClientTransport; import org.junit.Before; import org.junit.Test; @@ -29,13 +34,19 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import org.springframework.core.codec.CharSequenceEncoder; -import org.springframework.core.codec.StringDecoder; +import org.springframework.core.ResolvableType; +import org.springframework.core.codec.Decoder; +import org.springframework.core.codec.DecodingException; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; import org.springframework.util.ReflectionUtils; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.BDDMockito.given; @@ -52,6 +63,8 @@ public class DefaultRSocketRequesterBuilderTests { private ClientTransport transport; + private final TestRSocketFactoryConfigurer rsocketFactoryConfigurer = new TestRSocketFactoryConfigurer(); + @Before public void setup() { @@ -63,53 +76,113 @@ public class DefaultRSocketRequesterBuilderTests { @Test @SuppressWarnings("unchecked") public void shouldApplyCustomizationsAtSubscription() { - ClientRSocketFactoryConfigurer factoryConfigurer = mock(ClientRSocketFactoryConfigurer.class); Consumer strategiesConfigurer = mock(Consumer.class); RSocketRequester.builder() - .rsocketFactory(factoryConfigurer) + .rsocketFactory(this.rsocketFactoryConfigurer) .rsocketStrategies(strategiesConfigurer) .connect(this.transport); - verifyZeroInteractions(this.transport, factoryConfigurer, strategiesConfigurer); + + verifyZeroInteractions(this.transport); + assertThat(this.rsocketFactoryConfigurer.rsocketFactory()).isNull(); } @Test @SuppressWarnings("unchecked") public void shouldApplyCustomizations() { - RSocketStrategies strategies = RSocketStrategies.builder() - .encoder(CharSequenceEncoder.allMimeTypes()) - .decoder(StringDecoder.allMimeTypes()) - .build(); - ClientRSocketFactoryConfigurer factoryConfigurer = mock(ClientRSocketFactoryConfigurer.class); - Consumer strategiesConfigurer = mock(Consumer.class); + Consumer rsocketStrategiesConfigurer = mock(Consumer.class); RSocketRequester.builder() - .rsocketStrategies(strategies) - .rsocketFactory(factoryConfigurer) - .rsocketStrategies(strategiesConfigurer) + .rsocketFactory(this.rsocketFactoryConfigurer) + .rsocketStrategies(rsocketStrategiesConfigurer) .connect(this.transport) .block(); + + // RSocketStrategies and RSocketFactory configurers should have been called + verify(this.transport).connect(anyInt()); - verify(factoryConfigurer).configureWithStrategies(any(RSocketStrategies.class)); - verify(factoryConfigurer).configure(any(RSocketFactory.ClientRSocketFactory.class)); - verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class)); + verify(rsocketStrategiesConfigurer).accept(any(RSocketStrategies.Builder.class)); + assertThat(this.rsocketFactoryConfigurer.rsocketStrategies()).isNotNull(); + assertThat(this.rsocketFactoryConfigurer.rsocketFactory()).isNotNull(); + } + + @Test + public void defaultDataMimeType() { + RSocketRequester requester = RSocketRequester.builder() + .connect(this.transport) + .block(); + + assertThat(requester.dataMimeType()) + .as("Default data MimeType, based on the first configured Decoder") + .isEqualTo(MimeTypeUtils.TEXT_PLAIN); } @Test - public void dataMimeType() throws NoSuchFieldException { + public void defaultDataMimeTypeWithCustomDecoderRegitered() { RSocketStrategies strategies = RSocketStrategies.builder() - .encoder(CharSequenceEncoder.allMimeTypes()) - .decoder(StringDecoder.allMimeTypes()) + .decoder(new TestJsonDecoder(MimeTypeUtils.APPLICATION_JSON)) .build(); RSocketRequester requester = RSocketRequester.builder() .rsocketStrategies(strategies) + .connect(this.transport) + .block(); + + assertThat(requester.dataMimeType()) + .as("Default data MimeType, based on the first configured, non-default Decoder") + .isEqualTo(MimeTypeUtils.APPLICATION_JSON); + } + + @Test + public void defaultDataMimeTypeWithMultipleCustomDecoderRegitered() { + RSocketStrategies strategies = RSocketStrategies.builder() + .decoder(new TestJsonDecoder(MimeTypeUtils.APPLICATION_JSON)) + .decoder(new TestJsonDecoder(MimeTypeUtils.APPLICATION_XML)) + .build(); + + assertThatThrownBy(() -> + RSocketRequester + .builder() + .rsocketStrategies(strategies) + .connect(this.transport) + .block()) + .hasMessageContaining("Cannot select default data MimeType"); + } + + @Test + public void dataMimeTypeSet() { + RSocketRequester requester = RSocketRequester.builder() .dataMimeType(MimeTypeUtils.APPLICATION_JSON) .connect(this.transport) .block(); - Field field = DefaultRSocketRequester.class.getDeclaredField("dataMimeType"); + assertThat(requester.dataMimeType()).isEqualTo(MimeTypeUtils.APPLICATION_JSON); + } + + @Test + public void frameDecoderMatchesDataBufferFactory() throws Exception { + testFrameDecoder(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT), PayloadDecoder.ZERO_COPY); + testFrameDecoder(new DefaultDataBufferFactory(), PayloadDecoder.DEFAULT); + } + + private void testFrameDecoder(DataBufferFactory bufferFactory, PayloadDecoder frameDecoder) + throws NoSuchFieldException { + + RSocketStrategies strategies = RSocketStrategies.builder() + .dataBufferFactory(bufferFactory) + .build(); + + RSocketRequester.builder() + .rsocketStrategies(strategies) + .rsocketFactory(this.rsocketFactoryConfigurer) + .connect(this.transport) + .block(); + + RSocketFactory.ClientRSocketFactory factory = this.rsocketFactoryConfigurer.rsocketFactory(); + assertThat(factory).isNotNull(); + + Field field = RSocketFactory.ClientRSocketFactory.class.getDeclaredField("payloadDecoder"); ReflectionUtils.makeAccessible(field); - MimeType dataMimeType = (MimeType) ReflectionUtils.getField(field, requester); - assertThat(dataMimeType).isEqualTo(MimeTypeUtils.APPLICATION_JSON); + PayloadDecoder decoder = (PayloadDecoder) ReflectionUtils.getField(field, factory); + assertThat(decoder).isSameAs(frameDecoder); } @@ -135,4 +208,75 @@ public class DefaultRSocketRequesterBuilderTests { } } + + static class TestRSocketFactoryConfigurer implements ClientRSocketFactoryConfigurer { + + private RSocketStrategies strategies; + + private RSocketFactory.ClientRSocketFactory rsocketFactory; + + + public RSocketStrategies rsocketStrategies() { + return this.strategies; + } + + public RSocketFactory.ClientRSocketFactory rsocketFactory() { + return this.rsocketFactory; + } + + + @Override + public void configureWithStrategies(RSocketStrategies strategies) { + this.strategies = strategies; + } + + @Override + public void configure(RSocketFactory.ClientRSocketFactory rsocketFactory) { + this.rsocketFactory = rsocketFactory; + } + } + + + static class TestJsonDecoder implements Decoder { + + private final MimeType mimeType; + + + TestJsonDecoder(MimeType mimeType) { + this.mimeType = mimeType; + } + + @Override + public List getDecodableMimeTypes() { + return Collections.singletonList(this.mimeType); + } + + @Override + public boolean canDecode(ResolvableType elementType, MimeType mimeType) { + return false; + } + + @Override + public Mono decodeToMono(Publisher inputStream, ResolvableType elementType, + MimeType mimeType, Map hints) { + + throw new UnsupportedOperationException(); + } + + @Override + public Flux decode(Publisher inputStream, ResolvableType elementType, + MimeType mimeType, Map hints) { + + throw new UnsupportedOperationException(); + } + + + @Override + public Object decode(DataBuffer buffer, ResolvableType targetType, MimeType mimeType, + Map hints) throws DecodingException { + + throw new UnsupportedOperationException(); + } + } + }