Browse Source

Add exchangeToMono and exchangeToFlux + deprecate exchange()

See gh-25751
pull/25823/head
Rossen Stoyanchev 4 years ago
parent
commit
1404dd768f
  1. 24
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java
  2. 110
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java
  3. 97
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java
  4. 3
      spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt
  5. 16
      spring-webflux/src/test/java/org/springframework/web/reactive/function/MultipartIntegrationTests.java
  6. 39
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java
  7. 7
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java
  8. 45
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java
  9. 14
      spring-webflux/src/test/java/org/springframework/web/reactive/function/server/LocaleContextResolverIntegrationTests.java
  10. 11
      spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MultipartIntegrationTests.java
  11. 52
      spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ProtobufIntegrationTests.java
  12. 15
      spring-webflux/src/test/java/org/springframework/web/reactive/result/view/LocaleContextResolverIntegrationTests.java

24
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java

@ -45,30 +45,6 @@ import org.springframework.web.reactive.function.BodyExtractor; @@ -45,30 +45,6 @@ import org.springframework.web.reactive.function.BodyExtractor;
* {@link ExchangeFunction}. Provides access to the response status and
* headers, and also methods to consume the response body.
*
* <p><strong>NOTE:</strong> When using a {@link ClientResponse}
* through the {@code WebClient}
* {@link WebClient.RequestHeadersSpec#exchange() exchange()} method,
* you have to make sure that the body is consumed or released by using
* one of the following methods:
* <ul>
* <li>{@link #body(BodyExtractor)}</li>
* <li>{@link #bodyToMono(Class)} or
* {@link #bodyToMono(ParameterizedTypeReference)}</li>
* <li>{@link #bodyToFlux(Class)} or
* {@link #bodyToFlux(ParameterizedTypeReference)}</li>
* <li>{@link #toEntity(Class)} or
* {@link #toEntity(ParameterizedTypeReference)}</li>
* <li>{@link #toEntityList(Class)} or
* {@link #toEntityList(ParameterizedTypeReference)}</li>
* <li>{@link #toBodilessEntity()}</li>
* <li>{@link #releaseBody()}</li>
* </ul>
* You can also use {@code bodyToMono(Void.class)} if no response content is
* expected. However keep in mind the connection will be closed, instead of
* being placed back in the pool, if any content does arrive. This is in
* contrast to {@link #releaseBody()} which does consume the full body and
* releases any content received.
*
* @author Brian Clozel
* @author Arjen Poutsma
* @since 5.0

110
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

@ -147,6 +147,14 @@ class DefaultWebClient implements WebClient { @@ -147,6 +147,14 @@ class DefaultWebClient implements WebClient {
return new DefaultWebClientBuilder(this.builder);
}
private static Mono<Void> releaseIfNotConsumed(ClientResponse response) {
return response.releaseBody().onErrorResume(ex2 -> Mono.empty());
}
private static <T> Mono<T> releaseIfNotConsumed(ClientResponse response, Throwable ex) {
return response.releaseBody().onErrorResume(ex2 -> Mono.empty()).then(Mono.error(ex));
}
private class DefaultRequestBodyUriSpec implements RequestBodyUriSpec {
@ -342,6 +350,65 @@ class DefaultWebClient implements WebClient { @@ -342,6 +350,65 @@ class DefaultWebClient implements WebClient {
}
@Override
public ResponseSpec retrieve() {
return new DefaultResponseSpec(exchange(), this::createRequest);
}
private HttpRequest createRequest() {
return new HttpRequest() {
private final URI uri = initUri();
private final HttpHeaders headers = initHeaders();
@Override
public HttpMethod getMethod() {
return httpMethod;
}
@Override
public String getMethodValue() {
return httpMethod.name();
}
@Override
public URI getURI() {
return this.uri;
}
@Override
public HttpHeaders getHeaders() {
return this.headers;
}
};
}
@Override
public <V> Mono<V> exchangeToMono(Function<ClientResponse, ? extends Mono<V>> responseHandler) {
return exchange().flatMap(response -> {
try {
return responseHandler.apply(response)
.flatMap(value -> releaseIfNotConsumed(response).thenReturn(value))
.switchIfEmpty(Mono.defer(() -> releaseIfNotConsumed(response).then(Mono.empty())))
.onErrorResume(ex -> releaseIfNotConsumed(response, ex));
}
catch (Throwable ex) {
return releaseIfNotConsumed(response, ex);
}
});
}
@Override
public <V> Flux<V> exchangeToFlux(Function<ClientResponse, ? extends Flux<V>> responseHandler) {
return exchange().flatMapMany(response -> {
try {
return responseHandler.apply(response)
.concatWith(Flux.defer(() -> releaseIfNotConsumed(response).then(Mono.empty())))
.onErrorResume(ex -> releaseIfNotConsumed(response, ex));
}
catch (Throwable ex) {
return releaseIfNotConsumed(response, ex);
}
});
}
@Override
@SuppressWarnings("deprecation")
public Mono<ClientResponse> exchange() {
ClientRequest request = (this.inserter != null ?
initRequestBuilder().body(this.inserter).build() :
@ -398,35 +465,6 @@ class DefaultWebClient implements WebClient { @@ -398,35 +465,6 @@ class DefaultWebClient implements WebClient {
return result;
}
}
@Override
public ResponseSpec retrieve() {
return new DefaultResponseSpec(exchange(), this::createRequest);
}
private HttpRequest createRequest() {
return new HttpRequest() {
private final URI uri = initUri();
private final HttpHeaders headers = initHeaders();
@Override
public HttpMethod getMethod() {
return httpMethod;
}
@Override
public String getMethodValue() {
return httpMethod.name();
}
@Override
public URI getURI() {
return this.uri;
}
@Override
public HttpHeaders getHeaders() {
return this.headers;
}
};
}
}
@ -530,11 +568,11 @@ class DefaultWebClient implements WebClient { @@ -530,11 +568,11 @@ class DefaultWebClient implements WebClient {
Mono<? extends Throwable> exMono;
try {
exMono = handler.apply(response);
exMono = exMono.flatMap(ex -> drainBody(response, ex));
exMono = exMono.onErrorResume(ex -> drainBody(response, ex));
exMono = exMono.flatMap(ex -> releaseIfNotConsumed(response, ex));
exMono = exMono.onErrorResume(ex -> releaseIfNotConsumed(response, ex));
}
catch (Throwable ex2) {
exMono = drainBody(response, ex2);
exMono = releaseIfNotConsumed(response, ex2);
}
Mono<T> result = exMono.flatMap(Mono::error);
HttpRequest request = this.requestSupplier.get();
@ -544,14 +582,6 @@ class DefaultWebClient implements WebClient { @@ -544,14 +582,6 @@ class DefaultWebClient implements WebClient {
return null;
}
@SuppressWarnings("unchecked")
private <T> Mono<T> drainBody(ClientResponse response, Throwable ex) {
// Ensure the body is drained, even if the StatusHandler didn't consume it,
// but ignore exception, in case the handler did consume.
return (Mono<T>) response.releaseBody()
.onErrorResume(ex2 -> Mono.empty()).thenReturn(ex);
}
private <T> Mono<T> insertCheckpoint(Mono<T> result, int statusCode, HttpRequest request) {
String httpMethod = request.getMethodValue();
URI uri = request.getURI();

97
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java

@ -57,7 +57,8 @@ import org.springframework.web.util.UriBuilderFactory; @@ -57,7 +57,8 @@ import org.springframework.web.util.UriBuilderFactory;
* <p>For examples with a response body see:
* <ul>
* <li>{@link RequestHeadersSpec#retrieve() retrieve()}
* <li>{@link RequestHeadersSpec#exchange() exchange()}
* <li>{@link RequestHeadersSpec#exchangeToMono(Function) exchangeToMono()}
* <li>{@link RequestHeadersSpec#exchangeToFlux(Function) exchangeToFlux()}
* </ul>
* <p>For examples with a request body see:
* <ul>
@ -252,8 +253,7 @@ public interface WebClient { @@ -252,8 +253,7 @@ public interface WebClient {
Builder defaultCookies(Consumer<MultiValueMap<String, String>> cookiesConsumer);
/**
* Provide a consumer to modify every request being built just before the
* call to {@link RequestHeadersSpec#exchange() exchange()}.
* Provide a consumer to customize every request being built.
* @param defaultRequest the consumer to use for modifying requests
* @since 5.1
*/
@ -483,21 +483,93 @@ public interface WebClient { @@ -483,21 +483,93 @@ public interface WebClient {
S httpRequest(Consumer<ClientHttpRequest> requestConsumer);
/**
* Perform the HTTP request and retrieve the response body:
* Proceed to declare how to extract the response. For example to extract
* a {@link ResponseEntity} with status, headers, and body:
* <p><pre>
* Mono&lt;Person&gt; bodyMono = client.get()
* Mono&lt;ResponseEntity&lt;Person&gt;&gt; entityMono = client.get()
* .uri("/persons/1")
* .accept(MediaType.APPLICATION_JSON)
* .retrieve()
* .toEntity(Person.class);
* </pre>
* <p>Or if interested only in the body:
* <p><pre>
* Mono&lt;Person&gt; entityMono = client.get()
* .uri("/persons/1")
* .accept(MediaType.APPLICATION_JSON)
* .retrieve()
* .bodyToMono(Person.class);
* </pre>
* <p>This method is a shortcut to using {@link #exchange()} and
* decoding the response body through {@link ClientResponse}.
* @return {@code ResponseSpec} to specify how to decode the body
* @see #exchange()
* <p>By default, 4xx and 5xx responses result in a
* {@link WebClientResponseException}. To customize error handling, use
* {@link ResponseSpec#onStatus(Predicate, Function) onStatus} handlers.
*/
ResponseSpec retrieve();
/**
* An alternative to {@link #retrieve()} that provides more control via
* access to the {@link ClientResponse}. This can be useful for advanced
* scenarios, for example to decode the response differently depending
* on the response status:
* <p><pre>
* Mono&lt;Object&gt; entityMono = client.get()
* .uri("/persons/1")
* .accept(MediaType.APPLICATION_JSON)
* .exchangeToMono(response -> {
* if (response.statusCode().equals(HttpStatus.OK)) {
* return response.bodyToMono(Person.class);
* }
* else if (response.statusCode().is4xxClientError()) {
* return response.bodyToMono(ErrorContainer.class);
* }
* else {
* return Mono.error(response.createException());
* }
* });
* </pre>
* <p><strong>Note:</strong> After the returned {@code Mono} completes,
* the response body is automatically released if it hasn't been consumed.
* If the response content is needed, the provided function must declare
* how to decode it.
* @param responseHandler the function to handle the response with
* @param <V> the type of Object the response will be transformed to
* @return a {@code Mono} produced from the response
* @since 5.3
*/
<V> Mono<V> exchangeToMono(Function<ClientResponse, ? extends Mono<V>> responseHandler);
/**
* An alternative to {@link #retrieve()} that provides more control via
* access to the {@link ClientResponse}. This can be useful for advanced
* scenarios, for example to decode the response differently depending
* on the response status:
* <p><pre>
* Mono&lt;Object&gt; entityMono = client.get()
* .uri("/persons")
* .accept(MediaType.APPLICATION_JSON)
* .exchangeToFlux(response -> {
* if (response.statusCode().equals(HttpStatus.OK)) {
* return response.bodyToFlux(Person.class);
* }
* else if (response.statusCode().is4xxClientError()) {
* return response.bodyToMono(ErrorContainer.class).flux();
* }
* else {
* return Flux.error(response.createException());
* }
* });
* </pre>
* <p><strong>Note:</strong> After the returned {@code Flux} completes,
* the response body is automatically released if it hasn't been consumed.
* If the response content is needed, the provided function must declare
* how to decode it.
* @param responseHandler the function to handle the response with
* @param <V> the type of Objects the response will be transformed to
* @return a {@code Flux} of Objects produced from the response
* @since 5.3
*/
<V> Flux<V> exchangeToFlux(Function<ClientResponse, ? extends Flux<V>> responseHandler);
/**
* Perform the HTTP request and return a {@link ClientResponse} with the
* response status and headers. You can then use methods of the response
@ -526,7 +598,14 @@ public interface WebClient { @@ -526,7 +598,14 @@ public interface WebClient {
* if to consume the response.
* @return a {@code Mono} for the response
* @see #retrieve()
* @deprecated since 5.3 due to the possibility to leak memory and/or
* connections; please, use {@link #exchangeToMono(Function)},
* {@link #exchangeToFlux(Function)}; consider also using
* {@link #retrieve()} which provides access to the response status
* and headers via {@link ResponseEntity} along with error status
* handling.
*/
@Deprecated
Mono<ClientResponse> exchange();
}

3
spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt

@ -17,8 +17,8 @@ @@ -17,8 +17,8 @@
package org.springframework.web.reactive.function.client
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitSingle
import org.reactivestreams.Publisher
import org.springframework.core.ParameterizedTypeReference
import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec
@ -69,6 +69,7 @@ inline fun <reified T : Any> RequestBodySpec.body(producer: Any): RequestHeaders @@ -69,6 +69,7 @@ inline fun <reified T : Any> RequestBodySpec.body(producer: Any): RequestHeaders
* @author Sebastien Deleuze
* @since 5.2
*/
@Suppress("DEPRECATION")
suspend fun RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(): ClientResponse =
exchange().awaitSingle()

16
spring-webflux/src/test/java/org/springframework/web/reactive/function/MultipartIntegrationTests.java

@ -29,13 +29,13 @@ import reactor.test.StepVerifier; @@ -29,13 +29,13 @@ import reactor.test.StepVerifier;
import org.springframework.core.io.ClassPathResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.FormFieldPart;
import org.springframework.http.codec.multipart.Part;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.AbstractRouterFunctionIntegrationTests;
import org.springframework.web.reactive.function.server.RouterFunction;
@ -62,15 +62,16 @@ class MultipartIntegrationTests extends AbstractRouterFunctionIntegrationTests { @@ -62,15 +62,16 @@ class MultipartIntegrationTests extends AbstractRouterFunctionIntegrationTests {
void multipartData(HttpServer httpServer) throws Exception {
startServer(httpServer);
Mono<ClientResponse> result = webClient
Mono<ResponseEntity<Void>> result = webClient
.post()
.uri("http://localhost:" + this.port + "/multipartData")
.bodyValue(generateBody())
.exchange();
.retrieve()
.toEntity(Void.class);
StepVerifier
.create(result)
.consumeNextWith(response -> assertThat(response.statusCode()).isEqualTo(HttpStatus.OK))
.consumeNextWith(entity -> assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK))
.verifyComplete();
}
@ -78,15 +79,16 @@ class MultipartIntegrationTests extends AbstractRouterFunctionIntegrationTests { @@ -78,15 +79,16 @@ class MultipartIntegrationTests extends AbstractRouterFunctionIntegrationTests {
void parts(HttpServer httpServer) throws Exception {
startServer(httpServer);
Mono<ClientResponse> result = webClient
Mono<ResponseEntity<Void>> result = webClient
.post()
.uri("http://localhost:" + this.port + "/parts")
.bodyValue(generateBody())
.exchange();
.retrieve()
.toEntity(Void.class);
StepVerifier
.create(result)
.consumeNextWith(response -> assertThat(response.statusCode()).isEqualTo(HttpStatus.OK))
.consumeNextWith(entity -> assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK))
.verifyComplete();
}

39
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java

@ -47,6 +47,7 @@ import static org.mockito.Mockito.mock; @@ -47,6 +47,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
/**
* Unit tests for {@link DefaultWebClient}.
@ -69,6 +70,7 @@ public class DefaultWebClientTests { @@ -69,6 +70,7 @@ public class DefaultWebClientTests {
@BeforeEach
public void setup() {
ClientResponse mockResponse = mock(ClientResponse.class);
when(mockResponse.bodyToMono(Void.class)).thenReturn(Mono.empty());
given(this.exchangeFunction.exchange(this.captor.capture())).willReturn(Mono.just(mockResponse));
this.builder = WebClient.builder().baseUrl("/base").exchangeFunction(this.exchangeFunction);
}
@ -77,7 +79,7 @@ public class DefaultWebClientTests { @@ -77,7 +79,7 @@ public class DefaultWebClientTests {
@Test
public void basic() {
this.builder.build().get().uri("/path")
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.url().toString()).isEqualTo("/base/path");
@ -89,7 +91,7 @@ public class DefaultWebClientTests { @@ -89,7 +91,7 @@ public class DefaultWebClientTests {
public void uriBuilder() {
this.builder.build().get()
.uri(builder -> builder.path("/path").queryParam("q", "12").build())
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.url().toString()).isEqualTo("/base/path?q=12");
@ -98,8 +100,8 @@ public class DefaultWebClientTests { @@ -98,8 +100,8 @@ public class DefaultWebClientTests {
@Test // gh-22705
public void uriBuilderWithUriTemplate() {
this.builder.build().get()
.uri("/path/{id}", builder -> builder.queryParam("q", "12").build("identifier"))
.exchange().block(Duration.ofSeconds(10));
.uri("/path/{id}", builder -> builder.queryParam("q", "12").build("identifier"))
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.url().toString()).isEqualTo("/base/path/identifier?q=12");
@ -110,7 +112,7 @@ public class DefaultWebClientTests { @@ -110,7 +112,7 @@ public class DefaultWebClientTests {
public void uriBuilderWithPathOverride() {
this.builder.build().get()
.uri(builder -> builder.replacePath("/path").build())
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.url().toString()).isEqualTo("/path");
@ -120,7 +122,7 @@ public class DefaultWebClientTests { @@ -120,7 +122,7 @@ public class DefaultWebClientTests {
public void requestHeaderAndCookie() {
this.builder.build().get().uri("/path").accept(MediaType.APPLICATION_JSON)
.cookies(cookies -> cookies.add("id", "123")) // SPR-16178
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.headers().getFirst("Accept")).isEqualTo("application/json");
@ -131,7 +133,7 @@ public class DefaultWebClientTests { @@ -131,7 +133,7 @@ public class DefaultWebClientTests {
public void httpRequest() {
this.builder.build().get().uri("/path")
.httpRequest(httpRequest -> {})
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.httpRequest()).isNotNull();
@ -143,7 +145,8 @@ public class DefaultWebClientTests { @@ -143,7 +145,8 @@ public class DefaultWebClientTests {
.defaultHeader("Accept", "application/json").defaultCookie("id", "123")
.build();
client.get().uri("/path").exchange().block(Duration.ofSeconds(10));
client.get().uri("/path")
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.headers().getFirst("Accept")).isEqualTo("application/json");
@ -160,7 +163,7 @@ public class DefaultWebClientTests { @@ -160,7 +163,7 @@ public class DefaultWebClientTests {
client.get().uri("/path")
.header("Accept", "application/xml")
.cookie("id", "456")
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.headers().getFirst("Accept")).isEqualTo("application/xml");
@ -185,7 +188,7 @@ public class DefaultWebClientTests { @@ -185,7 +188,7 @@ public class DefaultWebClientTests {
try {
context.set("bar");
client.get().uri("/path").attribute("foo", "bar")
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
}
finally {
context.remove();
@ -271,7 +274,7 @@ public class DefaultWebClientTests { @@ -271,7 +274,7 @@ public class DefaultWebClientTests {
this.builder.filter(filter).build()
.get().uri("/path")
.attribute("foo", "bar")
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
assertThat(actual.get("foo")).isEqualTo("bar");
@ -290,7 +293,7 @@ public class DefaultWebClientTests { @@ -290,7 +293,7 @@ public class DefaultWebClientTests {
this.builder.filter(filter).build()
.get().uri("/path")
.attribute("foo", null)
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
assertThat(actual.get("foo")).isNull();
@ -306,7 +309,7 @@ public class DefaultWebClientTests { @@ -306,7 +309,7 @@ public class DefaultWebClientTests {
.defaultCookie("id", "123"))
.build();
client.get().uri("/path").exchange().block(Duration.ofSeconds(10));
client.get().uri("/path").retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.headers().getFirst("Accept")).isEqualTo("application/json");
@ -317,8 +320,8 @@ public class DefaultWebClientTests { @@ -317,8 +320,8 @@ public class DefaultWebClientTests {
public void switchToErrorOnEmptyClientResponseMono() {
ExchangeFunction exchangeFunction = mock(ExchangeFunction.class);
given(exchangeFunction.exchange(any())).willReturn(Mono.empty());
WebClient.Builder builder = WebClient.builder().baseUrl("/base").exchangeFunction(exchangeFunction);
StepVerifier.create(builder.build().get().uri("/path").exchange())
WebClient client = WebClient.builder().baseUrl("/base").exchangeFunction(exchangeFunction).build();
StepVerifier.create(client.get().uri("/path").retrieve().bodyToMono(Void.class))
.expectErrorMessage("The underlying HTTP client completed without emitting a response.")
.verify(Duration.ofSeconds(5));
}
@ -333,9 +336,11 @@ public class DefaultWebClientTests { @@ -333,9 +336,11 @@ public class DefaultWebClientTests {
.build())
)
.build();
Mono<ClientResponse> exchange = client.get().uri("/path").exchange();
Mono<Void> result = client.get().uri("/path").retrieve().bodyToMono(Void.class);
verifyNoInteractions(this.exchangeFunction);
exchange.block(Duration.ofSeconds(10));
result.block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.headers().getFirst("Custom")).isEqualTo("value");
}

7
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java

@ -182,9 +182,7 @@ class WebClientDataBufferAllocatingTests extends AbstractDataBufferAllocatingTes @@ -182,9 +182,7 @@ class WebClientDataBufferAllocatingTests extends AbstractDataBufferAllocatingTes
.setBody("foo bar"));
Mono<Void> result = this.webClient.get()
.exchange()
.flatMap(ClientResponse::releaseBody);
.exchangeToMono(ClientResponse::releaseBody);
StepVerifier.create(result)
.expectComplete()
@ -201,8 +199,7 @@ class WebClientDataBufferAllocatingTests extends AbstractDataBufferAllocatingTes @@ -201,8 +199,7 @@ class WebClientDataBufferAllocatingTests extends AbstractDataBufferAllocatingTes
.setBody("foo bar"));
Mono<ResponseEntity<Void>> result = this.webClient.get()
.exchange()
.flatMap(ClientResponse::toBodilessEntity);
.exchangeToMono(ClientResponse::toBodilessEntity);
StepVerifier.create(result)
.assertNext(entity -> {

45
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java

@ -837,8 +837,7 @@ class WebClientIntegrationTests { @@ -837,8 +837,7 @@ class WebClientIntegrationTests {
Mono<String> result = this.webClient.get()
.uri("/greeting")
.header("X-Test-Header", "testvalue")
.exchange()
.flatMap(response -> response.bodyToMono(String.class));
.retrieve().bodyToMono(String.class);
StepVerifier.create(result)
.expectNext("Hello Spring!")
@ -862,8 +861,7 @@ class WebClientIntegrationTests { @@ -862,8 +861,7 @@ class WebClientIntegrationTests {
Mono<ResponseEntity<Pojo>> result = this.webClient.get()
.uri("/json").accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(response -> response.toEntity(Pojo.class));
.retrieve().toEntity(Pojo.class);
StepVerifier.create(result)
.consumeNextWith(entity -> {
@ -890,8 +888,7 @@ class WebClientIntegrationTests { @@ -890,8 +888,7 @@ class WebClientIntegrationTests {
Mono<ResponseEntity<Void>> result = this.webClient.get()
.uri("/json").accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(ClientResponse::toBodilessEntity);
.retrieve().toBodilessEntity();
StepVerifier.create(result)
.consumeNextWith(entity -> {
@ -919,8 +916,7 @@ class WebClientIntegrationTests { @@ -919,8 +916,7 @@ class WebClientIntegrationTests {
Mono<ResponseEntity<List<Pojo>>> result = this.webClient.get()
.uri("/json").accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(response -> response.toEntityList(Pojo.class));
.retrieve().toEntityList(Pojo.class);
StepVerifier.create(result)
.consumeNextWith(entity -> {
@ -948,8 +944,7 @@ class WebClientIntegrationTests { @@ -948,8 +944,7 @@ class WebClientIntegrationTests {
Mono<ResponseEntity<Void>> result = this.webClient.get()
.uri("/noContent")
.exchange()
.flatMap(response -> response.toEntity(Void.class));
.retrieve().toBodilessEntity();
StepVerifier.create(result)
.assertNext(r -> assertThat(r.getStatusCode().is2xxSuccessful()).isTrue())
@ -963,10 +958,11 @@ class WebClientIntegrationTests { @@ -963,10 +958,11 @@ class WebClientIntegrationTests {
prepareResponse(response -> response.setResponseCode(404)
.setHeader("Content-Type", "text/plain").setBody("Not Found"));
Mono<ClientResponse> result = this.webClient.get().uri("/greeting").exchange();
Mono<ResponseEntity<Void>> result = this.webClient.get().uri("/greeting")
.exchangeToMono(ClientResponse::toBodilessEntity);
StepVerifier.create(result)
.consumeNextWith(response -> assertThat(response.statusCode()).isEqualTo(HttpStatus.NOT_FOUND))
.consumeNextWith(entity -> assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND))
.expectComplete()
.verify(Duration.ofSeconds(3));
@ -987,12 +983,12 @@ class WebClientIntegrationTests { @@ -987,12 +983,12 @@ class WebClientIntegrationTests {
prepareResponse(response -> response.setResponseCode(errorStatus)
.setHeader("Content-Type", "text/plain").setBody(errorMessage));
Mono<ClientResponse> result = this.webClient.get()
Mono<ResponseEntity<Void>> result = this.webClient.get()
.uri("/unknownPage")
.exchange();
.exchangeToMono(ClientResponse::toBodilessEntity);
StepVerifier.create(result)
.consumeNextWith(response -> assertThat(response.rawStatusCode()).isEqualTo(555))
.consumeNextWith(entity -> assertThat(entity.getStatusCodeValue()).isEqualTo(555))
.expectComplete()
.verify(Duration.ofSeconds(3));
@ -1008,7 +1004,8 @@ class WebClientIntegrationTests { @@ -1008,7 +1004,8 @@ class WebClientIntegrationTests {
startServer(connector);
String uri = "/api/v4/groups/1";
Mono<ClientResponse> responseMono = WebClient.builder().build().get().uri(uri).exchange();
Mono<ResponseEntity<Void>> responseMono = WebClient.builder().build().get().uri(uri)
.retrieve().toBodilessEntity();
StepVerifier.create(responseMono)
.expectErrorSatisfies(throwable -> {
@ -1103,12 +1100,9 @@ class WebClientIntegrationTests { @@ -1103,12 +1100,9 @@ class WebClientIntegrationTests {
.addHeader("Set-Cookie", "testkey2=testvalue2; Max-Age=42; HttpOnly; SameSite=Lax; Secure")
.setBody("test"));
Mono<ClientResponse> result = this.webClient.get()
this.webClient.get()
.uri("/test")
.exchange();
StepVerifier.create(result)
.consumeNextWith(response -> {
.exchangeToMono(response -> {
assertThat(response.cookies()).containsOnlyKeys("testkey1", "testkey2");
ResponseCookie cookie1 = response.cookies().get("testkey1").get(0);
@ -1123,9 +1117,10 @@ class WebClientIntegrationTests { @@ -1123,9 +1117,10 @@ class WebClientIntegrationTests {
assertThat(cookie2.isHttpOnly()).isTrue();
assertThat(cookie2.getSameSite()).isEqualTo("Lax");
assertThat(cookie2.getMaxAge().getSeconds()).isEqualTo(42);
return response.releaseBody();
})
.expectComplete()
.verify(Duration.ofSeconds(3));
.block(Duration.ofSeconds(3));
expectRequestCount(1);
}
@ -1135,9 +1130,7 @@ class WebClientIntegrationTests { @@ -1135,9 +1130,7 @@ class WebClientIntegrationTests {
startServer(connector);
String url = "http://example.invalid";
Mono<ClientResponse> result = this.webClient.get().
uri(url)
.exchange();
Mono<Void> result = this.webClient.get().uri(url).retrieve().bodyToMono(Void.class);
StepVerifier.create(result)
.expectErrorSatisfies(throwable -> {

14
spring-webflux/src/test/java/org/springframework/web/reactive/function/server/LocaleContextResolverIntegrationTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,8 +26,8 @@ import reactor.test.StepVerifier; @@ -26,8 +26,8 @@ import reactor.test.StepVerifier;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.result.view.View;
import org.springframework.web.reactive.result.view.ViewResolver;
@ -66,16 +66,16 @@ class LocaleContextResolverIntegrationTests extends AbstractRouterFunctionIntegr @@ -66,16 +66,16 @@ class LocaleContextResolverIntegrationTests extends AbstractRouterFunctionIntegr
void fixedLocale(HttpServer httpServer) throws Exception {
startServer(httpServer);
Mono<ClientResponse> result = webClient
Mono<ResponseEntity<Void>> result = webClient
.get()
.uri("http://localhost:" + this.port + "/")
.exchange();
.retrieve().toBodilessEntity();
StepVerifier
.create(result)
.consumeNextWith(response -> {
assertThat(response.statusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.headers().asHttpHeaders().getContentLanguage()).isEqualTo(Locale.GERMANY);
.consumeNextWith(entity -> {
assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(entity.getHeaders().getContentLanguage()).isEqualTo(Locale.GERMANY);
})
.verifyComplete();
}

11
spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MultipartIntegrationTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -37,6 +37,7 @@ import org.springframework.core.io.ClassPathResource; @@ -37,6 +37,7 @@ import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.FormFieldPart;
@ -52,7 +53,6 @@ import org.springframework.web.bind.annotation.RequestPart; @@ -52,7 +53,6 @@ import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests;
@ -85,15 +85,16 @@ class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTests { @@ -85,15 +85,16 @@ class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTests {
void requestPart(HttpServer httpServer) throws Exception {
startServer(httpServer);
Mono<ClientResponse> result = webClient
Mono<ResponseEntity<Void>> result = webClient
.post()
.uri("/requestPart")
.bodyValue(generateBody())
.exchange();
.retrieve()
.toBodilessEntity();
StepVerifier
.create(result)
.consumeNextWith(response -> assertThat(response.statusCode()).isEqualTo(HttpStatus.OK))
.consumeNextWith(entity -> assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK))
.verifyComplete();
}

52
spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ProtobufIntegrationTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.springframework.web.reactive.result.method.annotation;
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -26,6 +27,8 @@ import org.springframework.context.ApplicationContext; @@ -26,6 +27,8 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.config.EnableWebFlux;
@ -66,18 +69,19 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests { @@ -66,18 +69,19 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests {
void value(HttpServer httpServer) throws Exception {
startServer(httpServer);
Mono<Msg> result = this.webClient.get()
Mono<ResponseEntity<Msg>> result = this.webClient.get()
.uri("/message")
.exchange()
.doOnNext(response -> {
assertThat(response.headers().contentType().get().getParameters().containsKey("delimited")).isFalse();
assertThat(response.headers().header("X-Protobuf-Schema").get(0)).isEqualTo("sample.proto");
assertThat(response.headers().header("X-Protobuf-Message").get(0)).isEqualTo("Msg");
})
.flatMap(response -> response.bodyToMono(Msg.class));
.retrieve()
.toEntity(Msg.class);
StepVerifier.create(result)
.expectNext(TEST_MSG)
.consumeNextWith(entity -> {
HttpHeaders headers = entity.getHeaders();
assertThat(headers.getContentType().getParameters().containsKey("delimited")).isFalse();
assertThat(headers.getFirst("X-Protobuf-Schema")).isEqualTo("sample.proto");
assertThat(headers.getFirst("X-Protobuf-Message")).isEqualTo("Msg");
assertThat(entity.getBody()).isEqualTo(TEST_MSG);
})
.verifyComplete();
}
@ -85,20 +89,19 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests { @@ -85,20 +89,19 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests {
void values(HttpServer httpServer) throws Exception {
startServer(httpServer);
Flux<Msg> result = this.webClient.get()
Mono<ResponseEntity<List<Msg>>> result = this.webClient.get()
.uri("/messages")
.exchange()
.doOnNext(response -> {
assertThat(response.headers().contentType().get().getParameters().get("delimited")).isEqualTo("true");
assertThat(response.headers().header("X-Protobuf-Schema").get(0)).isEqualTo("sample.proto");
assertThat(response.headers().header("X-Protobuf-Message").get(0)).isEqualTo("Msg");
})
.flatMapMany(response -> response.bodyToFlux(Msg.class));
.retrieve()
.toEntityList(Msg.class);
StepVerifier.create(result)
.expectNext(TEST_MSG)
.expectNext(TEST_MSG)
.expectNext(TEST_MSG)
.consumeNextWith(entity -> {
HttpHeaders headers = entity.getHeaders();
assertThat(headers.getContentType().getParameters().get("delimited")).isEqualTo("true");
assertThat(headers.getFirst("X-Protobuf-Schema")).isEqualTo("sample.proto");
assertThat(headers.getFirst("X-Protobuf-Message")).isEqualTo("Msg");
assertThat(entity.getBody()).containsExactly(TEST_MSG, TEST_MSG, TEST_MSG);
})
.verifyComplete();
}
@ -108,13 +111,12 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests { @@ -108,13 +111,12 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests {
Flux<Msg> result = this.webClient.get()
.uri("/message-stream")
.exchange()
.doOnNext(response -> {
.exchangeToFlux(response -> {
assertThat(response.headers().contentType().get().getParameters().get("delimited")).isEqualTo("true");
assertThat(response.headers().header("X-Protobuf-Schema").get(0)).isEqualTo("sample.proto");
assertThat(response.headers().header("X-Protobuf-Message").get(0)).isEqualTo("Msg");
})
.flatMapMany(response -> response.bodyToFlux(Msg.class));
return response.bodyToFlux(Msg.class);
});
StepVerifier.create(result)
.expectNext(Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(0).build()).build())

15
spring-webflux/src/test/java/org/springframework/web/reactive/result/view/LocaleContextResolverIntegrationTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,12 +30,12 @@ import org.springframework.context.annotation.ComponentScan; @@ -30,12 +30,12 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.reactive.config.ViewResolverRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurationSupport;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.result.method.annotation.AbstractRequestMappingIntegrationTests;
import org.springframework.web.server.ServerWebExchange;
@ -66,15 +66,16 @@ class LocaleContextResolverIntegrationTests extends AbstractRequestMappingIntegr @@ -66,15 +66,16 @@ class LocaleContextResolverIntegrationTests extends AbstractRequestMappingIntegr
void fixedLocale(HttpServer httpServer) throws Exception {
startServer(httpServer);
Mono<ClientResponse> result = webClient
Mono<ResponseEntity<Void>> result = webClient
.get()
.uri("http://localhost:" + this.port + "/")
.exchange();
.retrieve()
.toBodilessEntity();
StepVerifier.create(result)
.consumeNextWith(response -> {
assertThat(response.statusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.headers().asHttpHeaders().getContentLanguage()).isEqualTo(Locale.GERMANY);
.consumeNextWith(entity -> {
assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(entity.getHeaders().getContentLanguage()).isEqualTo(Locale.GERMANY);
})
.verifyComplete();
}

Loading…
Cancel
Save