Browse Source

Add DataBuffer BodyInserter/BodyExtractor

Added a BodyExtractor for Flux<DataBuffer>, and a BodyInserter for
Publisher<DataBuffer>

Issue: SPR-14918
pull/1242/merge
Arjen Poutsma 8 years ago
parent
commit
735e288d46
  1. 17
      spring-web/src/main/java/org/springframework/http/codec/BodyExtractors.java
  2. 22
      spring-web/src/main/java/org/springframework/http/codec/BodyInserters.java
  3. 24
      spring-web/src/test/java/org/springframework/http/codec/BodyExtractorsTests.java
  4. 24
      spring-web/src/test/java/org/springframework/http/codec/BodyInsertersTests.java

17
spring-web/src/main/java/org/springframework/http/codec/BodyExtractors.java

@ -28,6 +28,7 @@ import reactor.core.publisher.Flux; @@ -28,6 +28,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpMessage;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
@ -85,12 +86,24 @@ public abstract class BodyExtractors { @@ -85,12 +86,24 @@ public abstract class BodyExtractors {
*/
public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ResolvableType elementType) {
Assert.notNull(elementType, "'elementType' must not be null");
return (request, context) -> readWithMessageReaders(request, context,
return (inputMessage, context) -> readWithMessageReaders(inputMessage, context,
elementType,
reader -> reader.read(elementType, request, Collections.emptyMap()),
reader -> reader.read(elementType, inputMessage, Collections.emptyMap()),
Flux::error);
}
/**
* Return a {@code BodyExtractor} that returns the body of the message as a {@link Flux} of
* {@link DataBuffer}s.
* <p><strong>Note</strong> that the returned buffers should be released after usage by calling
* {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)}
* @return a {@code BodyExtractor} that returns the body
* @see ReactiveHttpInputMessage#getBody()
*/
public static BodyExtractor<Flux<DataBuffer>, ReactiveHttpInputMessage> toDataBuffers() {
return (inputMessage, context) -> inputMessage.getBody();
}
private static <T, S extends Publisher<T>> S readWithMessageReaders(
ReactiveHttpInputMessage inputMessage,
BodyExtractor.Context context,

22
spring-web/src/main/java/org/springframework/http/codec/BodyInserters.java

@ -28,6 +28,7 @@ import reactor.core.publisher.Mono; @@ -28,6 +28,7 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.server.reactive.ServerHttpResponse;
@ -182,16 +183,33 @@ public abstract class BodyInserters { @@ -182,16 +183,33 @@ public abstract class BodyInserters {
Assert.notNull(eventsPublisher, "'eventsPublisher' must not be null");
Assert.notNull(eventType, "'eventType' must not be null");
return BodyInserter.of(
(response, context) -> {
(outputMessage, context) -> {
HttpMessageWriter<T> messageWriter = sseMessageWriter(context);
return messageWriter.write(eventsPublisher, eventType,
MediaType.TEXT_EVENT_STREAM, response, Collections.emptyMap());
MediaType.TEXT_EVENT_STREAM, outputMessage, Collections.emptyMap());
},
() -> eventsPublisher
);
}
/**
* Return a {@code BodyInserter} that writes the given {@code Publisher<DataBuffer>} to the
* body.
* @param publisher the data buffer publisher to write
* @param <T> the type of the publisher
* @return a {@code BodyInserter} that writes directly to the body
* @see ReactiveHttpOutputMessage#writeWith(Publisher)
*/
public static <T extends Publisher<DataBuffer>> BodyInserter<T, ReactiveHttpOutputMessage> fromDataBuffers(T publisher) {
Assert.notNull(publisher, "'publisher' must not be null");
return BodyInserter.of(
(outputMessage, context) -> outputMessage.writeWith(publisher),
() -> publisher
);
}
private static <T> HttpMessageWriter<T> sseMessageWriter(BodyInserter.Context context) {
return context.messageWriters().get()
.filter(messageWriter -> messageWriter

24
spring-web/src/test/java/org/springframework/http/codec/BodyExtractorsTests.java

@ -19,7 +19,6 @@ package org.springframework.http.codec; @@ -19,7 +19,6 @@ package org.springframework.http.codec;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;
@ -40,7 +39,6 @@ import org.springframework.http.ReactiveHttpInputMessage; @@ -40,7 +39,6 @@ import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.xml.Jaxb2XmlDecoder;
import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest;
import org.springframework.web.server.UnsupportedMediaTypeStatusException;
/**
* @author Arjen Poutsma
@ -122,7 +120,7 @@ public class BodyExtractorsTests { @@ -122,7 +120,7 @@ public class BodyExtractorsTests {
BodyExtractor.Context emptyContext = new BodyExtractor.Context() {
@Override
public Supplier<Stream<HttpMessageReader<?>>> messageReaders() {
return () -> Collections.<HttpMessageReader<?>>emptySet().stream();
return Stream::empty;
}
};
@ -132,4 +130,24 @@ public class BodyExtractorsTests { @@ -132,4 +130,24 @@ public class BodyExtractorsTests {
.verify();
}
@Test
public void toDataBuffers() throws Exception {
BodyExtractor<Flux<DataBuffer>, ReactiveHttpInputMessage> extractor = BodyExtractors.toDataBuffers();
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
DefaultDataBuffer dataBuffer =
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
Flux<DataBuffer> body = Flux.just(dataBuffer);
MockServerHttpRequest request = new MockServerHttpRequest();
request.setBody(body);
Flux<DataBuffer> result = extractor.extract(request, this.context);
StepVerifier.create(result)
.expectNext(dataBuffer)
.expectComplete()
.verify();
}
}

24
spring-web/src/test/java/org/springframework/http/codec/BodyInsertersTests.java

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.springframework.http.codec;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
@ -35,6 +36,7 @@ import org.springframework.core.codec.CharSequenceEncoder; @@ -35,6 +36,7 @@ import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
@ -164,4 +166,26 @@ public class BodyInsertersTests { @@ -164,4 +166,26 @@ public class BodyInsertersTests {
StepVerifier.create(result).expectNextCount(0).expectComplete().verify();
}
@Test
public void ofDataBuffers() throws Exception {
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
DefaultDataBuffer dataBuffer =
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
Flux<DataBuffer> body = Flux.just(dataBuffer);
BodyInserter<Flux<DataBuffer>, ReactiveHttpOutputMessage> inserter = BodyInserters.fromDataBuffers(body);
assertEquals(body, inserter.t());
MockServerHttpResponse response = new MockServerHttpResponse();
Mono<Void> result = inserter.insert(response, this.context);
StepVerifier.create(result).expectComplete().verify();
StepVerifier.create(response.getBody())
.expectNext(dataBuffer)
.expectComplete()
.verify();
}
}
Loading…
Cancel
Save