From 99474376e6fc5406324becaeb4a8df1fe210ccae Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Tue, 4 Apr 2017 15:55:22 +0200 Subject: [PATCH] Server HttpMessage[Reader|Writer] in WebFlux fn This commit introduces support for the server-side methods on HttpMessageReader and HttpMessageWriter. It does so by introducing an Optional ServerHttpRequest in BodyInserter.Context, and an Optional ServerHttpResponse in BodyExtractor.Context. On the client-side, these optionals return Optional.empty(); on the server-side, they return the respective server-side messages. Issue: SPR-15370 --- .../web/reactive/function/BodyExtractor.java | 7 +++ .../web/reactive/function/BodyExtractors.java | 35 +++++++++-- .../web/reactive/function/BodyInserter.java | 7 +++ .../web/reactive/function/BodyInserters.java | 59 ++++++++++++++----- .../client/DefaultClientRequestBuilder.java | 8 +++ .../client/DefaultClientResponse.java | 7 +++ .../server/DefaultEntityResponseBuilder.java | 8 +++ .../function/server/DefaultServerRequest.java | 7 +++ .../server/DefaultServerResponseBuilder.java | 8 +++ .../function/BodyExtractorsTests.java | 16 ++++- .../reactive/function/BodyInsertersTests.java | 55 ++++++++++++++++- 11 files changed, 194 insertions(+), 23 deletions(-) diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractor.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractor.java index 5b492d766a..67cbefc265 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractor.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractor.java @@ -17,11 +17,13 @@ package org.springframework.web.reactive.function; import java.util.Map; +import java.util.Optional; import java.util.function.Supplier; import java.util.stream.Stream; import org.springframework.http.ReactiveHttpInputMessage; import org.springframework.http.codec.HttpMessageReader; +import org.springframework.http.server.reactive.ServerHttpResponse; /** * A function that can extract data from a {@link ReactiveHttpInputMessage} body. @@ -56,6 +58,11 @@ public interface BodyExtractor { */ Supplier>> messageReaders(); + /** + * Optionally return the {@link ServerHttpResponse}, if present. + */ + Optional serverResponse(); + /** * Return the map of hints to use to customize body extraction. */ diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java index 399aabb655..9b1add05c8 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java @@ -17,6 +17,7 @@ package org.springframework.web.reactive.function; import java.util.List; +import java.util.Optional; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -33,6 +34,7 @@ import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpInputMessage; import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; import org.springframework.util.MultiValueMap; @@ -67,9 +69,18 @@ public abstract class BodyExtractors { */ public static BodyExtractor, ReactiveHttpInputMessage> toMono(ResolvableType elementType) { Assert.notNull(elementType, "'elementType' must not be null"); - return (request, context) -> readWithMessageReaders(request, context, + return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, elementType, - reader -> reader.readMono(elementType, request, context.hints()), + reader -> { + Optional serverResponse = context.serverResponse(); + if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) { + return reader.readMono(elementType, elementType, (ServerHttpRequest) inputMessage, + serverResponse.get(), context.hints()); + } + else { + return reader.readMono(elementType, inputMessage, context.hints()); + } + }, Mono::error); } @@ -94,7 +105,16 @@ public abstract class BodyExtractors { Assert.notNull(elementType, "'elementType' must not be null"); return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, elementType, - reader -> reader.read(elementType, inputMessage, context.hints()), + reader -> { + Optional serverResponse = context.serverResponse(); + if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) { + return reader.read(elementType, elementType, (ServerHttpRequest) inputMessage, + serverResponse.get(), context.hints()); + } + else { + return reader.read(elementType, inputMessage, context.hints()); + } + }, Flux::error); } @@ -107,9 +127,12 @@ public abstract class BodyExtractors { // the server-side public static BodyExtractor>, ServerHttpRequest> toFormData() { return (serverRequest, context) -> { - HttpMessageReader> messageReader = formMessageReader(context); - return messageReader.readMono(FORM_TYPE, serverRequest, context.hints()); - }; + HttpMessageReader> messageReader = + formMessageReader(context); + return context.serverResponse() + .map(serverResponse -> messageReader.readMono(FORM_TYPE, FORM_TYPE, serverRequest, serverResponse, context.hints())) + .orElseGet(() -> messageReader.readMono(FORM_TYPE, serverRequest, context.hints())); + }; } /** diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyInserter.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyInserter.java index 9401ab935a..317a2e4dcd 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyInserter.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyInserter.java @@ -17,6 +17,7 @@ package org.springframework.web.reactive.function; import java.util.Map; +import java.util.Optional; import java.util.function.Supplier; import java.util.stream.Stream; @@ -24,6 +25,7 @@ import reactor.core.publisher.Mono; import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.codec.HttpMessageWriter; +import org.springframework.http.server.reactive.ServerHttpRequest; /** * A combination of functions that can populate a {@link ReactiveHttpOutputMessage} body. @@ -58,6 +60,11 @@ public interface BodyInserter { */ Supplier>> messageWriters(); + /** + * Optionally return the {@link ServerHttpRequest}, if present. + */ + Optional serverRequest(); + /** * Return the map of hints to use for response body conversion. */ diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyInserters.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyInserters.java index c6fc061096..cd672bb998 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyInserters.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyInserters.java @@ -17,6 +17,7 @@ package org.springframework.web.reactive.function; import java.util.List; +import java.util.Optional; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -32,6 +33,7 @@ import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.client.reactive.ClientHttpRequest; import org.springframework.http.codec.HttpMessageWriter; import org.springframework.http.codec.ServerSentEvent; +import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; import org.springframework.util.MultiValueMap; @@ -120,10 +122,18 @@ public abstract class BodyInserters { public static BodyInserter fromResource(T resource) { Assert.notNull(resource, "'resource' must not be null"); return (outputMessage, context) -> { - HttpMessageWriter messageWriter = resourceHttpMessageWriter(context); - return messageWriter.write(Mono.just(resource), RESOURCE_TYPE, null, - outputMessage, context.hints()); - }; + Mono inputStream = Mono.just(resource); + HttpMessageWriter messageWriter = resourceHttpMessageWriter(context); + Optional serverRequest = context.serverRequest(); + if (serverRequest.isPresent() && outputMessage instanceof ServerHttpResponse) { + return messageWriter.write(inputStream, RESOURCE_TYPE, RESOURCE_TYPE, null, + serverRequest.get(), (ServerHttpResponse) outputMessage, context.hints()); + } + else { + return messageWriter.write(inputStream, RESOURCE_TYPE, null, + outputMessage, context.hints()); + } + }; } private static HttpMessageWriter resourceHttpMessageWriter(BodyInserter.Context context) { @@ -149,11 +159,15 @@ public abstract class BodyInserters { S eventsPublisher) { Assert.notNull(eventsPublisher, "'eventsPublisher' must not be null"); - return (response, context) -> { + return (serverResponse, context) -> { HttpMessageWriter> messageWriter = findMessageWriter(context, SERVER_SIDE_EVENT_TYPE, MediaType.TEXT_EVENT_STREAM); - return messageWriter.write(eventsPublisher, SERVER_SIDE_EVENT_TYPE, - MediaType.TEXT_EVENT_STREAM, response, context.hints()); + return context.serverRequest() + .map(serverRequest -> messageWriter.write(eventsPublisher, SERVER_SIDE_EVENT_TYPE, + SERVER_SIDE_EVENT_TYPE, MediaType.TEXT_EVENT_STREAM, serverRequest, + serverResponse, context.hints())) + .orElseGet(() -> messageWriter.write(eventsPublisher, SERVER_SIDE_EVENT_TYPE, + MediaType.TEXT_EVENT_STREAM, serverResponse, context.hints())); }; } @@ -196,12 +210,15 @@ public abstract class BodyInserters { Assert.notNull(eventsPublisher, "'eventsPublisher' must not be null"); Assert.notNull(eventType, "'eventType' must not be null"); - return (outputMessage, context) -> { + return (serverResponse, context) -> { HttpMessageWriter messageWriter = findMessageWriter(context, SERVER_SIDE_EVENT_TYPE, MediaType.TEXT_EVENT_STREAM); - return messageWriter.write(eventsPublisher, eventType, - MediaType.TEXT_EVENT_STREAM, outputMessage, context.hints()); - + return context.serverRequest() + .map(serverRequest -> messageWriter.write(eventsPublisher, eventType, + eventType, MediaType.TEXT_EVENT_STREAM, serverRequest, + serverResponse, context.hints())) + .orElseGet(() -> messageWriter.write(eventsPublisher, eventType, + MediaType.TEXT_EVENT_STREAM, serverResponse, context.hints())); }; } @@ -211,6 +228,9 @@ public abstract class BodyInserters { * @param formData the form data to write to the output message * @return a {@code BodyInserter} that writes form data */ + // Note that the returned BodyInserter is parameterized to ClientHttpRequest, not + // ReactiveHttpOutputMessage like other methods, since sending form data only typically happens + // on the server-side public static BodyInserter, ClientHttpRequest> fromFormData( MultiValueMap formData) { @@ -241,15 +261,24 @@ public abstract class BodyInserters { private static , M extends ReactiveHttpOutputMessage> BodyInserter bodyInserterFor( P body, ResolvableType bodyType) { - return (m, context) -> { - MediaType contentType = m.getHeaders().getContentType(); + return (outputMessage, context) -> { + MediaType contentType = outputMessage.getHeaders().getContentType(); Supplier>> messageWriters = context.messageWriters(); return messageWriters.get() .filter(messageWriter -> messageWriter.canWrite(bodyType, contentType)) .findFirst() .map(BodyInserters::cast) - .map(messageWriter -> messageWriter - .write(body, bodyType, contentType, m, context.hints())) + .map(messageWriter -> { + Optional serverRequest = context.serverRequest(); + if (serverRequest.isPresent() && outputMessage instanceof ServerHttpResponse) { + return messageWriter.write(body, bodyType, bodyType, contentType, + serverRequest.get(), (ServerHttpResponse) outputMessage, + context.hints()); + } else { + return messageWriter.write(body, bodyType, contentType, outputMessage, + context.hints()); + } + }) .orElseGet(() -> { List supportedMediaTypes = messageWriters.get() .flatMap(reader -> reader.getWritableMediaTypes().stream()) diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientRequestBuilder.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientRequestBuilder.java index 893171f1d9..17bb7c860e 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientRequestBuilder.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientRequestBuilder.java @@ -19,6 +19,7 @@ package org.springframework.web.reactive.function.client; import java.net.URI; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.function.Supplier; import java.util.stream.Stream; @@ -30,6 +31,7 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.client.reactive.ClientHttpRequest; import org.springframework.http.codec.HttpMessageWriter; +import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.LinkedMultiValueMap; @@ -188,6 +190,12 @@ class DefaultClientRequestBuilder implements ClientRequest.Builder { public Supplier>> messageWriters() { return strategies.messageWriters(); } + + @Override + public Optional serverRequest() { + return Optional.empty(); + } + @Override public Map hints() { return Collections.emptyMap(); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java index a64be4599e..8387629a50 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java @@ -35,6 +35,7 @@ import org.springframework.http.MediaType; import org.springframework.http.ResponseCookie; import org.springframework.http.client.reactive.ClientHttpResponse; import org.springframework.http.codec.HttpMessageReader; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.MultiValueMap; import org.springframework.web.reactive.function.BodyExtractor; import org.springframework.web.reactive.function.BodyExtractors; @@ -83,6 +84,12 @@ class DefaultClientResponse implements ClientResponse { public Supplier>> messageReaders() { return strategies.messageReaders(); } + + @Override + public Optional serverResponse() { + return Optional.empty(); + } + @Override public Map hints() { return Collections.emptyMap(); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultEntityResponseBuilder.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultEntityResponseBuilder.java index 336b93ada0..d28c14bda5 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultEntityResponseBuilder.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultEntityResponseBuilder.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Supplier; import java.util.stream.Stream; @@ -36,6 +37,7 @@ import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.codec.HttpMessageWriter; +import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; import org.springframework.web.reactive.function.BodyInserter; @@ -209,6 +211,12 @@ class DefaultEntityResponseBuilder implements EntityResponse.Builder { public Supplier>> messageWriters() { return strategies.messageWriters(); } + + @Override + public Optional serverRequest() { + return Optional.of(exchange.getRequest()); + } + @Override public Map hints() { return hints; diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerRequest.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerRequest.java index c2ccfadca3..0a77a91d0a 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerRequest.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerRequest.java @@ -38,6 +38,7 @@ import org.springframework.http.HttpRange; import org.springframework.http.MediaType; import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; import org.springframework.web.reactive.function.BodyExtractor; import org.springframework.web.reactive.function.BodyExtractors; @@ -103,6 +104,12 @@ class DefaultServerRequest implements ServerRequest { public Supplier>> messageReaders() { return DefaultServerRequest.this.strategies.messageReaders(); } + + @Override + public Optional serverResponse() { + return Optional.of(exchange().getResponse()); + } + @Override public Map hints() { return hints; diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerResponseBuilder.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerResponseBuilder.java index 369cd8da15..f2c9d9066e 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerResponseBuilder.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerResponseBuilder.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.BiFunction; import java.util.function.Supplier; @@ -38,6 +39,7 @@ import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.codec.HttpMessageWriter; +import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; import org.springframework.web.reactive.function.BodyInserter; @@ -301,6 +303,12 @@ class DefaultServerResponseBuilder implements ServerResponse.BodyBuilder { public Supplier>> messageWriters() { return strategies.messageWriters(); } + + @Override + public Optional serverRequest() { + return Optional.of(exchange.getRequest()); + } + @Override public Map hints() { return hints; diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyExtractorsTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyExtractorsTests.java index b47787f7ee..f83350f987 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyExtractorsTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyExtractorsTests.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Supplier; import java.util.stream.Stream; @@ -46,11 +47,12 @@ import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.codec.json.Jackson2JsonDecoder; import org.springframework.http.codec.xml.Jaxb2XmlDecoder; import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest; import org.springframework.util.MultiValueMap; import static org.junit.Assert.*; -import static org.springframework.http.codec.json.Jackson2CodecSupport.*; +import static org.springframework.http.codec.json.Jackson2CodecSupport.JSON_VIEW_HINT; /** * @author Arjen Poutsma @@ -77,6 +79,12 @@ public class BodyExtractorsTests { public Supplier>> messageReaders() { return messageReaders::stream; } + + @Override + public Optional serverResponse() { + return Optional.empty(); + } + @Override public Map hints() { return hints; @@ -194,6 +202,12 @@ public class BodyExtractorsTests { public Supplier>> messageReaders() { return Stream::empty; } + + @Override + public Optional serverResponse() { + return Optional.empty(); + } + @Override public Map hints() { return Collections.emptyMap(); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java index 9fe7b0ded6..092f77704f 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java @@ -21,9 +21,11 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Supplier; import java.util.stream.Stream; @@ -42,6 +44,7 @@ import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DefaultDataBuffer; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.HttpMethod; +import org.springframework.http.HttpRange; import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.client.reactive.ClientHttpRequest; import org.springframework.http.codec.EncoderHttpMessageWriter; @@ -52,14 +55,16 @@ import org.springframework.http.codec.ServerSentEvent; import org.springframework.http.codec.ServerSentEventHttpMessageWriter; import org.springframework.http.codec.json.Jackson2JsonEncoder; import org.springframework.http.codec.xml.Jaxb2XmlEncoder; +import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.mock.http.client.reactive.test.MockClientHttpRequest; +import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest; import org.springframework.mock.http.server.reactive.test.MockServerHttpResponse; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.*; import static org.springframework.http.codec.json.Jackson2CodecSupport.JSON_VIEW_HINT; /** @@ -91,6 +96,12 @@ public class BodyInsertersTests { public Supplier>> messageWriters() { return messageWriters::stream; } + + @Override + public Optional serverRequest() { + return Optional.empty(); + } + @Override public Map hints() { return hints; @@ -184,6 +195,48 @@ public class BodyInsertersTests { .verify(); } + @Test + public void ofResourceRange() throws Exception { + final int rangeStart = 10; + Resource body = new ClassPathResource("response.txt", getClass()); + BodyInserter inserter = BodyInserters.fromResource(body); + + MockServerHttpRequest request = MockServerHttpRequest.get("/foo") + .range(HttpRange.createByteRange(rangeStart)) + .build(); + MockServerHttpResponse response = new MockServerHttpResponse(); + Mono result = inserter.insert(response, new BodyInserter.Context() { + @Override + public Supplier>> messageWriters() { + return Collections.>singleton(new ResourceHttpMessageWriter())::stream; + } + + @Override + public Optional serverRequest() { + return Optional.of(request); + } + + @Override + public Map hints() { + return hints; + } + }); + StepVerifier.create(result).expectComplete().verify(); + + byte[] allBytes = Files.readAllBytes(body.getFile().toPath()); + byte[] expectedBytes = new byte[allBytes.length - rangeStart]; + System.arraycopy(allBytes, rangeStart, expectedBytes, 0, expectedBytes.length); + + StepVerifier.create(response.getBody()) + .consumeNextWith(dataBuffer -> { + byte[] resultBytes = new byte[dataBuffer.readableByteCount()]; + dataBuffer.read(resultBytes); + assertArrayEquals(expectedBytes, resultBytes); + }) + .expectComplete() + .verify(); + } + @Test public void ofServerSentEventFlux() throws Exception { ServerSentEvent event = ServerSentEvent.builder("foo").build();