Browse Source

Sync to Fluxion to remove Stream confusion

pull/1111/head
Stephane Maldini 9 years ago
parent
commit
9f94f8c88e
  1. 2
      spring-web-reactive/build.gradle
  2. 16
      spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToReactorFluxionConverter.java
  3. 1
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java
  4. 4
      spring-web-reactive/src/test/java/org/springframework/http/server/reactive/AsyncIntegrationTests.java
  5. 12
      spring-web-reactive/src/test/java/org/springframework/http/server/reactive/WriteWithOperatorTests.java
  6. 16
      spring-web-reactive/src/test/java/org/springframework/web/reactive/DispatcherHandlerErrorTests.java
  7. 12
      spring-web-reactive/src/test/java/org/springframework/web/reactive/ResponseStatusExceptionHandlerTests.java
  8. 15
      spring-web-reactive/src/test/java/org/springframework/web/reactive/handler/SimpleHandlerResultHandlerTests.java
  9. 14
      spring-web-reactive/src/test/java/org/springframework/web/reactive/method/InvocableHandlerMethodTests.java
  10. 18
      spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java

2
spring-web-reactive/build.gradle

@ -88,7 +88,7 @@ dependencies { @@ -88,7 +88,7 @@ dependencies {
optional 'io.reactivex:rxjava:1.1.0'
optional "io.reactivex:rxnetty-http:0.5.0-SNAPSHOT"
optional "com.fasterxml.jackson.core:jackson-databind:2.6.2"
optional "io.projectreactor:reactor-stream:${reactorVersion}"
optional "io.projectreactor:reactor-fluxion:${reactorVersion}"
optional "io.projectreactor:reactor-net:${reactorVersion}"
optional "io.projectreactor:reactor-io:${reactorVersion}"
optional "org.apache.tomcat:tomcat-util:${tomcatVersion}"

16
spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToReactorStreamConverter.java → spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToReactorFluxionConverter.java

@ -20,8 +20,8 @@ import java.util.LinkedHashSet; @@ -20,8 +20,8 @@ import java.util.LinkedHashSet;
import java.util.Set;
import org.reactivestreams.Publisher;
import reactor.rx.Fluxion;
import reactor.rx.Promise;
import reactor.rx.Stream;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.GenericConverter;
@ -30,13 +30,13 @@ import org.springframework.core.convert.converter.GenericConverter; @@ -30,13 +30,13 @@ import org.springframework.core.convert.converter.GenericConverter;
* @author Stephane Maldini
* @author Sebastien Deleuze
*/
public final class ReactiveStreamsToReactorStreamConverter implements GenericConverter {
public final class ReactiveStreamsToReactorFluxionConverter implements GenericConverter {
@Override
public Set<GenericConverter.ConvertiblePair> getConvertibleTypes() {
Set<GenericConverter.ConvertiblePair> pairs = new LinkedHashSet<>();
pairs.add(new GenericConverter.ConvertiblePair(Publisher.class, Stream.class));
pairs.add(new GenericConverter.ConvertiblePair(Stream.class, Publisher.class));
pairs.add(new GenericConverter.ConvertiblePair(Publisher.class, Fluxion.class));
pairs.add(new GenericConverter.ConvertiblePair(Fluxion.class, Publisher.class));
pairs.add(new GenericConverter.ConvertiblePair(Publisher.class, Promise.class));
pairs.add(new GenericConverter.ConvertiblePair(Promise.class, Publisher.class));
return pairs;
@ -47,17 +47,17 @@ public final class ReactiveStreamsToReactorStreamConverter implements GenericCon @@ -47,17 +47,17 @@ public final class ReactiveStreamsToReactorStreamConverter implements GenericCon
if (source == null) {
return null;
}
if (Stream.class.isAssignableFrom(source.getClass())) {
if (Fluxion.class.isAssignableFrom(source.getClass())) {
return source;
}
else if (Stream.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return Stream.from((Publisher)source);
else if (Fluxion.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return Fluxion.from((Publisher)source);
}
else if (Promise.class.isAssignableFrom(source.getClass())) {
return source;
}
else if (Promise.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
return Stream.from((Publisher)source).promise();
return Fluxion.from((Publisher)source).promise();
}
return null;
}

1
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java

@ -23,7 +23,6 @@ import java.util.function.Supplier; @@ -23,7 +23,6 @@ import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rx.Stream;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;

4
spring-web-reactive/src/test/java/org/springframework/http/server/reactive/AsyncIntegrationTests.java

@ -23,7 +23,7 @@ import org.junit.Test; @@ -23,7 +23,7 @@ import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SchedulerGroup;
import reactor.core.timer.Timer;
import reactor.rx.Stream;
import reactor.rx.Fluxion;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
@ -63,7 +63,7 @@ public class AsyncIntegrationTests extends AbstractHttpHandlerIntegrationTests { @@ -63,7 +63,7 @@ public class AsyncIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return response.setBody(Stream.just("h", "e", "l", "l", "o")
return response.setBody(Fluxion.just("h", "e", "l", "l", "o")
.useTimer(Timer.global())
.throttleRequest(100)
.dispatchOn(asyncGroup)

12
spring-web-reactive/src/test/java/org/springframework/http/server/reactive/WriteWithOperatorTests.java

@ -29,8 +29,8 @@ import org.reactivestreams.Subscriber; @@ -29,8 +29,8 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.subscriber.SubscriberBarrier;
import reactor.rx.Fluxion;
import reactor.rx.Signal;
import reactor.rx.Stream;
import static org.junit.Assert.*;
@ -55,7 +55,7 @@ public class WriteWithOperatorTests { @@ -55,7 +55,7 @@ public class WriteWithOperatorTests {
public void errorBeforeFirstItem() throws Exception {
IllegalStateException error = new IllegalStateException("boo");
Publisher<Void> completion = Flux.<String>error(error).lift(this.operator);
List<Signal<Void>> signals = Stream.from(completion).materialize().toList().get();
List<Signal<Void>> signals = Fluxion.from(completion).materialize().toList().get();
assertEquals(1, signals.size());
assertSame("Unexpected signal: " + signals.get(0), error, signals.get(0).getThrowable());
@ -64,7 +64,7 @@ public class WriteWithOperatorTests { @@ -64,7 +64,7 @@ public class WriteWithOperatorTests {
@Test
public void completionBeforeFirstItem() throws Exception {
Publisher<Void> completion = Flux.<String>empty().lift(this.operator);
List<Signal<Void>> signals = Stream.from(completion).materialize().toList().get();
List<Signal<Void>> signals = Fluxion.from(completion).materialize().toList().get();
assertEquals(1, signals.size());
assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete());
@ -76,7 +76,7 @@ public class WriteWithOperatorTests { @@ -76,7 +76,7 @@ public class WriteWithOperatorTests {
@Test
public void writeOneItem() throws Exception {
Publisher<Void> completion = Flux.just("one").lift(this.operator);
List<Signal<Void>> signals = Stream.from(completion).materialize().toList().get();
List<Signal<Void>> signals = Fluxion.from(completion).materialize().toList().get();
assertEquals(1, signals.size());
assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete());
@ -91,7 +91,7 @@ public class WriteWithOperatorTests { @@ -91,7 +91,7 @@ public class WriteWithOperatorTests {
public void writeMultipleItems() throws Exception {
List<String> items = Arrays.asList("one", "two", "three");
Publisher<Void> completion = Flux.fromIterable(items).lift(this.operator);
List<Signal<Void>> signals = Stream.from(completion).materialize().toList().get();
List<Signal<Void>> signals = Fluxion.from(completion).materialize().toList().get();
assertEquals(1, signals.size());
assertTrue("Unexpected signal: " + signals.get(0), signals.get(0).isOnComplete());
@ -114,7 +114,7 @@ public class WriteWithOperatorTests { @@ -114,7 +114,7 @@ public class WriteWithOperatorTests {
}
}, subscriber -> new AtomicInteger());
Publisher<Void> completion = publisher.lift(this.operator);
List<Signal<Void>> signals = Stream.from(completion).materialize().toList().get();
List<Signal<Void>> signals = Fluxion.from(completion).materialize().toList().get();
assertEquals(1, signals.size());
assertSame("Unexpected signal: " + signals.get(0), error, signals.get(0).getThrowable());

16
spring-web-reactive/src/test/java/org/springframework/web/reactive/DispatcherHandlerErrorTests.java

@ -24,8 +24,8 @@ import org.junit.Before; @@ -24,8 +24,8 @@ import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.rx.Fluxion;
import reactor.rx.Signal;
import reactor.rx.Stream;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
@ -49,14 +49,14 @@ import org.springframework.web.bind.annotation.ResponseBody; @@ -49,14 +49,14 @@ import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.reactive.method.annotation.RequestMappingHandlerAdapter;
import org.springframework.web.reactive.method.annotation.RequestMappingHandlerMapping;
import org.springframework.web.reactive.method.annotation.ResponseBodyResultHandler;
import org.springframework.web.server.adapter.DefaultServerWebExchange;
import org.springframework.web.server.handler.ExceptionHandlingWebHandler;
import org.springframework.web.server.handler.FilteringWebHandler;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebExceptionHandler;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import org.springframework.web.server.WebHandler;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.adapter.DefaultServerWebExchange;
import org.springframework.web.server.handler.ExceptionHandlingWebHandler;
import org.springframework.web.server.handler.FilteringWebHandler;
import org.springframework.web.server.session.WebSessionManager;
import static org.hamcrest.CoreMatchers.startsWith;
@ -192,7 +192,7 @@ public class DispatcherHandlerErrorTests { @@ -192,7 +192,7 @@ public class DispatcherHandlerErrorTests {
WebHandler webHandler = new ExceptionHandlingWebHandler(this.dispatcherHandler, exceptionHandler);
Publisher<Void> publisher = webHandler.handle(this.exchange);
Stream.from(publisher).toList().get();
Fluxion.from(publisher).toList().get();
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, this.response.getStatus());
}
@ -204,13 +204,13 @@ public class DispatcherHandlerErrorTests { @@ -204,13 +204,13 @@ public class DispatcherHandlerErrorTests {
webHandler = new ExceptionHandlingWebHandler(webHandler, new ServerError500ExceptionHandler());
Publisher<Void> publisher = webHandler.handle(this.exchange);
Stream.from(publisher).toList().get();
Fluxion.from(publisher).toList().get();
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, this.response.getStatus());
}
private Throwable awaitErrorSignal(Publisher<?> publisher) throws Exception {
Signal<?> signal = Stream.from(publisher).materialize().toList().get().get(0);
Signal<?> signal = Fluxion.from(publisher).materialize().toList().get().get(0);
assertEquals("Unexpected signal: " + signal, Signal.Type.ERROR, signal.getType());
return signal.getThrowable();
}

12
spring-web-reactive/src/test/java/org/springframework/web/reactive/ResponseStatusExceptionHandlerTests.java

@ -21,21 +21,19 @@ import java.util.List; @@ -21,21 +21,19 @@ import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.rx.Fluxion;
import reactor.rx.Signal;
import reactor.rx.Stream;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.MockServerHttpRequest;
import org.springframework.http.server.reactive.MockServerHttpResponse;
import org.springframework.web.ResponseStatusException;
import org.springframework.web.server.adapter.DefaultServerWebExchange;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.adapter.DefaultServerWebExchange;
import org.springframework.web.server.session.WebSessionManager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
/**
@ -65,7 +63,7 @@ public class ResponseStatusExceptionHandlerTests { @@ -65,7 +63,7 @@ public class ResponseStatusExceptionHandlerTests {
Throwable ex = new ResponseStatusException(HttpStatus.BAD_REQUEST);
Publisher<Void> publisher = this.handler.handle(this.exchange, ex);
Stream.from(publisher).toList().get();
Fluxion.from(publisher).toList().get();
assertEquals(HttpStatus.BAD_REQUEST, this.response.getStatus());
}
@ -74,7 +72,7 @@ public class ResponseStatusExceptionHandlerTests { @@ -74,7 +72,7 @@ public class ResponseStatusExceptionHandlerTests {
Throwable ex = new IllegalStateException();
Publisher<Void> publisher = this.handler.handle(this.exchange, ex);
List<Signal<Void>> signals = Stream.from(publisher).materialize().toList().get();
List<Signal<Void>> signals = Fluxion.from(publisher).materialize().toList().get();
assertEquals(1, signals.size());
assertTrue(signals.get(0).hasError());
assertSame(ex, signals.get(0).getThrowable());

15
spring-web-reactive/src/test/java/org/springframework/web/reactive/handler/SimpleHandlerResultHandlerTests.java

@ -18,22 +18,23 @@ package org.springframework.web.reactive.handler; @@ -18,22 +18,23 @@ package org.springframework.web.reactive.handler;
import java.util.concurrent.CompletableFuture;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.rx.Stream;
import reactor.rx.Fluxion;
import rx.Observable;
import org.springframework.core.ResolvableType;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter;
import org.springframework.core.convert.support.ReactiveStreamsToReactorStreamConverter;
import org.springframework.core.convert.support.ReactiveStreamsToReactorFluxionConverter;
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
import org.springframework.ui.ExtendedModelMap;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.reactive.HandlerResult;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* @author Sebastien Deleuze
*/
@ -59,7 +60,7 @@ public class SimpleHandlerResultHandlerTests { @@ -59,7 +60,7 @@ public class SimpleHandlerResultHandlerTests {
hm = new HandlerMethod(controller, TestController.class.getMethod("streamVoid"));
type = ResolvableType.forMethodParameter(hm.getReturnType());
// Reactor Stream is a Publisher
// Reactor Fluxion is a Publisher
assertTrue(resultHandler.supports(createHandlerResult(hm, type)));
hm = new HandlerMethod(controller, TestController.class.getMethod("observableVoid"));
@ -76,7 +77,7 @@ public class SimpleHandlerResultHandlerTests { @@ -76,7 +77,7 @@ public class SimpleHandlerResultHandlerTests {
GenericConversionService conversionService = new GenericConversionService();
conversionService.addConverter(new ReactiveStreamsToCompletableFutureConverter());
conversionService.addConverter(new ReactiveStreamsToReactorStreamConverter());
conversionService.addConverter(new ReactiveStreamsToReactorFluxionConverter());
conversionService.addConverter(new ReactiveStreamsToRxJava1Converter());
SimpleHandlerResultHandler resultHandler = new SimpleHandlerResultHandler(conversionService);
TestController controller = new TestController();
@ -125,7 +126,7 @@ public class SimpleHandlerResultHandlerTests { @@ -125,7 +126,7 @@ public class SimpleHandlerResultHandlerTests {
return null;
}
public Stream<Void> streamVoid() {
public Fluxion<Void> streamVoid() {
return null;
}

14
spring-web-reactive/src/test/java/org/springframework/web/reactive/method/InvocableHandlerMethodTests.java

@ -26,8 +26,8 @@ import org.junit.Test; @@ -26,8 +26,8 @@ import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rx.Fluxion;
import reactor.rx.Signal;
import reactor.rx.Stream;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
@ -37,8 +37,8 @@ import org.springframework.web.bind.annotation.RequestParam; @@ -37,8 +37,8 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.reactive.HandlerResult;
import org.springframework.web.reactive.method.annotation.RequestParamArgumentResolver;
import org.springframework.web.server.adapter.DefaultServerWebExchange;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.adapter.DefaultServerWebExchange;
import org.springframework.web.server.session.WebSessionManager;
import static org.junit.Assert.assertEquals;
@ -73,7 +73,7 @@ public class InvocableHandlerMethodTests { @@ -73,7 +73,7 @@ public class InvocableHandlerMethodTests {
InvocableHandlerMethod hm = createHandlerMethod("noArgs");
Publisher<HandlerResult> publisher = hm.invokeForRequest(this.exchange, this.model);
List<HandlerResult> values = Stream.from(publisher).toList().get();
List<HandlerResult> values = Fluxion.from(publisher).toList().get();
assertEquals(1, values.size());
assertEquals("success", values.get(0).getReturnValue().get());
@ -86,7 +86,7 @@ public class InvocableHandlerMethodTests { @@ -86,7 +86,7 @@ public class InvocableHandlerMethodTests {
hm.setHandlerMethodArgumentResolvers(Collections.singletonList(new RequestParamArgumentResolver()));
Publisher<HandlerResult> publisher = hm.invokeForRequest(this.exchange, this.model);
List<HandlerResult> values = Stream.from(publisher).toList().get();
List<HandlerResult> values = Fluxion.from(publisher).toList().get();
assertEquals(1, values.size());
assertEquals("success:null", values.get(0).getReturnValue().get());
@ -98,7 +98,7 @@ public class InvocableHandlerMethodTests { @@ -98,7 +98,7 @@ public class InvocableHandlerMethodTests {
addResolver(hm, Mono.just("value1"));
Publisher<HandlerResult> publisher = hm.invokeForRequest(this.exchange, this.model);
List<HandlerResult> values = Stream.from(publisher).toList().get();
List<HandlerResult> values = Fluxion.from(publisher).toList().get();
assertEquals(1, values.size());
assertEquals("success:value1", values.get(0).getReturnValue().get());
@ -110,7 +110,7 @@ public class InvocableHandlerMethodTests { @@ -110,7 +110,7 @@ public class InvocableHandlerMethodTests {
addResolver(hm, Flux.fromIterable(Arrays.asList("value1", "value2", "value3")));
Publisher<HandlerResult> publisher = hm.invokeForRequest(this.exchange, this.model);
List<HandlerResult> values = Stream.from(publisher).toList().get();
List<HandlerResult> values = Fluxion.from(publisher).toList().get();
assertEquals(1, values.size());
assertEquals("success:value1", values.get(0).getReturnValue().get());
@ -200,7 +200,7 @@ public class InvocableHandlerMethodTests { @@ -200,7 +200,7 @@ public class InvocableHandlerMethodTests {
}
private Throwable awaitErrorSignal(Publisher<?> publisher) throws Exception {
Signal<?> signal = Stream.from(publisher).materialize().toList().get().get(0);
Signal<?> signal = Fluxion.from(publisher).materialize().toList().get().get(0);
assertEquals("Unexpected signal: " + signal, Signal.Type.ERROR, signal.getType());
return signal.getThrowable();
}

18
spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java

@ -29,8 +29,8 @@ import org.junit.Test; @@ -29,8 +29,8 @@ import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rx.Fluxion;
import reactor.rx.Promise;
import reactor.rx.Stream;
import rx.Observable;
import rx.Single;
@ -47,7 +47,7 @@ import org.springframework.core.codec.support.StringEncoder; @@ -47,7 +47,7 @@ import org.springframework.core.codec.support.StringEncoder;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter;
import org.springframework.core.convert.support.ReactiveStreamsToReactorStreamConverter;
import org.springframework.core.convert.support.ReactiveStreamsToReactorFluxionConverter;
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
@ -398,7 +398,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @@ -398,7 +398,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
// TODO: test failures with DefaultConversionService
GenericConversionService service = new GenericConversionService();
service.addConverter(new ReactiveStreamsToCompletableFutureConverter());
service.addConverter(new ReactiveStreamsToReactorStreamConverter());
service.addConverter(new ReactiveStreamsToReactorFluxionConverter());
service.addConverter(new ReactiveStreamsToRxJava1Converter());
return service;
}
@ -491,7 +491,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @@ -491,7 +491,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@RequestMapping("/stream-result")
public Publisher<Long> stringStreamResponseBody() {
return Flux.interval(1).as(Stream::from).take(5);
return Flux.interval(1).as(Fluxion::from).take(5);
}
@RequestMapping("/raw-flux")
@ -540,8 +540,8 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @@ -540,8 +540,8 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
}
@RequestMapping("/stream")
public Stream<Person> reactorStreamResponseBody() {
return Stream.just(new Person("Robert"), new Person("Marie"));
public Fluxion<Person> reactorStreamResponseBody() {
return Fluxion.just(new Person("Robert"), new Person("Marie"));
}
@RequestMapping("/publisher-capitalize")
@ -562,7 +562,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @@ -562,7 +562,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
}
@RequestMapping("/stream-capitalize")
public Stream<Person> streamCapitalize(@RequestBody Stream<Person> persons) {
public Fluxion<Person> streamCapitalize(@RequestBody Fluxion<Person> persons) {
return persons.map(person -> new Person(person.getName().toUpperCase()));
}
@ -589,7 +589,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @@ -589,7 +589,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@RequestMapping("/promise-capitalize")
public Promise<Person> promiseCapitalize(@RequestBody Promise<Person> personFuture) {
return Stream.from(personFuture.map(person -> new Person(person.getName().toUpperCase()))).promise();
return Fluxion.from(personFuture.map(person -> new Person(person.getName().toUpperCase()))).promise();
}
@RequestMapping("/publisher-create")
@ -603,7 +603,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @@ -603,7 +603,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
}
@RequestMapping("/stream-create")
public Publisher<Void> streamCreate(@RequestBody Stream<Person> personStream) {
public Publisher<Void> streamCreate(@RequestBody Fluxion<Person> personStream) {
return personStream.toList().doOnSuccess(persons::addAll).after();
}

Loading…
Cancel
Save