Browse Source

Optimize uses of onErrorResume()

This commit replaces uses of onErrorResume() with
- onErrorMap() in places where onErrorResume() is just used to map to a
  different exception.
- onErrorComplete() where onErrorResume() just maps to Mono.empty().
- onErrorReturn() where onErrorResum() just maps to Mono.just().

Closes gh-31352
pull/31354/head
Kai Zander 1 year ago committed by Brian Clozel
parent
commit
e8b42c5439
  1. 4
      spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java
  2. 6
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNetty2TcpClient.java
  3. 6
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java
  4. 4
      spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java
  5. 2
      spring-r2dbc/src/test/java/org/springframework/r2dbc/core/AbstractDatabaseClientIntegrationTests.java
  6. 2
      spring-r2dbc/src/test/java/org/springframework/r2dbc/core/AbstractTransactionalDatabaseClientIntegrationTests.java
  7. 2
      spring-web/src/main/java/org/springframework/http/codec/ResourceHttpMessageWriter.java
  8. 4
      spring-web/src/main/java/org/springframework/web/server/ServerWebExchange.java
  9. 4
      spring-web/src/main/java/org/springframework/web/server/adapter/DefaultServerWebExchange.java
  10. 2
      spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java
  11. 4
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java
  12. 4
      spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/AbstractMessageReaderArgumentResolver.java
  13. 2
      spring-webflux/src/main/java/org/springframework/web/reactive/result/view/freemarker/FreeMarkerView.java

4
spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java

@ -231,7 +231,7 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol @@ -231,7 +231,7 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol
Flux<?> flux = content
.filter(this::nonEmptyDataBuffer)
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
.onErrorMap(ex -> handleReadError(parameter, message, ex));
if (isContentRequired) {
flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message)));
}
@ -245,7 +245,7 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol @@ -245,7 +245,7 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol
Mono<?> mono = content.next()
.filter(this::nonEmptyDataBuffer)
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
.onErrorMap(ex -> handleReadError(parameter, message, ex));
if (isContentRequired) {
mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message)));
}

6
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNetty2TcpClient.java

@ -256,12 +256,12 @@ public class ReactorNetty2TcpClient<P> implements TcpOperations<P> { @@ -256,12 +256,12 @@ public class ReactorNetty2TcpClient<P> implements TcpOperations<P> {
this.channelGroup.close().addListener(future -> channnelGroupCloseSink.tryEmitEmpty());
result = channnelGroupCloseSink.asMono();
if (this.loopResources != null) {
result = result.onErrorResume(ex -> Mono.empty()).then(this.loopResources.disposeLater());
result = result.onErrorComplete().then(this.loopResources.disposeLater());
}
if (this.poolResources != null) {
result = result.onErrorResume(ex -> Mono.empty()).then(this.poolResources.disposeLater());
result = result.onErrorComplete().then(this.poolResources.disposeLater());
}
result = result.onErrorResume(ex -> Mono.empty()).then(stopScheduler());
result = result.onErrorComplete().then(stopScheduler());
}
else {
result = stopScheduler();

6
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

@ -254,12 +254,12 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> { @@ -254,12 +254,12 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
if (this.channelGroup != null) {
result = FutureMono.from(this.channelGroup.close());
if (this.loopResources != null) {
result = result.onErrorResume(ex -> Mono.empty()).then(this.loopResources.disposeLater());
result = result.onErrorComplete().then(this.loopResources.disposeLater());
}
if (this.poolResources != null) {
result = result.onErrorResume(ex -> Mono.empty()).then(this.poolResources.disposeLater());
result = result.onErrorComplete().then(this.poolResources.disposeLater());
}
result = result.onErrorResume(ex -> Mono.empty()).then(stopScheduler());
result = result.onErrorComplete().then(stopScheduler());
}
else {
result = stopScheduler();

4
spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java

@ -228,8 +228,8 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager @@ -228,8 +228,8 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
.then(Mono.error(ex));
}
return Mono.error(ex);
})).onErrorResume(ex -> Mono.error(new CannotCreateTransactionException(
"Could not open R2DBC Connection for transaction", ex)));
})).onErrorMap(ex -> new CannotCreateTransactionException(
"Could not open R2DBC Connection for transaction", ex));
}).then();
}

2
spring-r2dbc/src/test/java/org/springframework/r2dbc/core/AbstractDatabaseClientIntegrationTests.java

