From b5089ac09208daa12b19b06924eff95b670eb764 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 3 May 2017 17:25:49 -0400 Subject: [PATCH] Support @RequestBody Flux in WebFlux This commit turns the Synchronoss NIO Multipart HttpMessageReader into a reader of Flux and creates a separate reader that aggregates the parts into a MultiValueMap. Issue: SPR-14546 --- .../codec/DefaultServerCodecConfigurer.java | 7 +- .../multipart/MultipartHttpMessageReader.java | 100 ++++++++++++++++++ ... => SynchronossPartHttpMessageReader.java} | 43 +++----- .../codec/ServerCodecConfigurerTests.java | 8 +- .../MultipartHttpMessageWriterTests.java | 4 +- ...ynchronossPartHttpMessageReaderTests.java} | 32 +++--- .../DelegatingWebFluxConfigurationTests.java | 2 +- .../WebFluxConfigurationSupportTests.java | 2 +- .../annotation/MultipartIntegrationTests.java | 57 ++++++++-- 9 files changed, 198 insertions(+), 57 deletions(-) create mode 100644 spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageReader.java rename spring-web/src/main/java/org/springframework/http/codec/multipart/{SynchronossMultipartHttpMessageReader.java => SynchronossPartHttpMessageReader.java} (85%) rename spring-web/src/test/java/org/springframework/http/codec/multipart/{SynchronossMultipartHttpMessageReaderTests.java => SynchronossPartHttpMessageReaderTests.java} (79%) diff --git a/spring-web/src/main/java/org/springframework/http/codec/DefaultServerCodecConfigurer.java b/spring-web/src/main/java/org/springframework/http/codec/DefaultServerCodecConfigurer.java index aae8408af0..b4841f2c48 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/DefaultServerCodecConfigurer.java +++ b/spring-web/src/main/java/org/springframework/http/codec/DefaultServerCodecConfigurer.java @@ -21,7 +21,8 @@ import java.util.List; import org.springframework.core.codec.Encoder; import org.springframework.core.codec.StringDecoder; import org.springframework.http.codec.json.Jackson2JsonEncoder; -import org.springframework.http.codec.multipart.SynchronossMultipartHttpMessageReader; +import org.springframework.http.codec.multipart.MultipartHttpMessageReader; +import org.springframework.http.codec.multipart.SynchronossPartHttpMessageReader; import org.springframework.util.ClassUtils; /** @@ -65,7 +66,9 @@ class DefaultServerCodecConfigurer extends DefaultCodecConfigurer implements Ser super.addTypedReadersTo(result); addReaderTo(result, FormHttpMessageReader::new); if (synchronossMultipartPresent) { - addReaderTo(result, SynchronossMultipartHttpMessageReader::new); + SynchronossPartHttpMessageReader partReader = new SynchronossPartHttpMessageReader(); + addReaderTo(result, () -> partReader); + addReaderTo(result, () -> new MultipartHttpMessageReader(partReader)); } } diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageReader.java new file mode 100644 index 0000000000..63c5b0d8c2 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageReader.java @@ -0,0 +1,100 @@ +/* + * Copyright 2002-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.codec.multipart; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.ResolvableType; +import org.springframework.http.MediaType; +import org.springframework.http.ReactiveHttpInputMessage; +import org.springframework.http.codec.HttpMessageReader; +import org.springframework.util.Assert; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +/** + * {@code HttpMessageReader} for reading {@code "multipart/form-data"} requests + * into a {@code MultiValueMap}. + * + *

Note that this reader depends on access to an + * {@code HttpMessageReader} for the actual parsing of multipart content. + * The purpose of this reader is to collect the parts into a map. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public class MultipartHttpMessageReader implements HttpMessageReader> { + + private static final ResolvableType MULTIPART_VALUE_TYPE = ResolvableType.forClassWithGenerics( + MultiValueMap.class, String.class, Part.class); + + + private final HttpMessageReader partReader; + + + public MultipartHttpMessageReader(HttpMessageReader partReader) { + Assert.notNull(partReader, "'partReader' is required"); + this.partReader = partReader; + } + + + @Override + public List getReadableMediaTypes() { + return Collections.singletonList(MediaType.MULTIPART_FORM_DATA); + } + + @Override + public boolean canRead(ResolvableType elementType, MediaType mediaType) { + return MULTIPART_VALUE_TYPE.isAssignableFrom(elementType) && + (mediaType == null || MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType)); + } + + + @Override + public Flux> read(ResolvableType elementType, + ReactiveHttpInputMessage message, Map hints) { + + return Flux.from(readMono(elementType, message, hints)); + } + + + @Override + public Mono> readMono(ResolvableType elementType, + ReactiveHttpInputMessage inputMessage, Map hints) { + + return this.partReader.read(elementType, inputMessage, hints) + .collectMultimap(Part::getName).map(this::toMultiValueMap); + } + + private LinkedMultiValueMap toMultiValueMap(Map> map) { + return new LinkedMultiValueMap<>(map.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> toList(e.getValue())))); + } + + private List toList(Collection collection) { + return collection instanceof List ? (List) collection : new ArrayList<>(collection); + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossMultipartHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReader.java similarity index 85% rename from spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossMultipartHttpMessageReader.java rename to spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReader.java index a94ebfcd01..07a144d25c 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossMultipartHttpMessageReader.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReader.java @@ -25,15 +25,12 @@ import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import java.util.stream.Collectors; import org.synchronoss.cloud.nio.multipart.Multipart; import org.synchronoss.cloud.nio.multipart.MultipartContext; @@ -55,25 +52,24 @@ import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpInputMessage; import org.springframework.http.codec.HttpMessageReader; import org.springframework.util.Assert; -import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MimeType; -import org.springframework.util.MultiValueMap; import org.springframework.util.StreamUtils; /** - * {@code HttpMessageReader} for {@code "multipart/form-data"} requests based - * on the Synchronoss NIO Multipart library. + * {@code HttpMessageReader} for parsing {@code "multipart/form-data"} requests + * to a stream of {@link Part}'s using the Synchronoss NIO Multipart library. + * + *

This reader can be provided to {@link MultipartHttpMessageReader} in order + * to aggregate all parts into a Map. * * @author Sebastien Deleuze * @author Rossen Stoyanchev * @author Arjen Poutsma * @since 5.0 * @see Synchronoss NIO Multipart + * @see MultipartHttpMessageReader */ -public class SynchronossMultipartHttpMessageReader implements HttpMessageReader> { - - private static final ResolvableType MULTIPART_VALUE_TYPE = ResolvableType.forClassWithGenerics( - MultiValueMap.class, String.class, Part.class); +public class SynchronossPartHttpMessageReader implements HttpMessageReader { @Override @@ -83,34 +79,25 @@ public class SynchronossMultipartHttpMessageReader implements HttpMessageReader< @Override public boolean canRead(ResolvableType elementType, MediaType mediaType) { - return MULTIPART_VALUE_TYPE.isAssignableFrom(elementType) && + return Part.class.equals(elementType.resolve(Object.class)) && (mediaType == null || MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType)); } @Override - public Flux> read(ResolvableType elementType, - ReactiveHttpInputMessage message, Map hints) { + public Flux read(ResolvableType elementType, ReactiveHttpInputMessage message, + Map hints) { - return Flux.from(readMono(elementType, message, hints)); + return Flux.create(new SynchronossPartGenerator(message)); } @Override - public Mono> readMono(ResolvableType elementType, - ReactiveHttpInputMessage inputMessage, Map hints) { - - return Flux.create(new SynchronossPartGenerator(inputMessage)) - .collectMultimap(Part::getName).map(this::toMultiValueMap); - } - - private LinkedMultiValueMap toMultiValueMap(Map> map) { - return new LinkedMultiValueMap<>(map.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> toList(e.getValue())))); - } + public Mono readMono(ResolvableType elementType, ReactiveHttpInputMessage message, + Map hints) { - private List toList(Collection collection) { - return collection instanceof List ? (List) collection : new ArrayList<>(collection); + return Mono.error(new UnsupportedOperationException( + "This reader does not support reading a single element.")); } diff --git a/spring-web/src/test/java/org/springframework/http/codec/ServerCodecConfigurerTests.java b/spring-web/src/test/java/org/springframework/http/codec/ServerCodecConfigurerTests.java index 7e4f212ceb..07c8c5f08f 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/ServerCodecConfigurerTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/ServerCodecConfigurerTests.java @@ -41,7 +41,8 @@ import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.MediaType; import org.springframework.http.codec.json.Jackson2JsonDecoder; import org.springframework.http.codec.json.Jackson2JsonEncoder; -import org.springframework.http.codec.multipart.SynchronossMultipartHttpMessageReader; +import org.springframework.http.codec.multipart.MultipartHttpMessageReader; +import org.springframework.http.codec.multipart.SynchronossPartHttpMessageReader; import org.springframework.http.codec.xml.Jaxb2XmlDecoder; import org.springframework.http.codec.xml.Jaxb2XmlEncoder; import org.springframework.util.MimeTypeUtils; @@ -63,14 +64,15 @@ public class ServerCodecConfigurerTests { @Test public void defaultReaders() throws Exception { List> readers = this.configurer.getReaders(); - assertEquals(10, readers.size()); + assertEquals(11, readers.size()); assertEquals(ByteArrayDecoder.class, getNextDecoder(readers).getClass()); assertEquals(ByteBufferDecoder.class, getNextDecoder(readers).getClass()); assertEquals(DataBufferDecoder.class, getNextDecoder(readers).getClass()); assertEquals(ResourceDecoder.class, getNextDecoder(readers).getClass()); assertStringDecoder(getNextDecoder(readers), true); assertEquals(FormHttpMessageReader.class, readers.get(this.index.getAndIncrement()).getClass()); - assertEquals(SynchronossMultipartHttpMessageReader.class, readers.get(this.index.getAndIncrement()).getClass()); + assertEquals(SynchronossPartHttpMessageReader.class, readers.get(this.index.getAndIncrement()).getClass()); + assertEquals(MultipartHttpMessageReader.class, readers.get(this.index.getAndIncrement()).getClass()); assertEquals(Jaxb2XmlDecoder.class, getNextDecoder(readers).getClass()); assertEquals(Jackson2JsonDecoder.class, getNextDecoder(readers).getClass()); assertStringDecoder(getNextDecoder(readers), false); diff --git a/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java index 75fe7c6a4c..ab6aa211f4 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java @@ -102,7 +102,9 @@ public class MultipartHttpMessageWriterTests { assertNotNull("No boundary found", contentType.getParameter("boundary")); // see if Synchronoss NIO Multipart can read what we wrote - SynchronossMultipartHttpMessageReader reader = new SynchronossMultipartHttpMessageReader(); + SynchronossPartHttpMessageReader synchronossReader = new SynchronossPartHttpMessageReader(); + MultipartHttpMessageReader reader = new MultipartHttpMessageReader(synchronossReader); + MockServerHttpRequest request = MockServerHttpRequest.post("/foo") .header(HttpHeaders.CONTENT_TYPE, contentType.toString()) .body(response.getBody()); diff --git a/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossMultipartHttpMessageReaderTests.java b/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReaderTests.java similarity index 79% rename from spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossMultipartHttpMessageReaderTests.java rename to spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReaderTests.java index 194e2ba3b7..7b5e713748 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossMultipartHttpMessageReaderTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReaderTests.java @@ -32,53 +32,57 @@ import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.MockHttpOutputMessage; -import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.converter.FormHttpMessageConverter; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; -import static java.util.Collections.*; -import static org.junit.Assert.*; -import static org.springframework.http.HttpHeaders.*; -import static org.springframework.http.MediaType.*; +import static java.util.Collections.emptyMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.springframework.core.ResolvableType.forClassWithGenerics; +import static org.springframework.http.HttpHeaders.CONTENT_LENGTH; +import static org.springframework.http.HttpHeaders.CONTENT_TYPE; +import static org.springframework.http.MediaType.MULTIPART_FORM_DATA; /** * @author Sebastien Deleuze */ -public class SynchronossMultipartHttpMessageReaderTests { +public class SynchronossPartHttpMessageReaderTests { - private final HttpMessageReader> reader = new SynchronossMultipartHttpMessageReader(); + private final MultipartHttpMessageReader reader = + new MultipartHttpMessageReader(new SynchronossPartHttpMessageReader()); @Test public void canRead() { assertTrue(this.reader.canRead( - ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class), + forClassWithGenerics(MultiValueMap.class, String.class, Part.class), MediaType.MULTIPART_FORM_DATA)); assertFalse(this.reader.canRead( - ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Object.class), + forClassWithGenerics(MultiValueMap.class, String.class, Object.class), MediaType.MULTIPART_FORM_DATA)); assertFalse(this.reader.canRead( - ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class), + forClassWithGenerics(MultiValueMap.class, String.class, String.class), MediaType.MULTIPART_FORM_DATA)); assertFalse(this.reader.canRead( - ResolvableType.forClassWithGenerics(Map.class, String.class, String.class), + forClassWithGenerics(Map.class, String.class, String.class), MediaType.MULTIPART_FORM_DATA)); assertFalse(this.reader.canRead( - ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class), + forClassWithGenerics(MultiValueMap.class, String.class, Part.class), MediaType.APPLICATION_FORM_URLENCODED)); } @Test public void resolveParts() throws IOException { ServerHttpRequest request = generateMultipartRequest(); - ResolvableType elementType = ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class); + ResolvableType elementType = forClassWithGenerics(MultiValueMap.class, String.class, Part.class); MultiValueMap parts = this.reader.readMono(elementType, request, emptyMap()).block(); assertEquals(2, parts.size()); @@ -105,7 +109,7 @@ public class SynchronossMultipartHttpMessageReaderTests { @Test public void bodyError() { ServerHttpRequest request = generateErrorMultipartRequest(); - ResolvableType elementType = ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class); + ResolvableType elementType = forClassWithGenerics(MultiValueMap.class, String.class, Part.class); StepVerifier.create(this.reader.readMono(elementType, request, emptyMap())).verifyError(); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/config/DelegatingWebFluxConfigurationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/config/DelegatingWebFluxConfigurationTests.java index eff558f6b7..6d3105c761 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/config/DelegatingWebFluxConfigurationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/config/DelegatingWebFluxConfigurationTests.java @@ -103,7 +103,7 @@ public class DelegatingWebFluxConfigurationTests { verify(webFluxConfigurer).configureArgumentResolvers(any()); assertSame(formatterRegistry.getValue(), initializerConversionService); - assertEquals(10, codecsConfigurer.getValue().getReaders().size()); + assertEquals(11, codecsConfigurer.getValue().getReaders().size()); } @Test diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/config/WebFluxConfigurationSupportTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/config/WebFluxConfigurationSupportTests.java index a08132b103..a58186d6ea 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/config/WebFluxConfigurationSupportTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/config/WebFluxConfigurationSupportTests.java @@ -127,7 +127,7 @@ public class WebFluxConfigurationSupportTests { assertNotNull(adapter); List> readers = adapter.getMessageCodecConfigurer().getReaders(); - assertEquals(10, readers.size()); + assertEquals(11, readers.size()); assertHasMessageReader(readers, forClass(byte[].class), APPLICATION_OCTET_STREAM); assertHasMessageReader(readers, forClass(ByteBuffer.class), APPLICATION_OCTET_STREAM); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MultipartIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MultipartIntegrationTests.java index 67951746ef..07924c3a00 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MultipartIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MultipartIntegrationTests.java @@ -16,8 +16,12 @@ package org.springframework.web.reactive.result.method.annotation; +import java.util.Map; +import java.util.stream.Collectors; + import org.junit.Before; import org.junit.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -35,6 +39,7 @@ import org.springframework.http.server.reactive.HttpHandler; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestPart; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.reactive.DispatcherHandler; @@ -68,14 +73,10 @@ public class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTes } @Test - public void part() { - test("/part"); - } - - private void test(String uri) { + public void requestPart() { Mono result = webClient .post() - .uri(uri) + .uri("/requestPart") .contentType(MediaType.MULTIPART_FORM_DATA) .body(BodyInserters.fromMultipartData(generateBody())) .exchange(); @@ -86,6 +87,37 @@ public class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTes .verifyComplete(); } + @Test + public void requestBodyMap() { + Mono result = webClient + .post() + .uri("/requestBodyMap") + .contentType(MediaType.MULTIPART_FORM_DATA) + .body(BodyInserters.fromMultipartData(generateBody())) + .retrieve() + .bodyToMono(String.class); + + StepVerifier.create(result) + .consumeNextWith(body -> assertEquals("Map[barPart,fooPart]", body)) + .verifyComplete(); + } + + @Test + public void requestBodyFlux() { + Mono result = webClient + .post() + .uri("/requestBodyFlux") + .contentType(MediaType.MULTIPART_FORM_DATA) + .body(BodyInserters.fromMultipartData(generateBody())) + .retrieve() + .bodyToMono(String.class); + + StepVerifier.create(result) + .consumeNextWith(body -> assertEquals("Flux[barPart,fooPart]", body)) + .verifyComplete(); + } + + private MultiValueMap generateBody() { HttpHeaders fooHeaders = new HttpHeaders(); fooHeaders.setContentType(MediaType.TEXT_PLAIN); @@ -102,11 +134,22 @@ public class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTes @SuppressWarnings("unused") static class MultipartController { - @PostMapping("/part") + @PostMapping("/requestPart") void part(@RequestPart Part fooPart) { assertEquals("foo.txt", fooPart.getFilename().get()); } + @PostMapping("/requestBodyMap") + Mono part(@RequestBody Mono> parts) { + return parts.map(map -> map.toSingleValueMap().entrySet().stream() + .map(Map.Entry::getKey).sorted().collect(Collectors.joining(",", "Map[", "]"))); + } + + @PostMapping("/requestBodyFlux") + Mono part(@RequestBody Flux parts) { + return parts.map(Part::getName).collectList() + .map(names -> names.stream().sorted().collect(Collectors.joining(",", "Flux[", "]"))); + } } @Configuration