From 1404dd768fd02391233d69519459df32581347fc Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 25 Sep 2020 13:57:30 +0100 Subject: [PATCH] Add exchangeToMono and exchangeToFlux + deprecate exchange() See gh-25751 --- .../function/client/ClientResponse.java | 24 ---- .../function/client/DefaultWebClient.java | 110 +++++++++++------- .../reactive/function/client/WebClient.java | 97 +++++++++++++-- .../function/client/WebClientExtensions.kt | 3 +- .../function/MultipartIntegrationTests.java | 16 +-- .../client/DefaultWebClientTests.java | 39 ++++--- .../WebClientDataBufferAllocatingTests.java | 7 +- .../client/WebClientIntegrationTests.java | 45 +++---- ...LocaleContextResolverIntegrationTests.java | 14 +-- .../annotation/MultipartIntegrationTests.java | 11 +- .../annotation/ProtobufIntegrationTests.java | 52 +++++---- ...LocaleContextResolverIntegrationTests.java | 15 +-- 12 files changed, 260 insertions(+), 173 deletions(-) diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java index 1cb8790d37..897a1a28b5 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java @@ -45,30 +45,6 @@ import org.springframework.web.reactive.function.BodyExtractor; * {@link ExchangeFunction}. Provides access to the response status and * headers, and also methods to consume the response body. * - *

