Browse Source

Polishing and minor refactoring

See gh-25884
pull/25897/head
Rossen Stoyanchev 4 years ago
parent
commit
5b1b20c8c0
  1. 2
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java
  2. 10
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java
  3. 22
      spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java
  4. 7
      spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java
  5. 15
      spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java
  6. 5
      spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java
  7. 7
      spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/MockServerHttpResponse.java
  8. 7
      spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolver.java
  9. 4
      spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java
  10. 10
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java
  11. 16
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java
  12. 10
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/TomcatWebSocketClient.java

2
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java

@ -75,7 +75,7 @@ public class ReactorNettyTcpConnection<P> implements TcpConnection<P> { @@ -75,7 +75,7 @@ public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
@Override
public void close() {
// Ignore result: can't overflow, ok if not first or no one listens
// Ignore result: concurrent attempts to complete are ok
this.completionSink.tryEmitEmpty();
}

10
spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java

@ -200,10 +200,12 @@ class RSocketServerToClientIntegrationTests { @@ -200,10 +200,12 @@ class RSocketServerToClientIntegrationTests {
private void runTest(Runnable testEcho) {
Mono.fromRunnable(testEcho)
.doOnError(ex -> resultSink.emitError(ex, Sinks.EmitFailureHandler.FAIL_FAST))
.doOnSuccess(o -> resultSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))
.subscribeOn(Schedulers.boundedElastic()) // StepVerifier will block
.subscribe();
.subscribe(
aVoid -> {},
ex -> resultSink.tryEmitError(ex), // Ignore result: signals cannot compete
() -> resultSink.tryEmitEmpty()
);
}
@MessageMapping("fnf")
@ -218,7 +220,7 @@ class RSocketServerToClientIntegrationTests { @@ -218,7 +220,7 @@ class RSocketServerToClientIntegrationTests {
@MessageMapping("receive")
void receive(String payload) {
this.fireForgetPayloads.tryEmitNext(payload);
this.fireForgetPayloads.emitNext(payload, Sinks.EmitFailureHandler.FAIL_FAST);
}
@MessageMapping("echo")

22
spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java

@ -341,8 +341,8 @@ public class SimpAnnotationMethodMessageHandlerTests { @@ -341,8 +341,8 @@ public class SimpAnnotationMethodMessageHandlerTests {
Message<?> message = createMessage("/app1/mono");
this.messageHandler.handleMessage(message);
assertThat(controller.oneSink).isNotNull();
controller.oneSink.emitValue("foo", Sinks.EmitFailureHandler.FAIL_FAST);
assertThat(controller.sinkOne).isNotNull();
controller.sinkOne.emitValue("foo", Sinks.EmitFailureHandler.FAIL_FAST);
verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class));
assertThat(this.payloadCaptor.getValue()).isEqualTo("foo");
}
@ -356,7 +356,7 @@ public class SimpAnnotationMethodMessageHandlerTests { @@ -356,7 +356,7 @@ public class SimpAnnotationMethodMessageHandlerTests {
Message<?> message = createMessage("/app1/mono");
this.messageHandler.handleMessage(message);
controller.oneSink.emitError(new IllegalStateException(), Sinks.EmitFailureHandler.FAIL_FAST);
controller.sinkOne.emitError(new IllegalStateException(), Sinks.EmitFailureHandler.FAIL_FAST);
assertThat(controller.exceptionCaught).isTrue();
}
@ -369,8 +369,8 @@ public class SimpAnnotationMethodMessageHandlerTests { @@ -369,8 +369,8 @@ public class SimpAnnotationMethodMessageHandlerTests {
Message<?> message = createMessage("/app1/flux");
this.messageHandler.handleMessage(message);
assertThat(controller.manySink).isNotNull();
controller.manySink.tryEmitNext("foo");
assertThat(controller.sinkMany).isNotNull();
controller.sinkMany.tryEmitNext("foo");
verify(this.converter, never()).toMessage(any(), any(MessageHeaders.class));
}
@ -584,22 +584,22 @@ public class SimpAnnotationMethodMessageHandlerTests { @@ -584,22 +584,22 @@ public class SimpAnnotationMethodMessageHandlerTests {
@Controller
private static class ReactiveController {
private Sinks.One<String> oneSink;
private Sinks.One<String> sinkOne;
private Sinks.Many<String> manySink;
private Sinks.Many<String> sinkMany;
private boolean exceptionCaught = false;
@MessageMapping("mono")
public Mono<String> handleMono() {
this.oneSink = Sinks.one();
return this.oneSink.asMono();
this.sinkOne = Sinks.one();
return this.sinkOne.asMono();
}
@MessageMapping("flux")
public Flux<String> handleFlux() {
this.manySink = Sinks.many().unicast().onBackpressureBuffer();
return this.manySink.asFlux();
this.sinkMany = Sinks.many().unicast().onBackpressureBuffer();
return this.sinkMany.asFlux();
}
@MessageExceptionHandler(IllegalStateException.class)

7
spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java

@ -65,11 +65,8 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { @@ -65,11 +65,8 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse {
this.writeHandler = body -> {
// Avoid .then() that causes data buffers to be discarded and released
Sinks.Empty<Void> completion = Sinks.unsafe().empty();
this.body = body
.doOnComplete(completion::tryEmitEmpty) // Ignore error: cached + serialized
.doOnError(completion::tryEmitError)
.cache();
this.body.subscribe();
this.body = body.cache();
this.body.subscribe(aVoid -> {}, completion::tryEmitError, completion::tryEmitEmpty); // Signals are serialized
return completion.asMono();
};
}

15
spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java

@ -83,8 +83,9 @@ public class HttpHandlerConnector implements ClientHttpConnector { @@ -83,8 +83,9 @@ public class HttpHandlerConnector implements ClientHttpConnector {
private Mono<ClientHttpResponse> doConnect(
HttpMethod httpMethod, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
Sinks.Empty<Void> requestWriteCompletion = Sinks.empty();
Sinks.Empty<Void> handlerCompletion = Sinks.empty();
// unsafe(): we're intercepting, already serialized Publisher signals
Sinks.Empty<Void> requestWriteSink = Sinks.unsafe().empty();
Sinks.Empty<Void> handlerSink = Sinks.unsafe().empty();
ClientHttpResponse[] savedResponse = new ClientHttpResponse[1];
MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri);
@ -96,8 +97,8 @@ public class HttpHandlerConnector implements ClientHttpConnector { @@ -96,8 +97,8 @@ public class HttpHandlerConnector implements ClientHttpConnector {
ServerHttpResponse responseToUse = prepareResponse(mockServerResponse, mockServerRequest);
this.handler.handle(mockServerRequest, responseToUse).subscribe(
aVoid -> {},
handlerCompletion::tryEmitError, // Ignore error: cached + serialized
handlerCompletion::tryEmitEmpty);
handlerSink::tryEmitError, // Ignore result: signals cannot compete
handlerSink::tryEmitEmpty);
return Mono.empty();
});
@ -110,10 +111,10 @@ public class HttpHandlerConnector implements ClientHttpConnector { @@ -110,10 +111,10 @@ public class HttpHandlerConnector implements ClientHttpConnector {
log("Writing client request for ", httpMethod, uri);
requestCallback.apply(mockClientRequest).subscribe(
aVoid -> {},
requestWriteCompletion::tryEmitError, // Ignore error: cached + serialized
requestWriteCompletion::tryEmitEmpty);
requestWriteSink::tryEmitError, // Ignore result: signals cannot compete
requestWriteSink::tryEmitEmpty);
return Mono.when(requestWriteCompletion.asMono(), handlerCompletion.asMono())
return Mono.when(requestWriteSink.asMono(), handlerSink.asMono())
.onErrorMap(ex -> {
ClientHttpResponse response = savedResponse[0];
return response != null ? new FailureAfterResponseCompletedException(response, ex) : ex;

5
spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java

@ -168,7 +168,6 @@ class WiretapConnector implements ClientHttpConnector { @@ -168,7 +168,6 @@ class WiretapConnector implements ClientHttpConnector {
.doOnComplete(this::handleOnComplete) : null;
if (publisher == null && publisherNested == null) {
// Ignore result: OK or not relevant
this.content.tryEmitEmpty();
}
}
@ -206,14 +205,14 @@ class WiretapConnector implements ClientHttpConnector { @@ -206,14 +205,14 @@ class WiretapConnector implements ClientHttpConnector {
private void handleOnError(Throwable ex) {
// Ignore result: OK or not relevant
// Ignore result: signals cannot compete
this.content.tryEmitError(ex);
}
private void handleOnComplete() {
byte[] bytes = new byte[this.buffer.readableByteCount()];
this.buffer.read(bytes);
// Ignore result: OK or not relevant
// Ignore result: signals cannot compete
this.content.tryEmitValue(bytes);
}
}

7
spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/MockServerHttpResponse.java

@ -65,11 +65,8 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { @@ -65,11 +65,8 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse {
this.writeHandler = body -> {
// Avoid .then() that causes data buffers to be discarded and released
Sinks.Empty<Void> completion = Sinks.unsafe().empty();
this.body = body
.doOnComplete(completion::tryEmitEmpty) // Ignore error: cached + serialized
.doOnError(completion::tryEmitError)
.cache();
this.body.subscribe();
this.body = body.cache();
this.body.subscribe(aVoid -> {}, completion::tryEmitError, completion::tryEmitEmpty); // Signals are serialized
return completion.asMono();
};
}

7
spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolver.java

@ -57,9 +57,10 @@ public class ErrorsMethodArgumentResolver extends HandlerMethodArgumentResolverS @@ -57,9 +57,10 @@ public class ErrorsMethodArgumentResolver extends HandlerMethodArgumentResolverS
Object errors = getErrors(parameter, context);
// Initially Errors/BindingResult is a Mono in the model even if it cannot be declared
// as an async argument. That way it can be resolved first while the Mono can complete
// later at which point the model is also updated for further use.
// Initially ModelAttributeMethodArgumentResolver adds Errors/BindingResult as a
// Mono in the model even if it can't be declared as such on a controller method.
// This is done to enable early argument resolution here. When the Mono actually
// completes it is replaced in the model with the actual value.
if (Mono.class.isAssignableFrom(errors.getClass())) {
return ((Mono<?>) errors).cast(Object.class);

4
spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java

@ -119,13 +119,13 @@ public class ModelAttributeMethodArgumentResolver extends HandlerMethodArgumentR @@ -119,13 +119,13 @@ public class ModelAttributeMethodArgumentResolver extends HandlerMethodArgumentR
return valueMono.flatMap(value -> {
WebExchangeDataBinder binder = context.createDataBinder(exchange, value, name);
return bindRequestParameters(binder, exchange)
.doOnError(ex -> bindingResultSink.emitError(ex, Sinks.EmitFailureHandler.FAIL_FAST))
.doOnError(bindingResultSink::tryEmitError)
.doOnSuccess(aVoid -> {
validateIfApplicable(binder, parameter);
BindingResult bindingResult = binder.getBindingResult();
model.put(BindingResult.MODEL_KEY_PREFIX + name, bindingResult);
model.put(name, value);
// serialized and buffered (should never fail)
// Ignore result: serialized and buffered (should never fail)
bindingResultSink.tryEmitValue(bindingResult);
})
.then(Mono.fromCallable(() -> {

10
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java

@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.websocket.Session;
import org.apache.tomcat.websocket.WsSession;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.web.reactive.socket.HandshakeInfo;
@ -46,7 +47,14 @@ public class TomcatWebSocketSession extends StandardWebSocketSession { @@ -46,7 +47,14 @@ public class TomcatWebSocketSession extends StandardWebSocketSession {
super(session, info, factory);
}
@SuppressWarnings("deprecation")
public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
Sinks.Empty<Void> completionSink) {
super(session, info, factory, completionSink);
suspendReceiving();
}
@Deprecated
public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
reactor.core.publisher.MonoProcessor<Void> completionMono) {

16
spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java

@ -112,10 +112,10 @@ public class StandardWebSocketClient implements WebSocketClient { @@ -112,10 +112,10 @@ public class StandardWebSocketClient implements WebSocketClient {
}
private StandardWebSocketHandlerAdapter createEndpoint(URI url, WebSocketHandler handler,
Sinks.Empty<Void> completion, DefaultConfigurator configurator) {
Sinks.Empty<Void> completionSink, DefaultConfigurator configurator) {
return new StandardWebSocketHandlerAdapter(handler, session ->
createWebSocketSession(session, createHandshakeInfo(url, configurator), completion));
createWebSocketSession(session, createHandshakeInfo(url, configurator), completionSink));
}
private HandshakeInfo createHandshakeInfo(URI url, DefaultConfigurator configurator) {
@ -124,21 +124,13 @@ public class StandardWebSocketClient implements WebSocketClient { @@ -124,21 +124,13 @@ public class StandardWebSocketClient implements WebSocketClient {
return new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
}
protected StandardWebSocketSession createWebSocketSession(Session session, HandshakeInfo info,
Sinks.Empty<Void> completionSink) {
protected StandardWebSocketSession createWebSocketSession(
Session session, HandshakeInfo info, Sinks.Empty<Void> completionSink) {
return new StandardWebSocketSession(
session, info, DefaultDataBufferFactory.sharedInstance, completionSink);
}
@Deprecated
protected StandardWebSocketSession createWebSocketSession(Session session, HandshakeInfo info,
reactor.core.publisher.MonoProcessor<Void> completionMono) {
return new StandardWebSocketSession(
session, info, DefaultDataBufferFactory.sharedInstance, completionMono);
}
private ClientEndpointConfig createEndpointConfig(Configurator configurator, List<String> subProtocols) {
return ClientEndpointConfig.Builder.create()
.configurator(configurator)

10
spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/TomcatWebSocketClient.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,6 +20,7 @@ import javax.websocket.Session; @@ -20,6 +20,7 @@ import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.apache.tomcat.websocket.WsWebSocketContainer;
import reactor.core.publisher.Sinks;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.adapter.StandardWebSocketSession;
@ -44,11 +45,10 @@ public class TomcatWebSocketClient extends StandardWebSocketClient { @@ -44,11 +45,10 @@ public class TomcatWebSocketClient extends StandardWebSocketClient {
@Override
@SuppressWarnings("deprecation")
protected StandardWebSocketSession createWebSocketSession(Session session,
HandshakeInfo info, reactor.core.publisher.MonoProcessor<Void> completionMono) {
protected StandardWebSocketSession createWebSocketSession(
Session session, HandshakeInfo info, Sinks.Empty<Void> completionSink) {
return new TomcatWebSocketSession(session, info, bufferFactory(), completionMono);
return new TomcatWebSocketSession(session, info, bufferFactory(), completionSink);
}
}

Loading…
Cancel
Save