Browse Source

Add support for `CompletableFuture` for method return types (#638)

Implements support for `CompletableFuture` on method return types by converting through RxJava `Observable`
pull/855/head
Will May 6 years ago committed by Marvin Froeder
parent
commit
9dfd9b4329
  1. 10
      hystrix/README.md
  2. 4
      hystrix/src/main/java/feign/hystrix/HystrixDelegatingContract.java
  3. 17
      hystrix/src/main/java/feign/hystrix/HystrixInvocationHandler.java
  4. 35
      hystrix/src/main/java/feign/hystrix/ObservableCompletableFuture.java
  5. 72
      hystrix/src/test/java/feign/hystrix/HystrixBuilderTest.java

10
hystrix/README.md

@ -10,11 +10,11 @@ GitHub github = HystrixFeign.builder() @@ -10,11 +10,11 @@ GitHub github = HystrixFeign.builder()
.target(GitHub.class, "https://api.github.com");
```
For asynchronous or reactive use, return `HystrixCommand<YourType>`.
For asynchronous or reactive use, return `HystrixCommand<YourType>` or `CompletableFuture<YourType>`.
For RxJava compatibility, use `rx.Observable<YourType>` or `rx.Single<YourType>`. Rx types are <a href="http://reactivex.io/documentation/observable.html">cold</a>, which means a http call isn't made until there's a subscriber.
Methods that do *not* return [`HystrixCommand`](https://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCommand.html), [`rx.Observable`](http://reactivex.io/RxJava/javadoc/rx/Observable.html) or [`rx.Single`] are still wrapped in a `HystrixCommand`, but `execute()` is automatically called for you.
Methods that do *not* return [`HystrixCommand`](https://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCommand.html), `CompletableFuture`, [`rx.Observable`](http://reactivex.io/RxJava/javadoc/rx/Observable.html) or `rx.Single` are still wrapped in a `HystrixCommand`, but `execute()` is automatically called for you.
```java
interface YourApi {
@ -27,6 +27,9 @@ interface YourApi { @@ -27,6 +27,9 @@ interface YourApi {
@RequestLine("GET /yourtype/{id}")
Single<YourType> getYourTypeSingle(@Param("id") String id);
@RequestLine("GET /yourtype/{id}")
CompletableFuture<YourType> getYourTypeCompletableFuture(@Param("id") String id);
@RequestLine("GET /yourtype/{id}")
YourType getYourTypeSynchronous(@Param("id") String id);
}
@ -46,6 +49,9 @@ api.getYourType("a").queue(); @@ -46,6 +49,9 @@ api.getYourType("a").queue();
// for synchronous
api.getYourType("a").execute();
// or for a CompletableFuture
api.getYourTypeCompletableFuture("a").thenApply(o -> "b");
// or to apply hystrix to existing feign methods.
api.getYourTypeSynchronous("a");
```

4
hystrix/src/main/java/feign/hystrix/HystrixDelegatingContract.java

@ -17,6 +17,7 @@ import static feign.Util.resolveLastTypeParameter; @@ -17,6 +17,7 @@ import static feign.Util.resolveLastTypeParameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import com.netflix.hystrix.HystrixCommand;
import feign.Contract;
import feign.MethodMetadata;
@ -63,6 +64,9 @@ public final class HystrixDelegatingContract implements Contract { @@ -63,6 +64,9 @@ public final class HystrixDelegatingContract implements Contract {
} else if (type instanceof ParameterizedType
&& ((ParameterizedType) type).getRawType().equals(Completable.class)) {
metadata.returnType(void.class);
} else if (type instanceof ParameterizedType
&& ((ParameterizedType) type).getRawType().equals(CompletableFuture.class)) {
metadata.returnType(resolveLastTypeParameter(type, CompletableFuture.class));
}
}

17
hystrix/src/main/java/feign/hystrix/HystrixInvocationHandler.java

@ -22,6 +22,9 @@ import java.lang.reflect.Proxy; @@ -22,6 +22,9 @@ import java.lang.reflect.Proxy;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import feign.InvocationHandlerFactory.MethodHandler;
import feign.Target;
import feign.Util;
@ -130,15 +133,21 @@ final class HystrixInvocationHandler implements InvocationHandler { @@ -130,15 +133,21 @@ final class HystrixInvocationHandler implements InvocationHandler {
} else if (isReturnsCompletable(method)) {
((Completable) result).await();
return null;
} else if (isReturnsCompletableFuture(method)) {
return ((Future) result).get();
} else {
return result;
}
} catch (IllegalAccessException e) {
// shouldn't happen as method is public due to being an interface
throw new AssertionError(e);
} catch (InvocationTargetException e) {
} catch (InvocationTargetException | ExecutionException e) {
// Exceptions on fallback are tossed by Hystrix
throw new AssertionError(e.getCause());
} catch (InterruptedException e) {
// Exceptions on fallback are tossed by Hystrix
Thread.currentThread().interrupt();
throw new AssertionError(e.getCause());
}
}
};
@ -155,6 +164,8 @@ final class HystrixInvocationHandler implements InvocationHandler { @@ -155,6 +164,8 @@ final class HystrixInvocationHandler implements InvocationHandler {
return hystrixCommand.toObservable().toSingle();
} else if (isReturnsCompletable(method)) {
return hystrixCommand.toObservable().toCompletable();
} else if (isReturnsCompletableFuture(method)) {
return new ObservableCompletableFuture<>(hystrixCommand);
}
return hystrixCommand.execute();
}
@ -171,6 +182,10 @@ final class HystrixInvocationHandler implements InvocationHandler { @@ -171,6 +182,10 @@ final class HystrixInvocationHandler implements InvocationHandler {
return Observable.class.isAssignableFrom(method.getReturnType());
}
private boolean isReturnsCompletableFuture(Method method) {
return CompletableFuture.class.isAssignableFrom(method.getReturnType());
}
private boolean isReturnsSingle(Method method) {
return Single.class.isAssignableFrom(method.getReturnType());
}

35
hystrix/src/main/java/feign/hystrix/ObservableCompletableFuture.java

@ -0,0 +1,35 @@ @@ -0,0 +1,35 @@
/**
* Copyright 2012-2018 The Feign Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package feign.hystrix;
import com.netflix.hystrix.HystrixCommand;
import rx.Subscription;
import java.util.concurrent.CompletableFuture;
final class ObservableCompletableFuture<T> extends CompletableFuture<T> {
private final Subscription sub;
ObservableCompletableFuture(final HystrixCommand<T> command) {
this.sub = command.toObservable().single().subscribe(ObservableCompletableFuture.this::complete,
ObservableCompletableFuture.this::completeExceptionally);
}
@Override
public boolean cancel(final boolean b) {
sub.unsubscribe();
return super.cancel(b);
}
}

72
hystrix/src/test/java/feign/hystrix/HystrixBuilderTest.java

@ -26,6 +26,10 @@ import java.util.ArrayList; @@ -26,6 +26,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import feign.FeignException;
import feign.Headers;
import feign.Param;
@ -427,6 +431,66 @@ public class HystrixBuilderTest { @@ -427,6 +431,66 @@ public class HystrixBuilderTest {
assertThat(testSubscriber.getOnNextEvents().get(0)).containsExactly("fallback");
}
@Test
public void completableFutureEmptyBody()
throws InterruptedException, ExecutionException, TimeoutException {
server.enqueue(new MockResponse());
TestInterface api = target();
CompletableFuture<String> completable = api.completableFuture();
assertThat(completable).isNotNull();
completable.get(5, TimeUnit.SECONDS);
}
@Test
public void completableFutureWithBody()
throws InterruptedException, ExecutionException, TimeoutException {
server.enqueue(new MockResponse().setBody("foo"));
TestInterface api = target();
CompletableFuture<String> completable = api.completableFuture();
assertThat(completable).isNotNull();
assertThat(completable.get(5, TimeUnit.SECONDS)).isEqualTo("foo");
}
@Test
public void completableFutureFailWithoutFallback() throws TimeoutException, InterruptedException {
server.enqueue(new MockResponse().setResponseCode(500));
TestInterface api = HystrixFeign.builder()
.target(TestInterface.class, "http://localhost:" + server.getPort());
CompletableFuture<String> completable = api.completableFuture();
assertThat(completable).isNotNull();
try {
completable.get(5, TimeUnit.SECONDS);
} catch (ExecutionException e) {
assertThat(e).hasCauseInstanceOf(HystrixRuntimeException.class);
}
}
@Test
public void completableFutureFallback()
throws InterruptedException, ExecutionException, TimeoutException {
server.enqueue(new MockResponse().setResponseCode(500));
TestInterface api = target();
CompletableFuture<String> completable = api.completableFuture();
assertThat(completable).isNotNull();
assertThat(completable.get(5, TimeUnit.SECONDS)).isEqualTo("fallback");
}
@Test
public void rxCompletableEmptyBody() {
server.enqueue(new MockResponse());
@ -657,6 +721,9 @@ public class HystrixBuilderTest { @@ -657,6 +721,9 @@ public class HystrixBuilderTest {
@RequestLine("GET /")
Completable completable();
@RequestLine("GET /")
CompletableFuture<String> completableFuture();
}
class FallbackTestInterface implements TestInterface {
@ -742,5 +809,10 @@ public class HystrixBuilderTest { @@ -742,5 +809,10 @@ public class HystrixBuilderTest {
public Completable completable() {
return Completable.complete();
}
@Override
public CompletableFuture<String> completableFuture() {
return CompletableFuture.completedFuture("fallback");
}
}
}

Loading…
Cancel
Save