@ -50,7 +50,7 @@ abstract class AbstractDatabaseClientIntegrationTests { @@ -50,7 +50,7 @@ abstract class AbstractDatabaseClientIntegrationTests {
Mono.from(connectionFactory.create())
.flatMapMany(connection -> Flux.from(connection.createStatement("DROP TABLE legoset").execute())
.flatMap(Result::getRowsUpdated)
.onErrorResume(e -> Mono.empty())
.onErrorComplete()
.thenMany(connection.createStatement(getCreateTableStatement()).execute())
.flatMap(Result::getRowsUpdated).thenMany(connection.close())).as(StepVerifier::create)
.verifyComplete();

2
spring-r2dbc/src/test/java/org/springframework/r2dbc/core/AbstractTransactionalDatabaseClientIntegrationTests.java

@ -69,7 +69,7 @@ abstract class AbstractTransactionalDatabaseClientIntegrationTests { @@ -69,7 +69,7 @@ abstract class AbstractTransactionalDatabaseClientIntegrationTests {
Mono.from(connectionFactory.create())
.flatMapMany(connection -> Flux.from(connection.createStatement("DROP TABLE legoset").execute())
.flatMap(Result::getRowsUpdated)
.onErrorResume(e -> Mono.empty())
.onErrorComplete()
.thenMany(connection.createStatement(getCreateTableStatement()).execute())
.flatMap(Result::getRowsUpdated).thenMany(connection.close())).as(StepVerifier::create).verifyComplete();

2
spring-web/src/main/java/org/springframework/http/codec/ResourceHttpMessageWriter.java

@ -191,7 +191,7 @@ public class ResourceHttpMessageWriter implements HttpMessageWriter<Resource> { @@ -191,7 +191,7 @@ public class ResourceHttpMessageWriter implements HttpMessageWriter<Resource> {
if (InputStreamResource.class != resource.getClass()) {
return Mono.fromCallable(resource::contentLength)
.filter(length -> length != -1)
.onErrorResume(IOException.class, t -> Mono.empty())
.onErrorComplete(IOException.class)
.subscribeOn(Schedulers.boundedElastic());
}
else {

4
spring-web/src/main/java/org/springframework/web/server/ServerWebExchange.java

@ -148,10 +148,10 @@ public interface ServerWebExchange { @@ -148,10 +148,10 @@ public interface ServerWebExchange {
*/
default Mono<Void> cleanupMultipart() {
return getMultipartData()
.onErrorResume(t -> Mono.empty()) // ignore errors reading multipart data
.onErrorComplete() // ignore errors reading multipart data
.flatMapIterable(Map::values)
.flatMapIterable(Function.identity())
.flatMap(part -> part.delete().onErrorResume(ex -> Mono.empty()))
.flatMap(part -> part.delete().onErrorComplete())
.then();
}

4
spring-web/src/main/java/org/springframework/web/server/adapter/DefaultServerWebExchange.java

@ -249,11 +249,11 @@ public class DefaultServerWebExchange implements ServerWebExchange { @@ -249,11 +249,11 @@ public class DefaultServerWebExchange implements ServerWebExchange {
public Mono<Void> cleanupMultipart() {
if (this.multipartRead) {
return getMultipartData()
.onErrorResume(t -> Mono.empty()) // ignore errors reading multipart data
.onErrorComplete() // ignore errors reading multipart data
.flatMapIterable(Map::values)
.flatMapIterable(Function.identity())
.flatMap(part -> part.delete()
.onErrorResume(ex -> Mono.empty()))
.onErrorComplete())
.then();
}
else {

2
spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java

@ -105,7 +105,7 @@ class ServerHttpResponseTests { @@ -105,7 +105,7 @@ class ServerHttpResponseTests {
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set(HttpHeaders.CONTENT_ENCODING, "gzip");
headers.setContentLength(12);
response.writeWith(body).onErrorResume(ex -> Mono.empty()).block();
response.writeWith(body).onErrorComplete().block();
assertThat(response.statusCodeWritten).isFalse();
assertThat(response.headersWritten).isFalse();

4
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

@ -186,11 +186,11 @@ final class DefaultWebClient implements WebClient { @@ -186,11 +186,11 @@ final class DefaultWebClient implements WebClient {
}
private static Mono<Void> releaseIfNotConsumed(ClientResponse response) {
return response.releaseBody().onErrorResume(ex2 -> Mono.empty());
return response.releaseBody().onErrorComplete();
}
private static <T> Mono<T> releaseIfNotConsumed(ClientResponse response, Throwable ex) {
return response.releaseBody().onErrorResume(ex2 -> Mono.empty()).then(Mono.error(ex));
return response.releaseBody().onErrorComplete().then(Mono.error(ex));
}

4
spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/AbstractMessageReaderArgumentResolver.java

@ -183,7 +183,7 @@ public abstract class AbstractMessageReaderArgumentResolver extends HandlerMetho @@ -183,7 +183,7 @@ public abstract class AbstractMessageReaderArgumentResolver extends HandlerMetho
logger.debug(exchange.getLogPrefix() + "0..N [" + elementType + "]");
}
Flux<?> flux = reader.read(actualType, elementType, request, response, readHints);
flux = flux.onErrorResume(ex -> Flux.error(handleReadError(bodyParam, ex)));
flux = flux.onErrorMap(ex -> handleReadError(bodyParam, ex));
if (isBodyRequired) {
flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(bodyParam)));
}
@ -199,7 +199,7 @@ public abstract class AbstractMessageReaderArgumentResolver extends HandlerMetho @@ -199,7 +199,7 @@ public abstract class AbstractMessageReaderArgumentResolver extends HandlerMetho
logger.debug(exchange.getLogPrefix() + "0..1 [" + elementType + "]");
}
Mono<?> mono = reader.readMono(actualType, elementType, request, response, readHints);
mono = mono.onErrorResume(ex -> Mono.error(handleReadError(bodyParam, ex)));
mono = mono.onErrorMap(ex -> handleReadError(bodyParam, ex));
if (isBodyRequired) {
mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(bodyParam)));
}

2
spring-webflux/src/main/java/org/springframework/web/reactive/result/view/freemarker/FreeMarkerView.java

@ -223,7 +223,7 @@ public class FreeMarkerView extends AbstractUrlBasedView { @@ -223,7 +223,7 @@ public class FreeMarkerView extends AbstractUrlBasedView {
return lookupTemplate(locale)
.map(template -> Boolean.TRUE)
.switchIfEmpty(Mono.just(Boolean.FALSE))
.onErrorResume(FileNotFoundException.class, t -> Mono.just(Boolean.FALSE))
.onErrorReturn(FileNotFoundException.class, Boolean.FALSE)
.onErrorMap(ParseException.class, ex -> new ApplicationContextException(
"Failed to parse FreeMarker template for URL [" + getUrl() + "]", ex))
.onErrorMap(IOException.class, ex -> new ApplicationContextException(

Loading…
Cancel
Save