|
|
|
@ -17,10 +17,12 @@
@@ -17,10 +17,12 @@
|
|
|
|
|
package org.springframework.cloud.netflix.hystrix; |
|
|
|
|
|
|
|
|
|
import org.reactivestreams.Publisher; |
|
|
|
|
import org.springframework.util.StringUtils; |
|
|
|
|
|
|
|
|
|
import com.netflix.hystrix.HystrixCommandGroupKey; |
|
|
|
|
import com.netflix.hystrix.HystrixCommandKey; |
|
|
|
|
import com.netflix.hystrix.HystrixObservableCommand; |
|
|
|
|
import com.netflix.hystrix.HystrixObservableCommand.Setter; |
|
|
|
|
|
|
|
|
|
import reactor.core.publisher.Flux; |
|
|
|
|
import reactor.core.publisher.Mono; |
|
|
|
@ -35,26 +37,99 @@ import rx.RxReactiveStreams;
@@ -35,26 +37,99 @@ import rx.RxReactiveStreams;
|
|
|
|
|
*/ |
|
|
|
|
public class HystrixCommands { |
|
|
|
|
|
|
|
|
|
public static <T> Flux<T> wrap(String commandName, Flux<T> flux) { |
|
|
|
|
return wrap(commandName, flux, null); |
|
|
|
|
public static <T> PublisherBuilder<T> from(Publisher<T> publisher) { |
|
|
|
|
return new PublisherBuilder<>(publisher); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public static <T> Flux<T> wrap(String commandName, Flux<T> flux, Flux<T> fallback) { |
|
|
|
|
String groupName = commandName + "group"; |
|
|
|
|
PublisherHystrixCommand<T> command = createHystrixCommand(commandName, groupName, |
|
|
|
|
flux, fallback); |
|
|
|
|
return Flux.from(RxReactiveStreams.toPublisher(command.toObservable())); |
|
|
|
|
} |
|
|
|
|
public static class PublisherBuilder<T> { |
|
|
|
|
private final Publisher<T> publisher; |
|
|
|
|
private String commandName; |
|
|
|
|
private String groupName; |
|
|
|
|
private Publisher<T> fallback; |
|
|
|
|
private Setter setter; |
|
|
|
|
private boolean eager = false; |
|
|
|
|
|
|
|
|
|
public PublisherBuilder(Publisher<T> publisher) { |
|
|
|
|
this.publisher = publisher; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public PublisherBuilder<T> commandName(String commandName) { |
|
|
|
|
this.commandName = commandName; |
|
|
|
|
return this; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public PublisherBuilder<T> groupName(String groupName) { |
|
|
|
|
this.groupName = groupName; |
|
|
|
|
return this; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public PublisherBuilder<T> fallback(Publisher<T> fallback) { |
|
|
|
|
this.fallback = fallback; |
|
|
|
|
return this; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public PublisherBuilder<T> setter(Setter setter) { |
|
|
|
|
this.setter = setter; |
|
|
|
|
return this; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public PublisherBuilder<T> eager() { |
|
|
|
|
this.eager = true; |
|
|
|
|
return this; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public Publisher<T> build() { |
|
|
|
|
if (!StringUtils.hasText(commandName) && setter == null) { |
|
|
|
|
throw new IllegalStateException("commandName and setter can not both be empty"); |
|
|
|
|
} |
|
|
|
|
Setter setterToUse; |
|
|
|
|
if (this.setter != null) { |
|
|
|
|
setterToUse = this.setter; |
|
|
|
|
} else { |
|
|
|
|
String groupNameToUse; |
|
|
|
|
if (StringUtils.hasText(this.groupName)) { |
|
|
|
|
groupNameToUse = this.groupName; |
|
|
|
|
} else { |
|
|
|
|
groupNameToUse = commandName + "group"; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(groupNameToUse); |
|
|
|
|
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(this.commandName); |
|
|
|
|
setterToUse = Setter.withGroupKey(groupKey).andCommandKey(commandKey); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
PublisherHystrixCommand<T> command = new PublisherHystrixCommand<>(setterToUse, this.publisher, this.fallback); |
|
|
|
|
|
|
|
|
|
Observable<T> observable; |
|
|
|
|
if (this.eager) { |
|
|
|
|
observable = command.observe(); |
|
|
|
|
} else { |
|
|
|
|
observable = command.toObservable(); |
|
|
|
|
} |
|
|
|
|
return RxReactiveStreams.toPublisher(observable); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public Flux<T> toFlux() { |
|
|
|
|
return Flux.from(build()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public Mono<T> toMono() { |
|
|
|
|
return Mono.from(build()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public static <T> Mono<T> wrap(String commandName, Mono<T> mono) { |
|
|
|
|
return wrap(commandName, mono, null); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public static <T> Mono<T> wrap(String commandName, Mono<T> mono, Mono<T> fallback) { |
|
|
|
|
public static <T> Mono<T> inject(String commandName, Mono<T> mono, Mono<T> fallback, boolean eager) { |
|
|
|
|
String groupName = commandName + "group"; |
|
|
|
|
PublisherHystrixCommand<T> command = createHystrixCommand(commandName, groupName, |
|
|
|
|
mono, fallback); |
|
|
|
|
return Mono.from(RxReactiveStreams.toPublisher(command.toObservable())); |
|
|
|
|
Observable<T> observable; |
|
|
|
|
if (eager) { |
|
|
|
|
observable = command.observe(); |
|
|
|
|
} else { |
|
|
|
|
observable = command.toObservable(); |
|
|
|
|
} |
|
|
|
|
return Mono.from(RxReactiveStreams.toPublisher(observable)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private static <T> PublisherHystrixCommand<T> createHystrixCommand(String commandName, |
|
|
|
@ -62,7 +137,7 @@ public class HystrixCommands {
@@ -62,7 +137,7 @@ public class HystrixCommands {
|
|
|
|
|
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(groupName); |
|
|
|
|
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(commandName); |
|
|
|
|
|
|
|
|
|
HystrixObservableCommand.Setter setter = HystrixObservableCommand.Setter |
|
|
|
|
Setter setter = Setter |
|
|
|
|
.withGroupKey(groupKey).andCommandKey(commandKey); |
|
|
|
|
|
|
|
|
|
return new PublisherHystrixCommand<>(setter, publisher, fallback); |
|
|
|
|