Browse Source

Fix RSocketRequester API for requests without payload

This commit makes it possible to send requests without
requiring to call data(Mono.empty()). It introduces a
dedicated MetadataSpec interface and merge ResponseSpec
into RequestSpec for more flexibility.

Closes gh-23649
pull/23772/head
Sebastien Deleuze 6 years ago
parent
commit
5adc3d6666
  1. 65
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java
  2. 50
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java
  3. 26
      spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt
  4. 5
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java
  5. 51
      spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt

65
spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java

@ -116,6 +116,12 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -116,6 +116,12 @@ final class DefaultRSocketRequester implements RSocketRequester {
private final MetadataEncoder metadataEncoder;
@Nullable
private Mono<Payload> payloadMono = Mono.empty();
@Nullable
private Flux<Payload> payloadFlux = null;
public DefaultRequestSpec(String route, Object... vars) {
this.metadataEncoder = new MetadataEncoder(metadataMimeType(), strategies);
@ -135,24 +141,26 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -135,24 +141,26 @@ final class DefaultRSocketRequester implements RSocketRequester {
}
@Override
public RequestSpec metadata(Consumer<RequestSpec> configurer) {
public RequestSpec metadata(Consumer<MetadataSpec<?>> configurer) {
configurer.accept(this);
return this;
}
@Override
public ResponseSpec data(Object data) {
public RequestSpec data(Object data) {
Assert.notNull(data, "'data' must not be null");
return toResponseSpec(data, ResolvableType.NONE);
createPayload(data, ResolvableType.NONE);
return this;
}
@Override
public ResponseSpec data(Object producer, Class<?> elementClass) {
public RequestSpec data(Object producer, Class<?> elementClass) {
Assert.notNull(producer, "'producer' must not be null");
Assert.notNull(elementClass, "'elementClass' must not be null");
ReactiveAdapter adapter = getAdapter(producer.getClass());
Assert.notNull(adapter, "'producer' type is unknown to ReactiveAdapterRegistry");
return toResponseSpec(adapter.toPublisher(producer), ResolvableType.forClass(elementClass));
createPayload(adapter.toPublisher(producer), ResolvableType.forClass(elementClass));
return this;
}
@Nullable
@ -161,15 +169,16 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -161,15 +169,16 @@ final class DefaultRSocketRequester implements RSocketRequester {
}
@Override
public ResponseSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef) {
public RequestSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef) {
Assert.notNull(producer, "'producer' must not be null");
Assert.notNull(elementTypeRef, "'elementTypeRef' must not be null");
ReactiveAdapter adapter = getAdapter(producer.getClass());
Assert.notNull(adapter, "'producer' type is unknown to ReactiveAdapterRegistry");
return toResponseSpec(adapter.toPublisher(producer), ResolvableType.forType(elementTypeRef));
createPayload(adapter.toPublisher(producer), ResolvableType.forType(elementTypeRef));
return this;
}
private ResponseSpec toResponseSpec(Object input, ResolvableType elementType) {
private void createPayload(Object input, ResolvableType elementType) {
ReactiveAdapter adapter = getAdapter(input.getClass());
Publisher<?> publisher;
if (input instanceof Publisher) {
@ -179,31 +188,35 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -179,31 +188,35 @@ final class DefaultRSocketRequester implements RSocketRequester {
publisher = adapter.toPublisher(input);
}
else {
Mono<Payload> payloadMono = Mono
this.payloadMono = Mono
.fromCallable(() -> encodeData(input, ResolvableType.forInstance(input), null))
.map(this::firstPayload)
.doOnDiscard(Payload.class, Payload::release)
.switchIfEmpty(emptyPayload());
return new DefaultResponseSpec(payloadMono);
this.payloadFlux = null;
return;
}
if (isVoid(elementType) || (adapter != null && adapter.isNoValue())) {
Mono<Payload> payloadMono = Mono.when(publisher).then(emptyPayload());
return new DefaultResponseSpec(payloadMono);
this.payloadMono = Mono.when(publisher).then(emptyPayload());
this.payloadFlux = null;
return;
}
Encoder<?> encoder = elementType != ResolvableType.NONE && !Object.class.equals(elementType.resolve()) ?
strategies.encoder(elementType, dataMimeType) : null;
if (adapter != null && !adapter.isMultiValue()) {
Mono<Payload> payloadMono = Mono.from(publisher)
this.payloadMono = Mono.from(publisher)
.map(value -> encodeData(value, elementType, encoder))
.map(this::firstPayload)
.switchIfEmpty(emptyPayload());
return new DefaultResponseSpec(payloadMono);
this.payloadFlux = null;
return;
}
Flux<Payload> payloadFlux = Flux.from(publisher)
this.payloadMono = null;
this.payloadFlux = Flux.from(publisher)
.map(value -> encodeData(value, elementType, encoder))
.switchOnFirst((signal, inner) -> {
DataBuffer data = signal.get();
@ -217,7 +230,6 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -217,7 +230,6 @@ final class DefaultRSocketRequester implements RSocketRequester {
})
.doOnDiscard(Payload.class, Payload::release)
.switchIfEmpty(emptyPayload());
return new DefaultResponseSpec(payloadFlux);
}
@SuppressWarnings("unchecked")
@ -245,26 +257,6 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -245,26 +257,6 @@ final class DefaultRSocketRequester implements RSocketRequester {
private Mono<Payload> emptyPayload() {
return Mono.fromCallable(() -> firstPayload(emptyDataBuffer));
}
}
private class DefaultResponseSpec implements ResponseSpec {
@Nullable
private final Mono<Payload> payloadMono;
@Nullable
private final Flux<Payload> payloadFlux;
DefaultResponseSpec(Mono<Payload> payloadMono) {
this.payloadMono = payloadMono;
this.payloadFlux = null;
}
DefaultResponseSpec(Flux<Payload> payloadFlux) {
this.payloadMono = null;
this.payloadFlux = payloadFlux;
}
@Override
public Mono<Void> send() {
@ -325,5 +317,4 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -325,5 +317,4 @@ final class DefaultRSocketRequester implements RSocketRequester {
return PayloadUtils.retainDataAndReleasePayload(payload, bufferFactory());
}
}
}

50
spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

@ -231,22 +231,10 @@ public interface RSocketRequester { @@ -231,22 +231,10 @@ public interface RSocketRequester {
}
/**
* Spec for providing input data for an RSocket request.
* Spec for providing input data for an RSocket request and triggering the exchange.
*/
interface RequestSpec {
/**
* Use this to append additional metadata entries when using composite
* metadata. An {@link IllegalArgumentException} is raised if this
* method is used when not using composite metadata.
* @param metadata an Object to be encoded with a suitable
* {@link org.springframework.core.codec.Encoder Encoder}, or a
* {@link org.springframework.core.io.buffer.DataBuffer DataBuffer}
* @param mimeType the mime type that describes the metadata
*/
RequestSpec metadata(Object metadata, MimeType mimeType);
interface RequestSpec extends MetadataSpec<RequestSpec> {
/**
* Append additional metadata entries through a {@code Consumer}.
@ -255,7 +243,7 @@ public interface RSocketRequester { @@ -255,7 +243,7 @@ public interface RSocketRequester {
* @param configurer the configurer to apply
* @throws IllegalArgumentException if not using composite metadata.
*/
RequestSpec metadata(Consumer<RequestSpec> configurer);
RequestSpec metadata(Consumer<MetadataSpec<?>> configurer);
/**
* Provide payload data for the request. This can be one of:
@ -268,7 +256,7 @@ public interface RSocketRequester { @@ -268,7 +256,7 @@ public interface RSocketRequester {
* @param data the Object value for the payload data
* @return spec to declare the expected response
*/
ResponseSpec data(Object data);
RequestSpec data(Object data);
/**
* Variant of {@link #data(Object)} that also accepts a hint for the
@ -280,7 +268,7 @@ public interface RSocketRequester { @@ -280,7 +268,7 @@ public interface RSocketRequester {
* @param elementClass the type of values to be produced
* @return spec to declare the expected response
*/
ResponseSpec data(Object producer, Class<?> elementClass);
RequestSpec data(Object producer, Class<?> elementClass);
/**
* Variant of {@link #data(Object, Class)} for when the type hint has
@ -291,14 +279,7 @@ public interface RSocketRequester { @@ -291,14 +279,7 @@ public interface RSocketRequester {
* @param elementTypeRef the type of values to be produced
* @return spec to declare the expected response
*/
ResponseSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
}
/**
* Spect to declare the type of request and expected response.
*/
interface ResponseSpec {
RequestSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
/**
* Perform a {@link RSocket#fireAndForget fireAndForget}.
@ -343,4 +324,23 @@ public interface RSocketRequester { @@ -343,4 +324,23 @@ public interface RSocketRequester {
<T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef);
}
/**
* Spec for specifying the metadata.
*
* @param <S> a self reference to the spec type
*/
interface MetadataSpec<S extends MetadataSpec<S>> {
/**
* Use this to append additional metadata entries when using composite
* metadata. An {@link IllegalArgumentException} is raised if this
* method is used when not using composite metadata.
* @param metadata an Object to be encoded with a suitable
* {@link org.springframework.core.codec.Encoder Encoder}, or a
* {@link org.springframework.core.io.buffer.DataBuffer DataBuffer}
* @param mimeType the mime type that describes the metadata
*/
S metadata(Object metadata, MimeType mimeType);
}
}

26
spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt

@ -66,7 +66,7 @@ suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocket @@ -66,7 +66,7 @@ suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocket
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer: Any): RSocketRequester.ResponseSpec =
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer: Any): RSocketRequester.RequestSpec =
data(producer, object : ParameterizedTypeReference<T>() {})
/**
@ -78,7 +78,7 @@ inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer: @@ -78,7 +78,7 @@ inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer:
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher: Publisher<T>): RSocketRequester.ResponseSpec =
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher: Publisher<T>): RSocketRequester.RequestSpec =
data(publisher, object : ParameterizedTypeReference<T>() {})
/**
@ -90,58 +90,58 @@ inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher @@ -90,58 +90,58 @@ inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(flow: Flow<T>): RSocketRequester.ResponseSpec =
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(flow: Flow<T>): RSocketRequester.RequestSpec =
data(flow, object : ParameterizedTypeReference<T>() {})
/**
* Coroutines variant of [RSocketRequester.ResponseSpec.send].
* Coroutines variant of [RSocketRequester.RequestSpec.send].
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend fun RSocketRequester.ResponseSpec.sendAndAwait() {
suspend fun RSocketRequester.RequestSpec.sendAndAwait() {
send().awaitFirstOrNull()
}
/**
* Coroutines variant of [RSocketRequester.ResponseSpec.retrieveMono].
* Coroutines variant of [RSocketRequester.RequestSpec.retrieveMono].
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend inline fun <reified T : Any> RSocketRequester.ResponseSpec.retrieveAndAwait(): T =
suspend inline fun <reified T : Any> RSocketRequester.RequestSpec.retrieveAndAwait(): T =
retrieveMono(object : ParameterizedTypeReference<T>() {}).awaitSingle()
/**
* Coroutines variant of [RSocketRequester.ResponseSpec.retrieveFlux].
* Coroutines variant of [RSocketRequester.RequestSpec.retrieveFlux].
*
* @author Sebastien Deleuze
* @since 5.2
*/
@ExperimentalCoroutinesApi
inline fun <reified T : Any> RSocketRequester.ResponseSpec.retrieveFlow(): Flow<T> =
inline fun <reified T : Any> RSocketRequester.RequestSpec.retrieveFlow(): Flow<T> =
retrieveFlux(object : ParameterizedTypeReference<T>() {}).asFlow()
/**
* Extension for [RSocketRequester.ResponseSpec.retrieveMono] providing a `retrieveMono<Foo>()`
* Extension for [RSocketRequester.RequestSpec.retrieveMono] providing a `retrieveMono<Foo>()`
* variant leveraging Kotlin reified type parameters. This extension is not subject to type
* erasure and retains actual generic type arguments.
*
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.ResponseSpec.retrieveMono(): Mono<T> =
inline fun <reified T : Any> RSocketRequester.RequestSpec.retrieveMono(): Mono<T> =
retrieveMono(object : ParameterizedTypeReference<T>() {})
/**
* Extension for [RSocketRequester.ResponseSpec.retrieveFlux] providing a `retrieveFlux<Foo>()`
* Extension for [RSocketRequester.RequestSpec.retrieveFlux] providing a `retrieveFlux<Foo>()`
* variant leveraging Kotlin reified type parameters. This extension is not subject to type
* erasure and retains actual generic type arguments.
*
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.ResponseSpec.retrieveFlux(): Flux<T> =
inline fun <reified T : Any> RSocketRequester.RequestSpec.retrieveFlux(): Flux<T> =
retrieveFlux(object : ParameterizedTypeReference<T>() {})

5
spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java

@ -38,7 +38,6 @@ import reactor.test.StepVerifier; @@ -38,7 +38,6 @@ import reactor.test.StepVerifier;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.RSocketRequester.RequestSpec;
import org.springframework.messaging.rsocket.RSocketRequester.ResponseSpec;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
@ -87,7 +86,7 @@ public class DefaultRSocketRequesterTests { @@ -87,7 +86,7 @@ public class DefaultRSocketRequesterTests {
testSendMono(spec -> spec.data(Mono.delay(MILLIS_10).then(), Void.class), "");
}
private void testSendMono(Function<RequestSpec, ResponseSpec> mapper, String expectedValue) {
private void testSendMono(Function<RequestSpec, RequestSpec> mapper, String expectedValue) {
mapper.apply(this.requester.route("toA")).send().block(Duration.ofSeconds(5));
assertThat(this.rsocket.getSavedMethodName()).isEqualTo("fireAndForget");
@ -111,7 +110,7 @@ public class DefaultRSocketRequesterTests { @@ -111,7 +110,7 @@ public class DefaultRSocketRequesterTests {
testSendFlux(spec -> spec.data(stringFlux.cast(Object.class), Object.class), values);
}
private void testSendFlux(Function<RequestSpec, ResponseSpec> mapper, String... expectedValues) {
private void testSendFlux(Function<RequestSpec, RequestSpec> mapper, String... expectedValues) {
this.rsocket.reset();
mapper.apply(this.requester.route("toA")).retrieveFlux(String.class).blockLast(Duration.ofSeconds(5));

51
spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt

@ -74,83 +74,78 @@ class RSocketRequesterExtensionsTests { @@ -74,83 +74,78 @@ class RSocketRequesterExtensionsTests {
@Test
fun `dataWithType with Publisher`() {
val requestSpec = mockk<RSocketRequester.RequestSpec>()
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
val data = mockk<Publisher<String>>()
every { requestSpec.data(any<Publisher<String>>(), match<ParameterizedTypeReference<*>>(stringTypeRefMatcher)) } returns responseSpec
assertThat(requestSpec.dataWithType(data)).isEqualTo(responseSpec)
every { requestSpec.data(any<Publisher<String>>(), match<ParameterizedTypeReference<*>>(stringTypeRefMatcher)) } returns requestSpec
assertThat(requestSpec.dataWithType(data)).isEqualTo(requestSpec)
}
@Test
fun `dataWithType with Flow`() {
val requestSpec = mockk<RSocketRequester.RequestSpec>()
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
val data = mockk<Flow<String>>()
every { requestSpec.data(any<Publisher<String>>(), match<ParameterizedTypeReference<*>>(stringTypeRefMatcher)) } returns responseSpec
assertThat(requestSpec.dataWithType(data)).isEqualTo(responseSpec)
every { requestSpec.data(any<Publisher<String>>(), match<ParameterizedTypeReference<*>>(stringTypeRefMatcher)) } returns requestSpec
assertThat(requestSpec.dataWithType(data)).isEqualTo(requestSpec)
}
@Test
fun `dataWithType with CompletableFuture`() {
val requestSpec = mockk<RSocketRequester.RequestSpec>()
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
val data = mockk<CompletableFuture<String>>()
every { requestSpec.data(any<Publisher<String>>(), match<ParameterizedTypeReference<*>>(stringTypeRefMatcher)) } returns responseSpec
assertThat(requestSpec.dataWithType<String>(data)).isEqualTo(responseSpec)
every { requestSpec.data(any<Publisher<String>>(), match<ParameterizedTypeReference<*>>(stringTypeRefMatcher)) } returns requestSpec
assertThat(requestSpec.dataWithType<String>(data)).isEqualTo(requestSpec)
}
@Test
fun dataFlowWithoutType() {
val requestSpec = mockk<RSocketRequester.RequestSpec>()
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
every { requestSpec.data(any()) } returns responseSpec
assertThat(requestSpec.data(mockk())).isEqualTo(responseSpec)
every { requestSpec.data(any()) } returns requestSpec
assertThat(requestSpec.data(mockk())).isEqualTo(requestSpec)
}
@Test
fun sendAndAwait() {
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
every { responseSpec.send() } returns Mono.empty()
val requestSpec = mockk<RSocketRequester.RequestSpec>()
every { requestSpec.send() } returns Mono.empty()
runBlocking {
responseSpec.sendAndAwait()
requestSpec.sendAndAwait()
}
}
@Test
fun retrieveAndAwait() {
val response = "foo"
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
every { responseSpec.retrieveMono(match<ParameterizedTypeReference<*>>(stringTypeRefMatcher)) } returns Mono.just("foo")
val requestSpec = mockk<RSocketRequester.RequestSpec>()
every { requestSpec.retrieveMono(match<ParameterizedTypeReference<*>>(stringTypeRefMatcher)) } returns Mono.just("foo")
runBlocking {
assertThat(responseSpec.retrieveAndAwait<String>()).isEqualTo(response)
assertThat(requestSpec.retrieveAndAwait<String>()).isEqualTo(response)
}
}
@Test
@ExperimentalCoroutinesApi
fun retrieveFlow() {
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
every { responseSpec.retrieveFlux(match<ParameterizedTypeReference<*>>(stringTypeRefMatcher)) } returns Flux.just("foo", "bar")
val requestSpec = mockk<RSocketRequester.RequestSpec>()
every { requestSpec.retrieveFlux(match<ParameterizedTypeReference<*>>(stringTypeRefMatcher)) } returns Flux.just("foo", "bar")
runBlocking {
assertThat(responseSpec.retrieveFlow<String>().toList()).contains("foo", "bar")
assertThat(requestSpec.retrieveFlow<String>().toList()).contains("foo", "bar")
}
}
@Test
fun retrieveMono() {
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
every { responseSpec.retrieveMono(match<ParameterizedTypeReference<*>>(stringTypeRefMatcher)) } returns Mono.just("foo")
val requestSpec = mockk<RSocketRequester.RequestSpec>()
every { requestSpec.retrieveMono(match<ParameterizedTypeReference<*>>(stringTypeRefMatcher)) } returns Mono.just("foo")
runBlocking {
assertThat(responseSpec.retrieveMono<String>().block()).isEqualTo("foo")
assertThat(requestSpec.retrieveMono<String>().block()).isEqualTo("foo")
}
}
@Test
fun retrieveFlux() {
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
every { responseSpec.retrieveFlux(match<ParameterizedTypeReference<*>>(stringTypeRefMatcher)) } returns Flux.just("foo", "bar")
val requestSpec = mockk<RSocketRequester.RequestSpec>()
every { requestSpec.retrieveFlux(match<ParameterizedTypeReference<*>>(stringTypeRefMatcher)) } returns Flux.just("foo", "bar")
runBlocking {
assertThat(responseSpec.retrieveFlux<String>().collectList().block()).contains("foo", "bar")
assertThat(requestSpec.retrieveFlux<String>().collectList().block()).contains("foo", "bar")
}
}
}

Loading…
Cancel
Save