NOTE: When using a {@link ClientResponse} - * through the {@code WebClient} - * {@link WebClient.RequestHeadersSpec#exchange() exchange()} method, - * you have to make sure that the body is consumed or released by using - * one of the following methods: - *

- * You can also use {@code bodyToMono(Void.class)} if no response content is - * expected. However keep in mind the connection will be closed, instead of - * being placed back in the pool, if any content does arrive. This is in - * contrast to {@link #releaseBody()} which does consume the full body and - * releases any content received. - * * @author Brian Clozel * @author Arjen Poutsma * @since 5.0 diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index b7f7e4fb0f..b8c849c29a 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -147,6 +147,14 @@ class DefaultWebClient implements WebClient { return new DefaultWebClientBuilder(this.builder); } + private static Mono releaseIfNotConsumed(ClientResponse response) { + return response.releaseBody().onErrorResume(ex2 -> Mono.empty()); + } + + private static Mono releaseIfNotConsumed(ClientResponse response, Throwable ex) { + return response.releaseBody().onErrorResume(ex2 -> Mono.empty()).then(Mono.error(ex)); + } + private class DefaultRequestBodyUriSpec implements RequestBodyUriSpec { @@ -342,6 +350,65 @@ class DefaultWebClient implements WebClient { } @Override + public ResponseSpec retrieve() { + return new DefaultResponseSpec(exchange(), this::createRequest); + } + + private HttpRequest createRequest() { + return new HttpRequest() { + private final URI uri = initUri(); + private final HttpHeaders headers = initHeaders(); + + @Override + public HttpMethod getMethod() { + return httpMethod; + } + @Override + public String getMethodValue() { + return httpMethod.name(); + } + @Override + public URI getURI() { + return this.uri; + } + @Override + public HttpHeaders getHeaders() { + return this.headers; + } + }; + } + + @Override + public Mono exchangeToMono(Function> responseHandler) { + return exchange().flatMap(response -> { + try { + return responseHandler.apply(response) + .flatMap(value -> releaseIfNotConsumed(response).thenReturn(value)) + .switchIfEmpty(Mono.defer(() -> releaseIfNotConsumed(response).then(Mono.empty()))) + .onErrorResume(ex -> releaseIfNotConsumed(response, ex)); + } + catch (Throwable ex) { + return releaseIfNotConsumed(response, ex); + } + }); + } + + @Override + public Flux exchangeToFlux(Function> responseHandler) { + return exchange().flatMapMany(response -> { + try { + return responseHandler.apply(response) + .concatWith(Flux.defer(() -> releaseIfNotConsumed(response).then(Mono.empty()))) + .onErrorResume(ex -> releaseIfNotConsumed(response, ex)); + } + catch (Throwable ex) { + return releaseIfNotConsumed(response, ex); + } + }); + } + + @Override + @SuppressWarnings("deprecation") public Mono exchange() { ClientRequest request = (this.inserter != null ? initRequestBuilder().body(this.inserter).build() : @@ -398,35 +465,6 @@ class DefaultWebClient implements WebClient { return result; } } - - @Override - public ResponseSpec retrieve() { - return new DefaultResponseSpec(exchange(), this::createRequest); - } - - private HttpRequest createRequest() { - return new HttpRequest() { - private final URI uri = initUri(); - private final HttpHeaders headers = initHeaders(); - - @Override - public HttpMethod getMethod() { - return httpMethod; - } - @Override - public String getMethodValue() { - return httpMethod.name(); - } - @Override - public URI getURI() { - return this.uri; - } - @Override - public HttpHeaders getHeaders() { - return this.headers; - } - }; - } } @@ -530,11 +568,11 @@ class DefaultWebClient implements WebClient { Mono exMono; try { exMono = handler.apply(response); - exMono = exMono.flatMap(ex -> drainBody(response, ex)); - exMono = exMono.onErrorResume(ex -> drainBody(response, ex)); + exMono = exMono.flatMap(ex -> releaseIfNotConsumed(response, ex)); + exMono = exMono.onErrorResume(ex -> releaseIfNotConsumed(response, ex)); } catch (Throwable ex2) { - exMono = drainBody(response, ex2); + exMono = releaseIfNotConsumed(response, ex2); } Mono result = exMono.flatMap(Mono::error); HttpRequest request = this.requestSupplier.get(); @@ -544,14 +582,6 @@ class DefaultWebClient implements WebClient { return null; } - @SuppressWarnings("unchecked") - private Mono drainBody(ClientResponse response, Throwable ex) { - // Ensure the body is drained, even if the StatusHandler didn't consume it, - // but ignore exception, in case the handler did consume. - return (Mono) response.releaseBody() - .onErrorResume(ex2 -> Mono.empty()).thenReturn(ex); - } - private Mono insertCheckpoint(Mono result, int statusCode, HttpRequest request) { String httpMethod = request.getMethodValue(); URI uri = request.getURI(); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java index c7843a2d99..adc044752f 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java @@ -57,7 +57,8 @@ import org.springframework.web.util.UriBuilderFactory; *

For examples with a response body see: *

    *
  • {@link RequestHeadersSpec#retrieve() retrieve()} - *
  • {@link RequestHeadersSpec#exchange() exchange()} + *
  • {@link RequestHeadersSpec#exchangeToMono(Function) exchangeToMono()} + *
  • {@link RequestHeadersSpec#exchangeToFlux(Function) exchangeToFlux()} *
*

For examples with a request body see: *

    @@ -252,8 +253,7 @@ public interface WebClient { Builder defaultCookies(Consumer> cookiesConsumer); /** - * Provide a consumer to modify every request being built just before the - * call to {@link RequestHeadersSpec#exchange() exchange()}. + * Provide a consumer to customize every request being built. * @param defaultRequest the consumer to use for modifying requests * @since 5.1 */ @@ -483,21 +483,93 @@ public interface WebClient { S httpRequest(Consumer requestConsumer); /** - * Perform the HTTP request and retrieve the response body: + * Proceed to declare how to extract the response. For example to extract + * a {@link ResponseEntity} with status, headers, and body: *

    -		 * Mono<Person> bodyMono = client.get()
    +		 * Mono<ResponseEntity<Person>> entityMono = client.get()
    +		 *     .uri("/persons/1")
    +		 *     .accept(MediaType.APPLICATION_JSON)
    +		 *     .retrieve()
    +		 *     .toEntity(Person.class);
    +		 * 
    + *

    Or if interested only in the body: + *

    +		 * Mono<Person> entityMono = client.get()
     		 *     .uri("/persons/1")
     		 *     .accept(MediaType.APPLICATION_JSON)
     		 *     .retrieve()
     		 *     .bodyToMono(Person.class);
     		 * 
    - *

    This method is a shortcut to using {@link #exchange()} and - * decoding the response body through {@link ClientResponse}. - * @return {@code ResponseSpec} to specify how to decode the body - * @see #exchange() + *

    By default, 4xx and 5xx responses result in a + * {@link WebClientResponseException}. To customize error handling, use + * {@link ResponseSpec#onStatus(Predicate, Function) onStatus} handlers. */ ResponseSpec retrieve(); + /** + * An alternative to {@link #retrieve()} that provides more control via + * access to the {@link ClientResponse}. This can be useful for advanced + * scenarios, for example to decode the response differently depending + * on the response status: + *

    +		 * Mono<Object> entityMono = client.get()
    +		 *     .uri("/persons/1")
    +		 *     .accept(MediaType.APPLICATION_JSON)
    +		 *     .exchangeToMono(response -> {
    +		 *         if (response.statusCode().equals(HttpStatus.OK)) {
    +		 *             return response.bodyToMono(Person.class);
    +		 *         }
    +		 *         else if (response.statusCode().is4xxClientError()) {
    +		 *             return response.bodyToMono(ErrorContainer.class);
    +		 *         }
    +		 *         else {
    +		 *             return Mono.error(response.createException());
    +		 *         }
    +		 *     });
    +		 * 
    + *

    Note: After the returned {@code Mono} completes, + * the response body is automatically released if it hasn't been consumed. + * If the response content is needed, the provided function must declare + * how to decode it. + * @param responseHandler the function to handle the response with + * @param the type of Object the response will be transformed to + * @return a {@code Mono} produced from the response + * @since 5.3 + */ + Mono exchangeToMono(Function> responseHandler); + + /** + * An alternative to {@link #retrieve()} that provides more control via + * access to the {@link ClientResponse}. This can be useful for advanced + * scenarios, for example to decode the response differently depending + * on the response status: + *

    +		 * Mono<Object> entityMono = client.get()
    +		 *     .uri("/persons")
    +		 *     .accept(MediaType.APPLICATION_JSON)
    +		 *     .exchangeToFlux(response -> {
    +		 *         if (response.statusCode().equals(HttpStatus.OK)) {
    +		 *             return response.bodyToFlux(Person.class);
    +		 *         }
    +		 *         else if (response.statusCode().is4xxClientError()) {
    +		 *             return response.bodyToMono(ErrorContainer.class).flux();
    +		 *         }
    +		 *         else {
    +		 *             return Flux.error(response.createException());
    +		 *         }
    +		 *     });
    +		 * 
    + *

    Note: After the returned {@code Flux} completes, + * the response body is automatically released if it hasn't been consumed. + * If the response content is needed, the provided function must declare + * how to decode it. + * @param responseHandler the function to handle the response with + * @param the type of Objects the response will be transformed to + * @return a {@code Flux} of Objects produced from the response + * @since 5.3 + */ + Flux exchangeToFlux(Function> responseHandler); + /** * Perform the HTTP request and return a {@link ClientResponse} with the * response status and headers. You can then use methods of the response @@ -526,7 +598,14 @@ public interface WebClient { * if to consume the response. * @return a {@code Mono} for the response * @see #retrieve() + * @deprecated since 5.3 due to the possibility to leak memory and/or + * connections; please, use {@link #exchangeToMono(Function)}, + * {@link #exchangeToFlux(Function)}; consider also using + * {@link #retrieve()} which provides access to the response status + * and headers via {@link ResponseEntity} along with error status + * handling. */ + @Deprecated Mono exchange(); } diff --git a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt index 486cfb0beb..ec1d10a740 100644 --- a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt +++ b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt @@ -17,8 +17,8 @@ package org.springframework.web.reactive.function.client import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.reactive.awaitSingle import kotlinx.coroutines.reactive.asFlow +import kotlinx.coroutines.reactive.awaitSingle import org.reactivestreams.Publisher import org.springframework.core.ParameterizedTypeReference import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec @@ -69,6 +69,7 @@ inline fun RequestBodySpec.body(producer: Any): RequestHeaders * @author Sebastien Deleuze * @since 5.2 */ +@Suppress("DEPRECATION") suspend fun RequestHeadersSpec>.awaitExchange(): ClientResponse = exchange().awaitSingle() diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/MultipartIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/MultipartIntegrationTests.java index 0441d3fe7c..a1f28b1b63 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/MultipartIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/MultipartIntegrationTests.java @@ -29,13 +29,13 @@ import reactor.test.StepVerifier; import org.springframework.core.io.ClassPathResource; import org.springframework.http.HttpEntity; import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import org.springframework.http.client.MultipartBodyBuilder; import org.springframework.http.codec.multipart.FilePart; import org.springframework.http.codec.multipart.FormFieldPart; import org.springframework.http.codec.multipart.Part; import org.springframework.util.FileCopyUtils; import org.springframework.util.MultiValueMap; -import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.server.AbstractRouterFunctionIntegrationTests; import org.springframework.web.reactive.function.server.RouterFunction; @@ -62,15 +62,16 @@ class MultipartIntegrationTests extends AbstractRouterFunctionIntegrationTests { void multipartData(HttpServer httpServer) throws Exception { startServer(httpServer); - Mono result = webClient + Mono> result = webClient .post() .uri("http://localhost:" + this.port + "/multipartData") .bodyValue(generateBody()) - .exchange(); + .retrieve() + .toEntity(Void.class); StepVerifier .create(result) - .consumeNextWith(response -> assertThat(response.statusCode()).isEqualTo(HttpStatus.OK)) + .consumeNextWith(entity -> assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK)) .verifyComplete(); } @@ -78,15 +79,16 @@ class MultipartIntegrationTests extends AbstractRouterFunctionIntegrationTests { void parts(HttpServer httpServer) throws Exception { startServer(httpServer); - Mono result = webClient + Mono> result = webClient .post() .uri("http://localhost:" + this.port + "/parts") .bodyValue(generateBody()) - .exchange(); + .retrieve() + .toEntity(Void.class); StepVerifier .create(result) - .consumeNextWith(response -> assertThat(response.statusCode()).isEqualTo(HttpStatus.OK)) + .consumeNextWith(entity -> assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK)) .verifyComplete(); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java index a40f386d41..e5f2df9e25 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java @@ -47,6 +47,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; /** * Unit tests for {@link DefaultWebClient}. @@ -69,6 +70,7 @@ public class DefaultWebClientTests { @BeforeEach public void setup() { ClientResponse mockResponse = mock(ClientResponse.class); + when(mockResponse.bodyToMono(Void.class)).thenReturn(Mono.empty()); given(this.exchangeFunction.exchange(this.captor.capture())).willReturn(Mono.just(mockResponse)); this.builder = WebClient.builder().baseUrl("/base").exchangeFunction(this.exchangeFunction); } @@ -77,7 +79,7 @@ public class DefaultWebClientTests { @Test public void basic() { this.builder.build().get().uri("/path") - .exchange().block(Duration.ofSeconds(10)); + .retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10)); ClientRequest request = verifyAndGetRequest(); assertThat(request.url().toString()).isEqualTo("/base/path"); @@ -89,7 +91,7 @@ public class DefaultWebClientTests { public void uriBuilder() { this.builder.build().get() .uri(builder -> builder.path("/path").queryParam("q", "12").build()) - .exchange().block(Duration.ofSeconds(10)); + .retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10)); ClientRequest request = verifyAndGetRequest(); assertThat(request.url().toString()).isEqualTo("/base/path?q=12"); @@ -98,8 +100,8 @@ public class DefaultWebClientTests { @Test // gh-22705 public void uriBuilderWithUriTemplate() { this.builder.build().get() - .uri("/path/{id}", builder -> builder.queryParam("q", "12").build("identifier")) - .exchange().block(Duration.ofSeconds(10)); + .uri("/path/{id}", builder -> builder.queryParam("q", "12").build("identifier")) + .retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10)); ClientRequest request = verifyAndGetRequest(); assertThat(request.url().toString()).isEqualTo("/base/path/identifier?q=12"); @@ -110,7 +112,7 @@ public class DefaultWebClientTests { public void uriBuilderWithPathOverride() { this.builder.build().get() .uri(builder -> builder.replacePath("/path").build()) - .exchange().block(Duration.ofSeconds(10)); + .retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10)); ClientRequest request = verifyAndGetRequest(); assertThat(request.url().toString()).isEqualTo("/path"); @@ -120,7 +122,7 @@ public class DefaultWebClientTests { public void requestHeaderAndCookie() { this.builder.build().get().uri("/path").accept(MediaType.APPLICATION_JSON) .cookies(cookies -> cookies.add("id", "123")) // SPR-16178 - .exchange().block(Duration.ofSeconds(10)); + .retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10)); ClientRequest request = verifyAndGetRequest(); assertThat(request.headers().getFirst("Accept")).isEqualTo("application/json"); @@ -131,7 +133,7 @@ public class DefaultWebClientTests { public void httpRequest() { this.builder.build().get().uri("/path") .httpRequest(httpRequest -> {}) - .exchange().block(Duration.ofSeconds(10)); + .retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10)); ClientRequest request = verifyAndGetRequest(); assertThat(request.httpRequest()).isNotNull(); @@ -143,7 +145,8 @@ public class DefaultWebClientTests { .defaultHeader("Accept", "application/json").defaultCookie("id", "123") .build(); - client.get().uri("/path").exchange().block(Duration.ofSeconds(10)); + client.get().uri("/path") + .retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10)); ClientRequest request = verifyAndGetRequest(); assertThat(request.headers().getFirst("Accept")).isEqualTo("application/json"); @@ -160,7 +163,7 @@ public class DefaultWebClientTests { client.get().uri("/path") .header("Accept", "application/xml") .cookie("id", "456") - .exchange().block(Duration.ofSeconds(10)); + .retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10)); ClientRequest request = verifyAndGetRequest(); assertThat(request.headers().getFirst("Accept")).isEqualTo("application/xml"); @@ -185,7 +188,7 @@ public class DefaultWebClientTests { try { context.set("bar"); client.get().uri("/path").attribute("foo", "bar") - .exchange().block(Duration.ofSeconds(10)); + .retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10)); } finally { context.remove(); @@ -271,7 +274,7 @@ public class DefaultWebClientTests { this.builder.filter(filter).build() .get().uri("/path") .attribute("foo", "bar") - .exchange().block(Duration.ofSeconds(10)); + .retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10)); assertThat(actual.get("foo")).isEqualTo("bar"); @@ -290,7 +293,7 @@ public class DefaultWebClientTests { this.builder.filter(filter).build() .get().uri("/path") .attribute("foo", null) - .exchange().block(Duration.ofSeconds(10)); + .retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10)); assertThat(actual.get("foo")).isNull(); @@ -306,7 +309,7 @@ public class DefaultWebClientTests { .defaultCookie("id", "123")) .build(); - client.get().uri("/path").exchange().block(Duration.ofSeconds(10)); + client.get().uri("/path").retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10)); ClientRequest request = verifyAndGetRequest(); assertThat(request.headers().getFirst("Accept")).isEqualTo("application/json"); @@ -317,8 +320,8 @@ public class DefaultWebClientTests { public void switchToErrorOnEmptyClientResponseMono() { ExchangeFunction exchangeFunction = mock(ExchangeFunction.class); given(exchangeFunction.exchange(any())).willReturn(Mono.empty()); - WebClient.Builder builder = WebClient.builder().baseUrl("/base").exchangeFunction(exchangeFunction); - StepVerifier.create(builder.build().get().uri("/path").exchange()) + WebClient client = WebClient.builder().baseUrl("/base").exchangeFunction(exchangeFunction).build(); + StepVerifier.create(client.get().uri("/path").retrieve().bodyToMono(Void.class)) .expectErrorMessage("The underlying HTTP client completed without emitting a response.") .verify(Duration.ofSeconds(5)); } @@ -333,9 +336,11 @@ public class DefaultWebClientTests { .build()) ) .build(); - Mono exchange = client.get().uri("/path").exchange(); + + Mono result = client.get().uri("/path").retrieve().bodyToMono(Void.class); + verifyNoInteractions(this.exchangeFunction); - exchange.block(Duration.ofSeconds(10)); + result.block(Duration.ofSeconds(10)); ClientRequest request = verifyAndGetRequest(); assertThat(request.headers().getFirst("Custom")).isEqualTo("value"); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java index 4b5ec631b9..33202d48f4 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java @@ -182,9 +182,7 @@ class WebClientDataBufferAllocatingTests extends AbstractDataBufferAllocatingTes .setBody("foo bar")); Mono result = this.webClient.get() - .exchange() - .flatMap(ClientResponse::releaseBody); - + .exchangeToMono(ClientResponse::releaseBody); StepVerifier.create(result) .expectComplete() @@ -201,8 +199,7 @@ class WebClientDataBufferAllocatingTests extends AbstractDataBufferAllocatingTes .setBody("foo bar")); Mono> result = this.webClient.get() - .exchange() - .flatMap(ClientResponse::toBodilessEntity); + .exchangeToMono(ClientResponse::toBodilessEntity); StepVerifier.create(result) .assertNext(entity -> { diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index 50a9b9f65b..5c61261fb2 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -837,8 +837,7 @@ class WebClientIntegrationTests { Mono result = this.webClient.get() .uri("/greeting") .header("X-Test-Header", "testvalue") - .exchange() - .flatMap(response -> response.bodyToMono(String.class)); + .retrieve().bodyToMono(String.class); StepVerifier.create(result) .expectNext("Hello Spring!") @@ -862,8 +861,7 @@ class WebClientIntegrationTests { Mono> result = this.webClient.get() .uri("/json").accept(MediaType.APPLICATION_JSON) - .exchange() - .flatMap(response -> response.toEntity(Pojo.class)); + .retrieve().toEntity(Pojo.class); StepVerifier.create(result) .consumeNextWith(entity -> { @@ -890,8 +888,7 @@ class WebClientIntegrationTests { Mono> result = this.webClient.get() .uri("/json").accept(MediaType.APPLICATION_JSON) - .exchange() - .flatMap(ClientResponse::toBodilessEntity); + .retrieve().toBodilessEntity(); StepVerifier.create(result) .consumeNextWith(entity -> { @@ -919,8 +916,7 @@ class WebClientIntegrationTests { Mono>> result = this.webClient.get() .uri("/json").accept(MediaType.APPLICATION_JSON) - .exchange() - .flatMap(response -> response.toEntityList(Pojo.class)); + .retrieve().toEntityList(Pojo.class); StepVerifier.create(result) .consumeNextWith(entity -> { @@ -948,8 +944,7 @@ class WebClientIntegrationTests { Mono> result = this.webClient.get() .uri("/noContent") - .exchange() - .flatMap(response -> response.toEntity(Void.class)); + .retrieve().toBodilessEntity(); StepVerifier.create(result) .assertNext(r -> assertThat(r.getStatusCode().is2xxSuccessful()).isTrue()) @@ -963,10 +958,11 @@ class WebClientIntegrationTests { prepareResponse(response -> response.setResponseCode(404) .setHeader("Content-Type", "text/plain").setBody("Not Found")); - Mono result = this.webClient.get().uri("/greeting").exchange(); + Mono> result = this.webClient.get().uri("/greeting") + .exchangeToMono(ClientResponse::toBodilessEntity); StepVerifier.create(result) - .consumeNextWith(response -> assertThat(response.statusCode()).isEqualTo(HttpStatus.NOT_FOUND)) + .consumeNextWith(entity -> assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND)) .expectComplete() .verify(Duration.ofSeconds(3)); @@ -987,12 +983,12 @@ class WebClientIntegrationTests { prepareResponse(response -> response.setResponseCode(errorStatus) .setHeader("Content-Type", "text/plain").setBody(errorMessage)); - Mono result = this.webClient.get() + Mono> result = this.webClient.get() .uri("/unknownPage") - .exchange(); + .exchangeToMono(ClientResponse::toBodilessEntity); StepVerifier.create(result) - .consumeNextWith(response -> assertThat(response.rawStatusCode()).isEqualTo(555)) + .consumeNextWith(entity -> assertThat(entity.getStatusCodeValue()).isEqualTo(555)) .expectComplete() .verify(Duration.ofSeconds(3)); @@ -1008,7 +1004,8 @@ class WebClientIntegrationTests { startServer(connector); String uri = "/api/v4/groups/1"; - Mono responseMono = WebClient.builder().build().get().uri(uri).exchange(); + Mono> responseMono = WebClient.builder().build().get().uri(uri) + .retrieve().toBodilessEntity(); StepVerifier.create(responseMono) .expectErrorSatisfies(throwable -> { @@ -1103,12 +1100,9 @@ class WebClientIntegrationTests { .addHeader("Set-Cookie", "testkey2=testvalue2; Max-Age=42; HttpOnly; SameSite=Lax; Secure") .setBody("test")); - Mono result = this.webClient.get() + this.webClient.get() .uri("/test") - .exchange(); - - StepVerifier.create(result) - .consumeNextWith(response -> { + .exchangeToMono(response -> { assertThat(response.cookies()).containsOnlyKeys("testkey1", "testkey2"); ResponseCookie cookie1 = response.cookies().get("testkey1").get(0); @@ -1123,9 +1117,10 @@ class WebClientIntegrationTests { assertThat(cookie2.isHttpOnly()).isTrue(); assertThat(cookie2.getSameSite()).isEqualTo("Lax"); assertThat(cookie2.getMaxAge().getSeconds()).isEqualTo(42); + + return response.releaseBody(); }) - .expectComplete() - .verify(Duration.ofSeconds(3)); + .block(Duration.ofSeconds(3)); expectRequestCount(1); } @@ -1135,9 +1130,7 @@ class WebClientIntegrationTests { startServer(connector); String url = "http://example.invalid"; - Mono result = this.webClient.get(). - uri(url) - .exchange(); + Mono result = this.webClient.get().uri(url).retrieve().bodyToMono(Void.class); StepVerifier.create(result) .expectErrorSatisfies(throwable -> { diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/LocaleContextResolverIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/LocaleContextResolverIntegrationTests.java index efc8cf8c1e..b5a4e2bd4e 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/LocaleContextResolverIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/LocaleContextResolverIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,8 +26,8 @@ import reactor.test.StepVerifier; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; import org.springframework.lang.Nullable; -import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.result.view.View; import org.springframework.web.reactive.result.view.ViewResolver; @@ -66,16 +66,16 @@ class LocaleContextResolverIntegrationTests extends AbstractRouterFunctionIntegr void fixedLocale(HttpServer httpServer) throws Exception { startServer(httpServer); - Mono result = webClient + Mono> result = webClient .get() .uri("http://localhost:" + this.port + "/") - .exchange(); + .retrieve().toBodilessEntity(); StepVerifier .create(result) - .consumeNextWith(response -> { - assertThat(response.statusCode()).isEqualTo(HttpStatus.OK); - assertThat(response.headers().asHttpHeaders().getContentLanguage()).isEqualTo(Locale.GERMANY); + .consumeNextWith(entity -> { + assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(entity.getHeaders().getContentLanguage()).isEqualTo(Locale.GERMANY); }) .verifyComplete(); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MultipartIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MultipartIntegrationTests.java index 34558d112b..1b49442ce2 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MultipartIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MultipartIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,7 @@ import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.Resource; import org.springframework.http.HttpEntity; import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import org.springframework.http.client.MultipartBodyBuilder; import org.springframework.http.codec.multipart.FilePart; import org.springframework.http.codec.multipart.FormFieldPart; @@ -52,7 +53,6 @@ import org.springframework.web.bind.annotation.RequestPart; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.reactive.config.EnableWebFlux; -import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests; @@ -85,15 +85,16 @@ class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTests { void requestPart(HttpServer httpServer) throws Exception { startServer(httpServer); - Mono result = webClient + Mono> result = webClient .post() .uri("/requestPart") .bodyValue(generateBody()) - .exchange(); + .retrieve() + .toBodilessEntity(); StepVerifier .create(result) - .consumeNextWith(response -> assertThat(response.statusCode()).isEqualTo(HttpStatus.OK)) + .consumeNextWith(entity -> assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK)) .verifyComplete(); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ProtobufIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ProtobufIntegrationTests.java index e64264e837..f06955d7dc 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ProtobufIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ProtobufIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.web.reactive.result.method.annotation; import java.time.Duration; +import java.util.List; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -26,6 +27,8 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; +import org.springframework.http.HttpHeaders; +import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.reactive.config.EnableWebFlux; @@ -66,18 +69,19 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests { void value(HttpServer httpServer) throws Exception { startServer(httpServer); - Mono result = this.webClient.get() + Mono> result = this.webClient.get() .uri("/message") - .exchange() - .doOnNext(response -> { - assertThat(response.headers().contentType().get().getParameters().containsKey("delimited")).isFalse(); - assertThat(response.headers().header("X-Protobuf-Schema").get(0)).isEqualTo("sample.proto"); - assertThat(response.headers().header("X-Protobuf-Message").get(0)).isEqualTo("Msg"); - }) - .flatMap(response -> response.bodyToMono(Msg.class)); + .retrieve() + .toEntity(Msg.class); StepVerifier.create(result) - .expectNext(TEST_MSG) + .consumeNextWith(entity -> { + HttpHeaders headers = entity.getHeaders(); + assertThat(headers.getContentType().getParameters().containsKey("delimited")).isFalse(); + assertThat(headers.getFirst("X-Protobuf-Schema")).isEqualTo("sample.proto"); + assertThat(headers.getFirst("X-Protobuf-Message")).isEqualTo("Msg"); + assertThat(entity.getBody()).isEqualTo(TEST_MSG); + }) .verifyComplete(); } @@ -85,20 +89,19 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests { void values(HttpServer httpServer) throws Exception { startServer(httpServer); - Flux result = this.webClient.get() + Mono>> result = this.webClient.get() .uri("/messages") - .exchange() - .doOnNext(response -> { - assertThat(response.headers().contentType().get().getParameters().get("delimited")).isEqualTo("true"); - assertThat(response.headers().header("X-Protobuf-Schema").get(0)).isEqualTo("sample.proto"); - assertThat(response.headers().header("X-Protobuf-Message").get(0)).isEqualTo("Msg"); - }) - .flatMapMany(response -> response.bodyToFlux(Msg.class)); + .retrieve() + .toEntityList(Msg.class); StepVerifier.create(result) - .expectNext(TEST_MSG) - .expectNext(TEST_MSG) - .expectNext(TEST_MSG) + .consumeNextWith(entity -> { + HttpHeaders headers = entity.getHeaders(); + assertThat(headers.getContentType().getParameters().get("delimited")).isEqualTo("true"); + assertThat(headers.getFirst("X-Protobuf-Schema")).isEqualTo("sample.proto"); + assertThat(headers.getFirst("X-Protobuf-Message")).isEqualTo("Msg"); + assertThat(entity.getBody()).containsExactly(TEST_MSG, TEST_MSG, TEST_MSG); + }) .verifyComplete(); } @@ -108,13 +111,12 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests { Flux result = this.webClient.get() .uri("/message-stream") - .exchange() - .doOnNext(response -> { + .exchangeToFlux(response -> { assertThat(response.headers().contentType().get().getParameters().get("delimited")).isEqualTo("true"); assertThat(response.headers().header("X-Protobuf-Schema").get(0)).isEqualTo("sample.proto"); assertThat(response.headers().header("X-Protobuf-Message").get(0)).isEqualTo("Msg"); - }) - .flatMapMany(response -> response.bodyToFlux(Msg.class)); + return response.bodyToFlux(Msg.class); + }); StepVerifier.create(result) .expectNext(Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(0).build()).build()) diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/LocaleContextResolverIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/LocaleContextResolverIntegrationTests.java index b21b68cd5b..bf0e326396 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/LocaleContextResolverIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/LocaleContextResolverIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,12 +30,12 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; import org.springframework.lang.Nullable; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.reactive.config.ViewResolverRegistry; import org.springframework.web.reactive.config.WebFluxConfigurationSupport; -import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.result.method.annotation.AbstractRequestMappingIntegrationTests; import org.springframework.web.server.ServerWebExchange; @@ -66,15 +66,16 @@ class LocaleContextResolverIntegrationTests extends AbstractRequestMappingIntegr void fixedLocale(HttpServer httpServer) throws Exception { startServer(httpServer); - Mono result = webClient + Mono> result = webClient .get() .uri("http://localhost:" + this.port + "/") - .exchange(); + .retrieve() + .toBodilessEntity(); StepVerifier.create(result) - .consumeNextWith(response -> { - assertThat(response.statusCode()).isEqualTo(HttpStatus.OK); - assertThat(response.headers().asHttpHeaders().getContentLanguage()).isEqualTo(Locale.GERMANY); + .consumeNextWith(entity -> { + assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(entity.getHeaders().getContentLanguage()).isEqualTo(Locale.GERMANY); }) .verifyComplete(); }