From 9bd989f1bbf596c44b818b8979f5132b8cda0d1b Mon Sep 17 00:00:00 2001 From: Ashley Scopes <73482956+ascopes@users.noreply.github.com> Date: Wed, 11 Aug 2021 16:13:41 +0100 Subject: [PATCH 1/2] WebClient tests for socket and response format issues Added test case for malformed response chunk, which is now failing as expected. See gh-27262 --- .../client/WebClientIntegrationTests.java | 145 +++++++++++++++++- 1 file changed, 142 insertions(+), 3 deletions(-) diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index e8a082a6b6..4878ca78c2 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -18,11 +18,15 @@ package org.springframework.web.reactive.function.client; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.UncheckedIOException; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.net.ServerSocket; +import java.net.Socket; import java.net.URI; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -31,16 +35,21 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; +import okhttp3.mockwebserver.SocketPolicy; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.springframework.util.SocketUtils; +import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; @@ -83,7 +92,7 @@ class WebClientIntegrationTests { @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) - @ParameterizedTest(name = "[{index}] webClient [{0}]") + @ParameterizedTest(name = "[{index}] {displayName} [{0}]") @MethodSource("arguments") @interface ParameterizedWebClientTest { } @@ -113,7 +122,9 @@ class WebClientIntegrationTests { @AfterEach void shutdown() throws IOException { - this.server.shutdown(); + if (server != null) { + this.server.shutdown(); + } } @@ -1209,6 +1220,135 @@ class WebClientIntegrationTests { .verify(); } + static Stream socketFaultArguments() { + Stream.Builder argumentsBuilder = Stream.builder(); + arguments().forEach(arg -> { + argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_AT_START)); + argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_DURING_REQUEST_BODY)); + argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_AFTER_REQUEST)); + }); + return argumentsBuilder.build(); + } + + @ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]") + @MethodSource("socketFaultArguments") + void prematureClosureFault(ClientHttpConnector connector, SocketPolicy socketPolicy) { + startServer(connector); + + prepareResponse(response -> response + .setSocketPolicy(socketPolicy) + .setStatus("HTTP/1.1 200 OK") + .setHeader("Response-Header-1", "value 1") + .setHeader("Response-Header-2", "value 2") + .setBody("{\"message\": \"Hello, World!\"}")); + + String uri = "/test"; + Mono result = this.webClient + .post() + .uri(uri) + // Random non-empty body to allow us to interrupt. + .bodyValue("{\"action\": \"Say hello!\"}") + .retrieve() + .bodyToMono(String.class); + + StepVerifier.create(result) + .expectErrorSatisfies(throwable -> { + assertThat(throwable).isInstanceOf(WebClientRequestException.class); + WebClientRequestException ex = (WebClientRequestException) throwable; + // Varies between connector providers. + assertThat(ex.getCause()).isInstanceOf(IOException.class); + }) + .verify(); + } + + static Stream malformedResponseChunkArguments() { + return Stream.of( + Arguments.of(new ReactorClientHttpConnector(), true), + Arguments.of(new JettyClientHttpConnector(), true), + // Apache injects the Transfer-Encoding header for us, and complains with an exception if we also + // add it. The other two connectors do not add the header at all. We need this header for the test + // case to work correctly. + Arguments.of(new HttpComponentsClientHttpConnector(), false) + ); + } + + @ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]") + @MethodSource("malformedResponseChunkArguments") + void malformedResponseChunksOnBodilessEntity(ClientHttpConnector connector, boolean addTransferEncodingHeader) { + Mono result = doMalformedResponseChunks(connector, addTransferEncodingHeader, ResponseSpec::toBodilessEntity); + + StepVerifier.create(result) + .expectErrorSatisfies(throwable -> { + assertThat(throwable).isInstanceOf(WebClientException.class); + WebClientException ex = (WebClientException) throwable; + assertThat(ex.getCause()).isInstanceOf(IOException.class); + }) + .verify(); + } + + @ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]") + @MethodSource("malformedResponseChunkArguments") + void malformedResponseChunksOnEntityWithBody(ClientHttpConnector connector, boolean addTransferEncodingHeader) { + Mono result = doMalformedResponseChunks(connector, addTransferEncodingHeader, spec -> spec.toEntity(String.class)); + + StepVerifier.create(result) + .expectErrorSatisfies(throwable -> { + assertThat(throwable).isInstanceOf(WebClientException.class); + WebClientException ex = (WebClientException) throwable; + assertThat(ex.getCause()).isInstanceOf(IOException.class); + }) + .verify(); + } + + private Mono doMalformedResponseChunks( + ClientHttpConnector connector, + boolean addTransferEncodingHeader, + Function> responseHandler + ) { + int port = SocketUtils.findAvailableTcpPort(); + + Thread serverThread = new Thread(() -> { + // This exists separately to the main mock server, as I had a really hard time getting that to send the + // chunked responses correctly, flushing the socket each time. This was the only way I was able to replicate + // the issue of the client not handling malformed response chunks correctly. + try (ServerSocket serverSocket = new ServerSocket(port)) { + Socket socket = serverSocket.accept(); + InputStream is = socket.getInputStream(); + + //noinspection ResultOfMethodCallIgnored + is.read(new byte[4096]); + + OutputStream os = socket.getOutputStream(); + os.write("HTTP/1.1 200 OK\r\n".getBytes(StandardCharsets.UTF_8)); + os.write("Transfer-Encoding: chunked\r\n".getBytes(StandardCharsets.UTF_8)); + os.write("\r\n".getBytes(StandardCharsets.UTF_8)); + os.write("lskdu018973t09sylgasjkfg1][]'./.sdlv".getBytes(StandardCharsets.UTF_8)); + socket.close(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + + serverThread.setDaemon(true); + serverThread.start(); + + ResponseSpec spec = WebClient + .builder() + .clientConnector(connector) + .baseUrl("http://localhost:" + port) + .build() + .post() + .headers(headers -> { + if (addTransferEncodingHeader) { + headers.add(HttpHeaders.TRANSFER_ENCODING, "chunked"); + } + }) + .retrieve(); + + return responseHandler + .apply(spec) + .doFinally(signal -> serverThread.stop()); + } private void prepareResponse(Consumer consumer) { MockResponse response = new MockResponse(); @@ -1252,5 +1392,4 @@ class WebClientIntegrationTests { this.containerValue = containerValue; } } - } From b6111d04a5453a1e75840c53ad6f38eb7314138a Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 7 Oct 2021 16:50:30 +0100 Subject: [PATCH 2/2] Ensure WebClientResponseException for malformed response Closes gh-27262 --- .../client/DefaultClientResponse.java | 2 +- .../function/client/DefaultWebClient.java | 4 +- .../client/WebClientIntegrationTests.java | 106 +++--------------- 3 files changed, 21 insertions(+), 91 deletions(-) diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java index 927fcdf205..0931bf2140 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java @@ -203,7 +203,7 @@ class DefaultClientResponse implements ClientResponse { return bytes; }) .defaultIfEmpty(EMPTY) - .onErrorReturn(IllegalStateException.class::isInstance, EMPTY) + .onErrorReturn(ex -> !(ex instanceof Error), EMPTY) .map(bodyBytes -> { HttpRequest request = this.requestSupplier.get(); Charset charset = headers().contentType().map(MimeType::getCharset).orElse(null); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index aebc7760ac..efaf758ef4 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -612,7 +612,9 @@ class DefaultWebClient implements WebClient { public Mono> toBodilessEntity() { return this.responseMono.flatMap(response -> WebClientUtils.mapToEntity(response, handleBodyMono(response, Mono.empty())) - .flatMap(entity -> response.releaseBody().thenReturn(entity)) + .flatMap(entity -> response.releaseBody() + .onErrorResume(WebClientUtils.WRAP_EXCEPTION_PREDICATE, exceptionWrappingFunction(response)) + .thenReturn(entity)) ); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index 4878ca78c2..e7fa002ff5 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -42,14 +42,10 @@ import java.util.stream.Stream; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; -import okhttp3.mockwebserver.SocketPolicy; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.springframework.util.SocketUtils; -import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; @@ -73,7 +69,9 @@ import org.springframework.http.client.reactive.ClientHttpConnector; import org.springframework.http.client.reactive.HttpComponentsClientHttpConnector; import org.springframework.http.client.reactive.JettyClientHttpConnector; import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.util.SocketUtils; import org.springframework.web.reactive.function.BodyExtractors; +import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; import org.springframework.web.testfixture.xml.Pojo; import static org.assertj.core.api.Assertions.assertThat; @@ -1220,63 +1218,9 @@ class WebClientIntegrationTests { .verify(); } - static Stream socketFaultArguments() { - Stream.Builder argumentsBuilder = Stream.builder(); - arguments().forEach(arg -> { - argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_AT_START)); - argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_DURING_REQUEST_BODY)); - argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_AFTER_REQUEST)); - }); - return argumentsBuilder.build(); - } - - @ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]") - @MethodSource("socketFaultArguments") - void prematureClosureFault(ClientHttpConnector connector, SocketPolicy socketPolicy) { - startServer(connector); - - prepareResponse(response -> response - .setSocketPolicy(socketPolicy) - .setStatus("HTTP/1.1 200 OK") - .setHeader("Response-Header-1", "value 1") - .setHeader("Response-Header-2", "value 2") - .setBody("{\"message\": \"Hello, World!\"}")); - - String uri = "/test"; - Mono result = this.webClient - .post() - .uri(uri) - // Random non-empty body to allow us to interrupt. - .bodyValue("{\"action\": \"Say hello!\"}") - .retrieve() - .bodyToMono(String.class); - - StepVerifier.create(result) - .expectErrorSatisfies(throwable -> { - assertThat(throwable).isInstanceOf(WebClientRequestException.class); - WebClientRequestException ex = (WebClientRequestException) throwable; - // Varies between connector providers. - assertThat(ex.getCause()).isInstanceOf(IOException.class); - }) - .verify(); - } - - static Stream malformedResponseChunkArguments() { - return Stream.of( - Arguments.of(new ReactorClientHttpConnector(), true), - Arguments.of(new JettyClientHttpConnector(), true), - // Apache injects the Transfer-Encoding header for us, and complains with an exception if we also - // add it. The other two connectors do not add the header at all. We need this header for the test - // case to work correctly. - Arguments.of(new HttpComponentsClientHttpConnector(), false) - ); - } - - @ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]") - @MethodSource("malformedResponseChunkArguments") - void malformedResponseChunksOnBodilessEntity(ClientHttpConnector connector, boolean addTransferEncodingHeader) { - Mono result = doMalformedResponseChunks(connector, addTransferEncodingHeader, ResponseSpec::toBodilessEntity); - + @ParameterizedWebClientTest + void malformedResponseChunksOnBodilessEntity(ClientHttpConnector connector) { + Mono result = doMalformedChunkedResponseTest(connector, ResponseSpec::toBodilessEntity); StepVerifier.create(result) .expectErrorSatisfies(throwable -> { assertThat(throwable).isInstanceOf(WebClientException.class); @@ -1286,11 +1230,9 @@ class WebClientIntegrationTests { .verify(); } - @ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]") - @MethodSource("malformedResponseChunkArguments") - void malformedResponseChunksOnEntityWithBody(ClientHttpConnector connector, boolean addTransferEncodingHeader) { - Mono result = doMalformedResponseChunks(connector, addTransferEncodingHeader, spec -> spec.toEntity(String.class)); - + @ParameterizedWebClientTest + void malformedResponseChunksOnEntityWithBody(ClientHttpConnector connector) { + Mono result = doMalformedChunkedResponseTest(connector, spec -> spec.toEntity(String.class)); StepVerifier.create(result) .expectErrorSatisfies(throwable -> { assertThat(throwable).isInstanceOf(WebClientException.class); @@ -1300,17 +1242,13 @@ class WebClientIntegrationTests { .verify(); } - private Mono doMalformedResponseChunks( - ClientHttpConnector connector, - boolean addTransferEncodingHeader, - Function> responseHandler - ) { + private Mono doMalformedChunkedResponseTest( + ClientHttpConnector connector, Function> handler) { + int port = SocketUtils.findAvailableTcpPort(); Thread serverThread = new Thread(() -> { - // This exists separately to the main mock server, as I had a really hard time getting that to send the - // chunked responses correctly, flushing the socket each time. This was the only way I was able to replicate - // the issue of the client not handling malformed response chunks correctly. + // No way to simulate a malformed chunked response through MockWebServer. try (ServerSocket serverSocket = new ServerSocket(port)) { Socket socket = serverSocket.accept(); InputStream is = socket.getInputStream(); @@ -1324,30 +1262,20 @@ class WebClientIntegrationTests { os.write("\r\n".getBytes(StandardCharsets.UTF_8)); os.write("lskdu018973t09sylgasjkfg1][]'./.sdlv".getBytes(StandardCharsets.UTF_8)); socket.close(); - } catch (IOException ex) { + } + catch (IOException ex) { throw new RuntimeException(ex); } }); - serverThread.setDaemon(true); serverThread.start(); - ResponseSpec spec = WebClient - .builder() + WebClient client = WebClient.builder() .clientConnector(connector) .baseUrl("http://localhost:" + port) - .build() - .post() - .headers(headers -> { - if (addTransferEncodingHeader) { - headers.add(HttpHeaders.TRANSFER_ENCODING, "chunked"); - } - }) - .retrieve(); + .build(); - return responseHandler - .apply(spec) - .doFinally(signal -> serverThread.stop()); + return handler.apply(client.post().retrieve()); } private void prepareResponse(Consumer consumer) {