diff --git a/spring-cloud-netflix-core/src/main/java/org/springframework/cloud/netflix/hystrix/HystrixCommands.java b/spring-cloud-netflix-core/src/main/java/org/springframework/cloud/netflix/hystrix/HystrixCommands.java index 08af3d9d..1aceeeef 100644 --- a/spring-cloud-netflix-core/src/main/java/org/springframework/cloud/netflix/hystrix/HystrixCommands.java +++ b/spring-cloud-netflix-core/src/main/java/org/springframework/cloud/netflix/hystrix/HystrixCommands.java @@ -17,10 +17,12 @@ package org.springframework.cloud.netflix.hystrix; import org.reactivestreams.Publisher; +import org.springframework.util.StringUtils; import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixObservableCommand; +import com.netflix.hystrix.HystrixObservableCommand.Setter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -35,26 +37,99 @@ import rx.RxReactiveStreams; */ public class HystrixCommands { - public static Flux wrap(String commandName, Flux flux) { - return wrap(commandName, flux, null); + public static PublisherBuilder from(Publisher publisher) { + return new PublisherBuilder<>(publisher); } - public static Flux wrap(String commandName, Flux flux, Flux fallback) { - String groupName = commandName + "group"; - PublisherHystrixCommand command = createHystrixCommand(commandName, groupName, - flux, fallback); - return Flux.from(RxReactiveStreams.toPublisher(command.toObservable())); - } + public static class PublisherBuilder { + private final Publisher publisher; + private String commandName; + private String groupName; + private Publisher fallback; + private Setter setter; + private boolean eager = false; + + public PublisherBuilder(Publisher publisher) { + this.publisher = publisher; + } + + public PublisherBuilder commandName(String commandName) { + this.commandName = commandName; + return this; + } + + public PublisherBuilder groupName(String groupName) { + this.groupName = groupName; + return this; + } + + public PublisherBuilder fallback(Publisher fallback) { + this.fallback = fallback; + return this; + } + + public PublisherBuilder setter(Setter setter) { + this.setter = setter; + return this; + } + + public PublisherBuilder eager() { + this.eager = true; + return this; + } + + public Publisher build() { + if (!StringUtils.hasText(commandName) && setter == null) { + throw new IllegalStateException("commandName and setter can not both be empty"); + } + Setter setterToUse; + if (this.setter != null) { + setterToUse = this.setter; + } else { + String groupNameToUse; + if (StringUtils.hasText(this.groupName)) { + groupNameToUse = this.groupName; + } else { + groupNameToUse = commandName + "group"; + } + + HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(groupNameToUse); + HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(this.commandName); + setterToUse = Setter.withGroupKey(groupKey).andCommandKey(commandKey); + } + + PublisherHystrixCommand command = new PublisherHystrixCommand<>(setterToUse, this.publisher, this.fallback); + + Observable observable; + if (this.eager) { + observable = command.observe(); + } else { + observable = command.toObservable(); + } + return RxReactiveStreams.toPublisher(observable); + } + + public Flux toFlux() { + return Flux.from(build()); + } + + public Mono toMono() { + return Mono.from(build()); + } - public static Mono wrap(String commandName, Mono mono) { - return wrap(commandName, mono, null); } - public static Mono wrap(String commandName, Mono mono, Mono fallback) { + public static Mono inject(String commandName, Mono mono, Mono fallback, boolean eager) { String groupName = commandName + "group"; PublisherHystrixCommand command = createHystrixCommand(commandName, groupName, mono, fallback); - return Mono.from(RxReactiveStreams.toPublisher(command.toObservable())); + Observable observable; + if (eager) { + observable = command.observe(); + } else { + observable = command.toObservable(); + } + return Mono.from(RxReactiveStreams.toPublisher(observable)); } private static PublisherHystrixCommand createHystrixCommand(String commandName, @@ -62,7 +137,7 @@ public class HystrixCommands { HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(groupName); HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(commandName); - HystrixObservableCommand.Setter setter = HystrixObservableCommand.Setter + Setter setter = Setter .withGroupKey(groupKey).andCommandKey(commandKey); return new PublisherHystrixCommand<>(setter, publisher, fallback); diff --git a/spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/hystrix/HystrixCommandsTests.java b/spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/hystrix/HystrixCommandsTests.java index 345b7fc3..17063e99 100644 --- a/spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/hystrix/HystrixCommandsTests.java +++ b/spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/hystrix/HystrixCommandsTests.java @@ -36,47 +36,72 @@ public class HystrixCommandsTests { @Test public void monoWorks() { - String result = HystrixCommands.wrap("testworks", Mono.just("works")).block(); + String result = HystrixCommands.from(Mono.just("works")).commandName("testworks").toMono().block(); + assertThat(result).isEqualTo("works"); + } + + @Test + public void eagerMonoWorks() { + String result = HystrixCommands.from(Mono.just("works")) + .eager() + .commandName("testworks") + .toMono().block(); assertThat(result).isEqualTo("works"); } @Test public void monoTimesOut() { exception.expect(HystrixRuntimeException.class); - HystrixCommands.wrap("failcmd", Mono.fromCallable(() -> { + HystrixCommands.from(Mono.fromCallable(() -> { Thread.sleep(1500); return "timeout"; - })).block(); + })).commandName("failcmd").toMono().block(); } @Test public void monoFallbackWorks() { - String result = HystrixCommands.wrap("failcmd", Mono.error(new Exception()), Mono.just("fallback")).block(); + String result = HystrixCommands.from(Mono.error(new Exception())) + .commandName("failcmd") + .fallback(Mono.just("fallback")) + .toMono().block(); assertThat(result).isEqualTo("fallback"); } @Test public void fluxWorks() { - List list = HystrixCommands.wrap("multiflux", Flux.just("1", "2")).collectList().block(); + List list = HystrixCommands.from( Flux.just("1", "2")) + .commandName("multiflux") + .toFlux().collectList().block(); + assertThat(list).hasSize(2).contains("1", "2"); + } + + @Test + public void eagerFluxWorks() { + List list = HystrixCommands.from( Flux.just("1", "2")) + .commandName("multiflux") + .eager() + .toFlux().collectList().block(); assertThat(list).hasSize(2).contains("1", "2"); } @Test - // @Ignore public void fluxTimesOut() { exception.expect(HystrixRuntimeException.class); - HystrixCommands.wrap("failcmd", Flux.from(s -> { + HystrixCommands.from( Flux.from(s -> { try { Thread.sleep(1500); } catch (InterruptedException e) { throw new RuntimeException(e); } - })).blockFirst(); + })).commandName("failcmd").toFlux().blockFirst(); } @Test public void fluxFallbackWorks() { - List list = HystrixCommands.wrap("multiflux", Flux.error(new Exception()), Flux.just("a", "b")).collectList().block(); + List list = HystrixCommands.from(Flux.error(new Exception())) + .commandName("multiflux") + .fallback(Flux.just("a", "b")) + .toFlux().collectList().block(); assertThat(list).hasSize(2).contains("a", "b"); }