Browse Source
Support for Single via a ReturnValueHandler. Support for Observable via SseEmitter.pull/6/head
14 changed files with 929 additions and 154 deletions
@ -0,0 +1,68 @@
@@ -0,0 +1,68 @@
|
||||
/* |
||||
* Copyright 2013-2016 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.netflix.rx; |
||||
|
||||
import org.springframework.web.context.request.async.DeferredResult; |
||||
|
||||
import rx.Observable; |
||||
import rx.Subscriber; |
||||
import rx.Subscription; |
||||
|
||||
/** |
||||
* A subscriber that sets the single value produced by the {@link Observable} on the {@link DeferredResult}. |
||||
* |
||||
* @author Jakub Narloch |
||||
* @see DeferredResult |
||||
*/ |
||||
class DeferredResultSubscriber<T> extends Subscriber<T> implements Runnable { |
||||
|
||||
private final DeferredResult<T> deferredResult; |
||||
|
||||
private final Subscription subscription; |
||||
|
||||
private boolean completed; |
||||
|
||||
public DeferredResultSubscriber(Observable<T> observable, DeferredResult<T> deferredResult) { |
||||
|
||||
this.deferredResult = deferredResult; |
||||
this.deferredResult.onTimeout(this); |
||||
this.deferredResult.onCompletion(this); |
||||
this.subscription = observable.subscribe(this); |
||||
} |
||||
|
||||
@Override |
||||
public void onNext(T value) { |
||||
if (!completed) { |
||||
deferredResult.setResult(value); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void onError(Throwable e) { |
||||
deferredResult.setErrorResult(e); |
||||
} |
||||
|
||||
@Override |
||||
public void onCompleted() { |
||||
completed = true; |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
this.subscription.unsubscribe(); |
||||
} |
||||
} |
@ -1,75 +0,0 @@
@@ -1,75 +0,0 @@
|
||||
/* |
||||
* Copyright 2013-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.netflix.rx; |
||||
|
||||
import org.springframework.core.MethodParameter; |
||||
import org.springframework.web.context.request.NativeWebRequest; |
||||
import org.springframework.web.context.request.async.DeferredResult; |
||||
import org.springframework.web.context.request.async.WebAsyncUtils; |
||||
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler; |
||||
import org.springframework.web.method.support.ModelAndViewContainer; |
||||
|
||||
import rx.Observable; |
||||
import rx.functions.Action1; |
||||
|
||||
/** |
||||
* MVC handler for return values of type {@link rx.Observable}. |
||||
* |
||||
* @author Spencer Gibb |
||||
*/ |
||||
public class ObservableReturnValueHandler |
||||
implements AsyncHandlerMethodReturnValueHandler { |
||||
|
||||
@Override |
||||
public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) { |
||||
return returnValue != null && returnValue instanceof Observable; |
||||
} |
||||
|
||||
@Override |
||||
public boolean supportsReturnType(MethodParameter returnType) { |
||||
return Observable.class.isAssignableFrom(returnType.getParameterType()); |
||||
} |
||||
|
||||
@Override |
||||
public void handleReturnValue(Object returnValue, MethodParameter returnType, |
||||
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) |
||||
throws Exception { |
||||
if (returnValue == null) { |
||||
mavContainer.setRequestHandled(true); |
||||
return; |
||||
} |
||||
|
||||
Observable<?> observable = Observable.class.cast(returnValue); |
||||
|
||||
final DeferredResult<Object> deferredResult = new DeferredResult<>(); |
||||
|
||||
observable.subscribe(new Action1<Object>() { |
||||
@Override |
||||
public void call(Object o) { |
||||
deferredResult.setResult(o); |
||||
} |
||||
}, new Action1<Throwable>() { |
||||
@Override |
||||
public void call(Throwable throwable) { |
||||
deferredResult.setErrorResult(throwable); |
||||
} |
||||
}); |
||||
|
||||
WebAsyncUtils.getAsyncManager(webRequest) |
||||
.startDeferredResultProcessing(deferredResult, mavContainer); |
||||
} |
||||
} |
@ -0,0 +1,45 @@
@@ -0,0 +1,45 @@
|
||||
/* |
||||
* Copyright 2013-2016 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.netflix.rx; |
||||
|
||||
import org.springframework.http.MediaType; |
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; |
||||
import rx.Observable; |
||||
|
||||
/** |
||||
* A specialized {@link SseEmitter} that handles {@link Observable} return types. |
||||
* |
||||
* @author Jakub Narloch |
||||
* @see SseEmitter |
||||
*/ |
||||
class ObservableSseEmitter<T> extends SseEmitter { |
||||
|
||||
private final ResponseBodyEmitterSubscriber<T> subscriber; |
||||
|
||||
public ObservableSseEmitter(Observable<T> observable) { |
||||
this(null, observable); |
||||
} |
||||
|
||||
public ObservableSseEmitter(MediaType mediaType, Observable<T> observable) { |
||||
this(null, mediaType, observable); |
||||
} |
||||
|
||||
public ObservableSseEmitter(Long timeout, MediaType mediaType, Observable<T> observable) { |
||||
super(timeout); |
||||
this.subscriber = new ResponseBodyEmitterSubscriber<>(mediaType, observable, this); |
||||
} |
||||
} |
@ -0,0 +1,90 @@
@@ -0,0 +1,90 @@
|
||||
/* |
||||
* Copyright 2013-2016 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.netflix.rx; |
||||
|
||||
import java.io.IOException; |
||||
|
||||
import org.springframework.http.MediaType; |
||||
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; |
||||
|
||||
import rx.Observable; |
||||
import rx.Subscriber; |
||||
import rx.Subscription; |
||||
|
||||
|
||||
/** |
||||
* Subscriber that emits any value produced by the {@link Observable} into the delegated {@link ResponseBodyEmitter}. |
||||
* |
||||
* @author Jakub Narloch |
||||
*/ |
||||
class ResponseBodyEmitterSubscriber<T> extends Subscriber<T> implements Runnable { |
||||
|
||||
private final MediaType mediaType; |
||||
|
||||
private final Subscription subscription; |
||||
|
||||
private final ResponseBodyEmitter responseBodyEmitter; |
||||
|
||||
private boolean completed; |
||||
|
||||
/** |
||||
* Creates new instance of {@link ResponseBodyEmitterSubscriber} with response media type, observable and response |
||||
* emitter. |
||||
* |
||||
* @param mediaType the marshaled object media type |
||||
* @param observable the observable |
||||
* @param responseBodyEmitter the response emitter |
||||
*/ |
||||
public ResponseBodyEmitterSubscriber(MediaType mediaType, Observable<T> observable, ResponseBodyEmitter responseBodyEmitter) { |
||||
|
||||
this.mediaType = mediaType; |
||||
this.responseBodyEmitter = responseBodyEmitter; |
||||
this.responseBodyEmitter.onTimeout(this); |
||||
this.responseBodyEmitter.onCompletion(this); |
||||
this.subscription = observable.subscribe(this); |
||||
} |
||||
|
||||
@Override |
||||
public void onNext(T value) { |
||||
|
||||
try { |
||||
if(!completed) { |
||||
responseBodyEmitter.send(value, mediaType); |
||||
} |
||||
} catch (IOException e) { |
||||
throw new RuntimeException(e.getMessage(), e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void onError(Throwable e) { |
||||
responseBodyEmitter.completeWithError(e); |
||||
} |
||||
|
||||
@Override |
||||
public void onCompleted() { |
||||
if(!completed) { |
||||
completed = true; |
||||
responseBodyEmitter.complete(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
subscription.unsubscribe(); |
||||
} |
||||
} |
@ -0,0 +1,58 @@
@@ -0,0 +1,58 @@
|
||||
package org.springframework.cloud.netflix.rx; |
||||
|
||||
import org.springframework.http.MediaType; |
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; |
||||
|
||||
import rx.Observable; |
||||
|
||||
/** |
||||
* A convenient class allowing to wrap either the {@link Observable} into a response supported by the |
||||
* Spring MVC. |
||||
* |
||||
* @author Jakub Narloch |
||||
*/ |
||||
public final class RxResponse { |
||||
|
||||
private RxResponse() { |
||||
|
||||
} |
||||
|
||||
/** |
||||
* Wraps the {@link Observable} into a {@link SseEmitter}. Every value produced by the observable will be emitted |
||||
* as server side event. |
||||
* |
||||
* @param observable the observable instance |
||||
* @param <T> the result type |
||||
* @return the sse emitter |
||||
*/ |
||||
public static <T> SseEmitter sse(Observable<T> observable) { |
||||
return new ObservableSseEmitter<>(observable); |
||||
} |
||||
|
||||
/** |
||||
* Wraps the {@link Observable} into a {@link SseEmitter}. Every value produced by the observable will be emitted |
||||
* as server side event. |
||||
* |
||||
* @param mediaType the media type of produced entry |
||||
* @param observable the observable instance |
||||
* @param <T> the result type |
||||
* @return the sse emitter |
||||
*/ |
||||
public static <T> SseEmitter sse(MediaType mediaType, Observable<T> observable) { |
||||
return new ObservableSseEmitter<>(mediaType, observable); |
||||
} |
||||
|
||||
/** |
||||
* Wraps the {@link Observable} into a {@link SseEmitter}. Every value produced by the observable will be emitted |
||||
* as server side event. |
||||
* |
||||
* @param timeout the response timeout |
||||
* @param mediaType the media type of produced entry |
||||
* @param observable the observable instance |
||||
* @param <T> the result type |
||||
* @return the sse emitter |
||||
*/ |
||||
public static <T> SseEmitter sse(long timeout, MediaType mediaType, Observable<T> observable) { |
||||
return new ObservableSseEmitter<>(timeout, mediaType, observable); |
||||
} |
||||
} |
@ -0,0 +1,50 @@
@@ -0,0 +1,50 @@
|
||||
/* |
||||
* Copyright 2013-2016 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.netflix.rx; |
||||
|
||||
import org.springframework.util.Assert; |
||||
import org.springframework.web.context.request.async.DeferredResult; |
||||
|
||||
import rx.Single; |
||||
|
||||
/** |
||||
* A specialized {@link DeferredResult} that handles {@link Single} return type. |
||||
* |
||||
* @author Jakub Narloch |
||||
* @see DeferredResult |
||||
*/ |
||||
class SingleDeferredResult<T> extends DeferredResult<T> { |
||||
|
||||
private static final Object EMPTY_RESULT = new Object(); |
||||
|
||||
private final DeferredResultSubscriber<T> subscriber; |
||||
|
||||
public SingleDeferredResult(Single<T> single) { |
||||
this(null, EMPTY_RESULT, single); |
||||
} |
||||
|
||||
public SingleDeferredResult(long timeout, Single<T> single) { |
||||
this(timeout, EMPTY_RESULT, single); |
||||
} |
||||
|
||||
public SingleDeferredResult(Long timeout, Object timeoutResult, Single<T> single) { |
||||
super(timeout, timeoutResult); |
||||
Assert.notNull(single, "single can not be null"); |
||||
|
||||
subscriber = new DeferredResultSubscriber<>(single.toObservable(), this); |
||||
} |
||||
} |
@ -0,0 +1,117 @@
@@ -0,0 +1,117 @@
|
||||
/* |
||||
* Copyright 2013-2016 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.netflix.rx; |
||||
|
||||
import org.springframework.core.MethodParameter; |
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpStatus; |
||||
import org.springframework.http.ResponseEntity; |
||||
import org.springframework.web.context.request.NativeWebRequest; |
||||
import org.springframework.web.context.request.async.DeferredResult; |
||||
import org.springframework.web.context.request.async.WebAsyncUtils; |
||||
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler; |
||||
import org.springframework.web.method.support.ModelAndViewContainer; |
||||
|
||||
import rx.Single; |
||||
import rx.functions.Func1; |
||||
|
||||
/** |
||||
* A specialized {@link AsyncHandlerMethodReturnValueHandler} that handles {@link Single} return types. |
||||
* |
||||
* @author Spencer Gibb |
||||
* @author Jakub Narloch |
||||
*/ |
||||
public class SingleReturnValueHandler implements AsyncHandlerMethodReturnValueHandler { |
||||
|
||||
@Override |
||||
public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) { |
||||
return returnValue != null && supportsReturnType(returnType); |
||||
} |
||||
|
||||
@Override |
||||
public boolean supportsReturnType(MethodParameter returnType) { |
||||
return Single.class.isAssignableFrom(returnType.getParameterType()) || isResponseEntity(returnType); |
||||
} |
||||
|
||||
private boolean isResponseEntity(MethodParameter returnType) { |
||||
if(ResponseEntity.class.isAssignableFrom(returnType.getParameterType())) { |
||||
Class<?> bodyType = ResolvableType.forMethodParameter(returnType).getGeneric(0).resolve(); |
||||
return bodyType != null && Single.class.isAssignableFrom(bodyType); |
||||
} |
||||
return false; |
||||
} |
||||
|
||||
@SuppressWarnings("unchecked") |
||||
@Override |
||||
public void handleReturnValue(Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception { |
||||
|
||||
if (returnValue == null) { |
||||
mavContainer.setRequestHandled(true); |
||||
return; |
||||
} |
||||
|
||||
ResponseEntity<Single<?>> responseEntity = getResponseEntity(returnValue); |
||||
if(responseEntity != null) { |
||||
returnValue = responseEntity.getBody(); |
||||
if (returnValue == null) { |
||||
mavContainer.setRequestHandled(true); |
||||
return; |
||||
} |
||||
} |
||||
|
||||
final Single<?> single = Single.class.cast(returnValue); |
||||
WebAsyncUtils.getAsyncManager(webRequest) |
||||
.startDeferredResultProcessing(convertToDeferredResult(responseEntity, single), mavContainer); |
||||
} |
||||
|
||||
@SuppressWarnings("unchecked") |
||||
private ResponseEntity<Single<?>> getResponseEntity(Object returnValue) { |
||||
if (ResponseEntity.class.isAssignableFrom(returnValue.getClass())) { |
||||
return (ResponseEntity<Single<?>>) returnValue; |
||||
|
||||
} |
||||
return null; |
||||
} |
||||
|
||||
protected DeferredResult<?> convertToDeferredResult(final ResponseEntity<Single<?>> responseEntity, Single<?> single) { |
||||
|
||||
//TODO: fix when java8 :-)
|
||||
Single<ResponseEntity> singleResponse = single.map(new Func1<Object, ResponseEntity>() { |
||||
@Override |
||||
public ResponseEntity call(Object object) { |
||||
return new ResponseEntity<>(object, getHttpHeaders(responseEntity), getHttpStatus(responseEntity)); |
||||
} |
||||
}); |
||||
|
||||
return new SingleDeferredResult<>(singleResponse); |
||||
} |
||||
|
||||
private HttpStatus getHttpStatus(ResponseEntity<?> responseEntity) { |
||||
if(responseEntity == null) { |
||||
return HttpStatus.OK; |
||||
} |
||||
return responseEntity.getStatusCode(); |
||||
} |
||||
|
||||
private HttpHeaders getHttpHeaders(ResponseEntity<?> responseEntity) { |
||||
if(responseEntity == null) { |
||||
return new HttpHeaders(); |
||||
} |
||||
return responseEntity.getHeaders(); |
||||
} |
||||
} |
@ -0,0 +1,174 @@
@@ -0,0 +1,174 @@
|
||||
/* |
||||
* Copyright 2013-2016 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.netflix.rx; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.List; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
import org.junit.Ignore; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.springframework.beans.factory.annotation.Value; |
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; |
||||
import org.springframework.boot.test.IntegrationTest; |
||||
import org.springframework.boot.test.SpringApplicationConfiguration; |
||||
import org.springframework.boot.test.TestRestTemplate; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpStatus; |
||||
import org.springframework.http.MediaType; |
||||
import org.springframework.http.ResponseEntity; |
||||
import org.springframework.test.annotation.DirtiesContext; |
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; |
||||
import org.springframework.test.context.web.WebAppConfiguration; |
||||
import org.springframework.web.bind.annotation.RequestMapping; |
||||
import org.springframework.web.bind.annotation.RequestMethod; |
||||
import org.springframework.web.bind.annotation.RestController; |
||||
|
||||
import rx.Observable; |
||||
import rx.Single; |
||||
import rx.functions.Func1; |
||||
|
||||
import static org.junit.Assert.assertEquals; |
||||
import static org.junit.Assert.assertNotNull; |
||||
import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8; |
||||
|
||||
/** |
||||
* Tests the demonstrate using {@link Observable} with {@link SingleReturnValueHandler} class. |
||||
* |
||||
* @author Spencer Gibb |
||||
* @author Jakub Narloch |
||||
*/ |
||||
@RunWith(SpringJUnit4ClassRunner.class) |
||||
@SpringApplicationConfiguration(classes = ObservableReturnValueHandlerTest.Application.class) |
||||
@WebAppConfiguration |
||||
@IntegrationTest({"server.port=0"}) |
||||
@DirtiesContext |
||||
public class ObservableReturnValueHandlerTest { |
||||
|
||||
@Value("${local.server.port}") |
||||
private int port = 0; |
||||
|
||||
private TestRestTemplate restTemplate = new TestRestTemplate(); |
||||
|
||||
@Configuration |
||||
@EnableAutoConfiguration |
||||
@RestController |
||||
protected static class Application { |
||||
|
||||
// tag::rx_observable[]
|
||||
@RequestMapping(method = RequestMethod.GET, value = "/single") |
||||
public Single<String> single() { |
||||
return Observable.just("single value").toSingle(); |
||||
} |
||||
|
||||
@RequestMapping(method = RequestMethod.GET, value = "/multiple") |
||||
public Single<List<String>> multiple() { |
||||
return Observable.just("multiple", "values").toList().toSingle(); |
||||
} |
||||
|
||||
@RequestMapping(method = RequestMethod.GET, value = "/responseWithObservable") |
||||
public ResponseEntity<Single<String>> responseWithObservable() { |
||||
|
||||
Observable<String> observable = Observable.just("single value"); |
||||
HttpHeaders headers = new HttpHeaders(); |
||||
headers.setContentType(APPLICATION_JSON_UTF8); |
||||
return new ResponseEntity<>(observable.toSingle(), headers, HttpStatus.CREATED); |
||||
} |
||||
|
||||
@RequestMapping(method = RequestMethod.GET, value = "/timeout") |
||||
public Observable<String> timeout() { |
||||
return Observable.timer(1, TimeUnit.MINUTES).map(new Func1<Long, String>() { |
||||
@Override |
||||
public String call(Long aLong) { |
||||
return "single value"; |
||||
} |
||||
}); |
||||
} |
||||
// end::rx_observable[]
|
||||
|
||||
@RequestMapping(method = RequestMethod.GET, value = "/throw") |
||||
public Single<Object> error() { |
||||
return Observable.error(new RuntimeException("Unexpected")).toSingle(); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void shouldRetrieveSingleValue() { |
||||
|
||||
// when
|
||||
ResponseEntity<String> response = restTemplate.getForEntity(path("/single"), String.class); |
||||
|
||||
// then
|
||||
assertNotNull(response); |
||||
assertEquals(HttpStatus.OK, response.getStatusCode()); |
||||
assertEquals("single value", response.getBody()); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldRetrieveMultipleValues() { |
||||
|
||||
// when
|
||||
ResponseEntity<List> response = restTemplate.getForEntity(path("/multiple"), List.class); |
||||
|
||||
// then
|
||||
assertNotNull(response); |
||||
assertEquals(HttpStatus.OK, response.getStatusCode()); |
||||
assertEquals(Arrays.asList("multiple", "values"), response.getBody()); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldRetrieveSingleValueWithStatusCodeAndCustomHeader() { |
||||
|
||||
// when
|
||||
ResponseEntity<String> response = restTemplate.getForEntity(path("/responseWithObservable"), String.class); |
||||
|
||||
// then
|
||||
assertNotNull(response); |
||||
assertEquals(HttpStatus.CREATED, response.getStatusCode()); |
||||
assertEquals(MediaType.APPLICATION_JSON_UTF8, response.getHeaders().getContentType()); |
||||
assertEquals("single value", response.getBody()); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldRetrieveErrorResponse() { |
||||
|
||||
// when
|
||||
ResponseEntity<Object> response = restTemplate.getForEntity(path("/throw"), Object.class); |
||||
|
||||
// then
|
||||
assertNotNull(response); |
||||
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode()); |
||||
} |
||||
|
||||
@Test |
||||
@Ignore("adds 30s to build") |
||||
public void shouldTimeoutOnConnection() { |
||||
|
||||
// when
|
||||
ResponseEntity<Object> response = restTemplate.getForEntity(path("/timeout"), Object.class); |
||||
|
||||
// then
|
||||
assertNotNull(response); |
||||
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode()); |
||||
} |
||||
|
||||
private String path(String context) { |
||||
return String.format("http://localhost:%d%s", port, context); |
||||
} |
||||
} |
@ -1,71 +0,0 @@
@@ -1,71 +0,0 @@
|
||||
/* |
||||
* Copyright 2013-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.netflix.rx; |
||||
|
||||
import static org.junit.Assert.assertEquals; |
||||
import static org.junit.Assert.assertNotNull; |
||||
|
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.springframework.beans.factory.annotation.Value; |
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; |
||||
import org.springframework.boot.test.IntegrationTest; |
||||
import org.springframework.boot.test.SpringApplicationConfiguration; |
||||
import org.springframework.boot.test.TestRestTemplate; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.http.HttpStatus; |
||||
import org.springframework.http.ResponseEntity; |
||||
import org.springframework.test.annotation.DirtiesContext; |
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; |
||||
import org.springframework.test.context.web.WebAppConfiguration; |
||||
import org.springframework.web.bind.annotation.RequestMapping; |
||||
import org.springframework.web.bind.annotation.RequestMethod; |
||||
import org.springframework.web.bind.annotation.RestController; |
||||
|
||||
import rx.Observable; |
||||
|
||||
/** |
||||
* @author Spencer Gibb |
||||
*/ |
||||
@RunWith(SpringJUnit4ClassRunner.class) |
||||
@SpringApplicationConfiguration(classes = ObservableReturnValueHandlerTests.Application.class) |
||||
@WebAppConfiguration |
||||
@IntegrationTest({ "server.port=0" }) |
||||
@DirtiesContext |
||||
public class ObservableReturnValueHandlerTests { |
||||
|
||||
@Value("${local.server.port}") |
||||
private int port = 0; |
||||
|
||||
@Configuration |
||||
@EnableAutoConfiguration |
||||
@RestController |
||||
protected static class Application { |
||||
@RequestMapping(method = RequestMethod.GET, value = "/") |
||||
public Observable<String> hi() { |
||||
return Observable.just("hello world"); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void observableReturns() { |
||||
ResponseEntity<String> response = new TestRestTemplate().getForEntity("http://localhost:" + port, String.class); |
||||
assertNotNull("response was null", response); |
||||
assertEquals("response code was wrong", HttpStatus.OK, response.getStatusCode()); |
||||
assertEquals("response was wrong", "hello world", response.getBody()); |
||||
} |
||||
} |
@ -0,0 +1,165 @@
@@ -0,0 +1,165 @@
|
||||
/* |
||||
* Copyright 2013-2016 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.netflix.rx; |
||||
|
||||
import java.util.Date; |
||||
import java.util.GregorianCalendar; |
||||
import java.util.TimeZone; |
||||
|
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.springframework.beans.factory.annotation.Value; |
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; |
||||
import org.springframework.boot.test.IntegrationTest; |
||||
import org.springframework.boot.test.SpringApplicationConfiguration; |
||||
import org.springframework.boot.test.TestRestTemplate; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.http.HttpStatus; |
||||
import org.springframework.http.ResponseEntity; |
||||
import org.springframework.test.annotation.DirtiesContext; |
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; |
||||
import org.springframework.test.context.web.WebAppConfiguration; |
||||
import org.springframework.web.bind.annotation.RequestMapping; |
||||
import org.springframework.web.bind.annotation.RequestMethod; |
||||
import org.springframework.web.bind.annotation.RestController; |
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator; |
||||
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
|
||||
import rx.Observable; |
||||
|
||||
import static org.junit.Assert.assertEquals; |
||||
import static org.junit.Assert.assertNotNull; |
||||
import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8; |
||||
|
||||
/** |
||||
* Tests the {@link ObservableSseEmitter} class. |
||||
* |
||||
* @author Spencer Gibb |
||||
* @author Jakub Narloch |
||||
*/ |
||||
@RunWith(SpringJUnit4ClassRunner.class) |
||||
@SpringApplicationConfiguration(classes = ObservableSseEmitterTest.Application.class) |
||||
@WebAppConfiguration |
||||
@IntegrationTest({"server.port=0"}) |
||||
@DirtiesContext |
||||
public class ObservableSseEmitterTest { |
||||
|
||||
@Value("${local.server.port}") |
||||
private int port = 0; |
||||
|
||||
private TestRestTemplate restTemplate = new TestRestTemplate(); |
||||
|
||||
@Configuration |
||||
@EnableAutoConfiguration |
||||
@RestController |
||||
protected static class Application { |
||||
|
||||
// tag::rx_observable_sse[]
|
||||
@RequestMapping(method = RequestMethod.GET, value = "/sse") |
||||
public SseEmitter single() { |
||||
return RxResponse.sse(Observable.just("single value")); |
||||
} |
||||
|
||||
@RequestMapping(method = RequestMethod.GET, value = "/messages") |
||||
public SseEmitter messages() { |
||||
return RxResponse.sse(Observable.just("message 1", "message 2", "message 3")); |
||||
} |
||||
|
||||
@RequestMapping(method = RequestMethod.GET, value = "/events") |
||||
public SseEmitter event() { |
||||
return RxResponse.sse(APPLICATION_JSON_UTF8, Observable.just( |
||||
new EventDto("Spring io", getDate(2016, 5, 19)), |
||||
new EventDto("SpringOnePlatform", getDate(2016, 8, 1)) |
||||
)); |
||||
} |
||||
// end::rx_observable_sse[]
|
||||
} |
||||
|
||||
@Test |
||||
public void shouldRetrieveSse() { |
||||
|
||||
// when
|
||||
ResponseEntity<String> response = restTemplate.getForEntity(path("/sse"), String.class); |
||||
|
||||
// then
|
||||
assertNotNull(response); |
||||
assertEquals(HttpStatus.OK, response.getStatusCode()); |
||||
assertEquals("data:single value\n\n", response.getBody()); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldRetrieveSseWithMultipleMessages() { |
||||
|
||||
// when
|
||||
ResponseEntity<String> response = restTemplate.getForEntity(path("/messages"), String.class); |
||||
|
||||
// then
|
||||
assertNotNull(response); |
||||
assertEquals(HttpStatus.OK, response.getStatusCode()); |
||||
assertEquals("data:message 1\n\ndata:message 2\n\ndata:message 3\n\n", response.getBody()); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldRetrieveJsonOverSseWithMultipleMessages() { |
||||
|
||||
// when
|
||||
ResponseEntity<String> response = restTemplate.getForEntity(path("/events"), String.class); |
||||
|
||||
// then
|
||||
assertNotNull(response); |
||||
assertEquals(HttpStatus.OK, response.getStatusCode()); |
||||
assertEquals("data:{\"name\":\"Spring io\",\"date\":1466337600000}\n\ndata:{\"name\":\"SpringOnePlatform\",\"date\":1472731200000}\n\n", response.getBody()); |
||||
} |
||||
|
||||
private String path(String context) { |
||||
return String.format("http://localhost:%d%s", port, context); |
||||
} |
||||
|
||||
private static Date getDate(int year, int month, int day) { |
||||
GregorianCalendar calendar = new GregorianCalendar(year, month, day, 12, 0, 0); |
||||
calendar.setTimeZone(TimeZone.getTimeZone("UTC")); |
||||
return calendar.getTime(); |
||||
} |
||||
|
||||
/** |
||||
* A simple DTO used for testing purpose. |
||||
* |
||||
* @author Jakub Narloch |
||||
*/ |
||||
static class EventDto { |
||||
|
||||
private final String name; |
||||
|
||||
private final Date date; |
||||
|
||||
@JsonCreator |
||||
public EventDto(@JsonProperty("name") String name, @JsonProperty("date") Date date) { |
||||
this.name = name; |
||||
this.date = date; |
||||
} |
||||
|
||||
public String getName() { |
||||
return name; |
||||
} |
||||
|
||||
public Date getDate() { |
||||
return date; |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,120 @@
@@ -0,0 +1,120 @@
|
||||
/* |
||||
* Copyright 2013-2016 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.netflix.rx; |
||||
|
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.springframework.beans.factory.annotation.Value; |
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; |
||||
import org.springframework.boot.test.IntegrationTest; |
||||
import org.springframework.boot.test.SpringApplicationConfiguration; |
||||
import org.springframework.boot.test.TestRestTemplate; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.http.HttpStatus; |
||||
import org.springframework.http.ResponseEntity; |
||||
import org.springframework.test.annotation.DirtiesContext; |
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; |
||||
import org.springframework.test.context.web.WebAppConfiguration; |
||||
import org.springframework.web.bind.annotation.RequestMapping; |
||||
import org.springframework.web.bind.annotation.RequestMethod; |
||||
import org.springframework.web.bind.annotation.RestController; |
||||
|
||||
import rx.Single; |
||||
|
||||
import static org.junit.Assert.assertEquals; |
||||
import static org.junit.Assert.assertNotNull; |
||||
|
||||
/** |
||||
* Tests the {@link SingleReturnValueHandler} class. |
||||
* |
||||
* @author Spencer Gibb |
||||
* @author Jakub Narloch |
||||
*/ |
||||
@RunWith(SpringJUnit4ClassRunner.class) |
||||
@SpringApplicationConfiguration(classes = SingleReturnValueHandlerTest.Application.class) |
||||
@WebAppConfiguration |
||||
@IntegrationTest({"server.port=0"}) |
||||
@DirtiesContext |
||||
public class SingleReturnValueHandlerTest { |
||||
|
||||
@Value("${local.server.port}") |
||||
private int port = 0; |
||||
|
||||
private TestRestTemplate restTemplate = new TestRestTemplate(); |
||||
|
||||
@Configuration |
||||
@EnableAutoConfiguration |
||||
@RestController |
||||
protected static class Application { |
||||
|
||||
// tag::rx_single[]
|
||||
@RequestMapping(method = RequestMethod.GET, value = "/single") |
||||
public Single<String> single() { |
||||
return Single.just("single value"); |
||||
} |
||||
|
||||
@RequestMapping(method = RequestMethod.GET, value = "/singleWithResponse") |
||||
public ResponseEntity<Single<String>> singleWithResponse() { |
||||
return new ResponseEntity<>(Single.just("single value"), HttpStatus.NOT_FOUND); |
||||
} |
||||
|
||||
@RequestMapping(method = RequestMethod.GET, value = "/throw") |
||||
public Single<Object> error() { |
||||
return Single.error(new RuntimeException("Unexpected")); |
||||
} |
||||
// end::rx_single[]
|
||||
} |
||||
|
||||
@Test |
||||
public void shouldRetrieveSingleValue() { |
||||
|
||||
// when
|
||||
ResponseEntity<String> response = restTemplate.getForEntity(path("/single"), String.class); |
||||
|
||||
// then
|
||||
assertNotNull(response); |
||||
assertEquals(HttpStatus.OK, response.getStatusCode()); |
||||
assertEquals("single value", response.getBody()); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldRetrieveSingleValueWithStatusCode() { |
||||
|
||||
// when
|
||||
ResponseEntity<String> response = restTemplate.getForEntity(path("/singleWithResponse"), String.class); |
||||
|
||||
// then
|
||||
assertNotNull(response); |
||||
assertEquals(HttpStatus.NOT_FOUND, response.getStatusCode()); |
||||
assertEquals("single value", response.getBody()); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldRetrieveErrorResponse() { |
||||
|
||||
// when
|
||||
ResponseEntity<Object> response = restTemplate.getForEntity(path("/throw"), Object.class); |
||||
|
||||
// then
|
||||
assertNotNull(response); |
||||
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode()); |
||||
} |
||||
|
||||
private String path(String context) { |
||||
return String.format("http://localhost:%d%s", port, context); |
||||
} |
||||
} |
Loading…
Reference in new issue