Compare commits
2 Commits
main
...
wip-poc-gh
Author | SHA1 | Date |
---|---|---|
Olga Maciaszek-Sharma | 52c5b9e264 | 5 years ago |
Olga Maciaszek-Sharma | 3c310a819d | 5 years ago |
5 changed files with 216 additions and 0 deletions
@ -0,0 +1,14 @@
@@ -0,0 +1,14 @@
|
||||
package org.springframework.cloud.loadbalancer.core; |
||||
|
||||
import org.springframework.cloud.client.ServiceInstance; |
||||
|
||||
/** |
||||
* @author Olga Maciaszek-Sharma |
||||
*/ |
||||
public interface ConnectionTrackingServiceInstance extends ServiceInstance { |
||||
|
||||
int getConnectionCount(); |
||||
|
||||
void addConnection(); |
||||
|
||||
} |
@ -0,0 +1,48 @@
@@ -0,0 +1,48 @@
|
||||
package org.springframework.cloud.loadbalancer.core; |
||||
|
||||
import java.time.Duration; |
||||
import java.util.List; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import reactor.core.publisher.Flux; |
||||
|
||||
import org.springframework.cloud.client.discovery.DiscoveryClient; |
||||
import org.springframework.core.env.Environment; |
||||
|
||||
import static org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory.PROPERTY_NAME; |
||||
|
||||
/** |
||||
* @author Olga Maciaszek-Sharma |
||||
*/ |
||||
public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstanceListSupplier<ConnectionTrackingServiceInstance> { |
||||
|
||||
private final DiscoveryClient delegate; |
||||
|
||||
private final String serviceId; |
||||
|
||||
public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, |
||||
Environment environment) { |
||||
this.delegate = delegate; |
||||
this.serviceId = environment.getProperty(PROPERTY_NAME); |
||||
} |
||||
|
||||
@Override |
||||
public Flux<List<ConnectionTrackingServiceInstance>> get() { |
||||
//FIXME: sensible defaults + config
|
||||
return Flux.just(getInstances()) |
||||
.delayElements(Duration.ofMinutes(5)); |
||||
} |
||||
|
||||
private List<ConnectionTrackingServiceInstance> getInstances() { |
||||
return this.delegate |
||||
.getInstances(this.serviceId) |
||||
.stream() |
||||
// switch to a more sensible conversion
|
||||
.map(serviceInstance -> (ConnectionTrackingServiceInstance) serviceInstance) |
||||
.collect(Collectors.toList()); |
||||
} |
||||
|
||||
public String getServiceId() { |
||||
return this.serviceId; |
||||
} |
||||
} |
@ -0,0 +1,73 @@
@@ -0,0 +1,73 @@
|
||||
package org.springframework.cloud.loadbalancer.core; |
||||
|
||||
import java.util.Collections; |
||||
import java.util.List; |
||||
import java.util.concurrent.Executors; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
import org.apache.commons.logging.Log; |
||||
import org.apache.commons.logging.LogFactory; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.core.scheduler.Schedulers; |
||||
|
||||
import org.springframework.beans.factory.ObjectProvider; |
||||
import org.springframework.cloud.client.ServiceInstance; |
||||
import org.springframework.cloud.client.loadbalancer.reactive.DefaultResponse; |
||||
import org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse; |
||||
import org.springframework.cloud.client.loadbalancer.reactive.Request; |
||||
import org.springframework.cloud.client.loadbalancer.reactive.Response; |
||||
|
||||
/** |
||||
* @author Olga Maciaszek-Sharma |
||||
*/ |
||||
public class PowerOfTwoChoicesPOCLoadBalancer implements ReactorServiceInstanceLoadBalancer { |
||||
|
||||
private static final Log log = LogFactory.getLog(RoundRobinLoadBalancer.class); |
||||
|
||||
private final ObjectProvider<ServiceInstanceListSupplier<ConnectionTrackingServiceInstance>> serviceInstanceListSupplier; |
||||
|
||||
private final String serviceId; |
||||
|
||||
private List<ConnectionTrackingServiceInstance> instances; |
||||
|
||||
public PowerOfTwoChoicesPOCLoadBalancer(String serviceId, |
||||
ObjectProvider<ServiceInstanceListSupplier<ConnectionTrackingServiceInstance>> serviceInstanceListSupplier) { |
||||
this.serviceId = serviceId; |
||||
this.serviceInstanceListSupplier = serviceInstanceListSupplier; |
||||
resetInstances(); |
||||
} |
||||
|
||||
// private void resetInstances() {
|
||||
// Schedulers.fromExecutorService(Executors.newSingleThreadScheduledExecutor())
|
||||
// .schedulePeriodically(() -> instances = serviceInstanceListSupplier
|
||||
// // maybe we don't have to block at all?
|
||||
// // TODO: sensible interval defaults + config
|
||||
// .getIfAvailable().get().next().block(), 0, 10, TimeUnit.MINUTES);
|
||||
// }
|
||||
|
||||
private void resetInstances() { |
||||
serviceInstanceListSupplier.getIfAvailable() |
||||
.get() |
||||
.subscribe(connectionTrackingServiceInstances -> instances = connectionTrackingServiceInstances); |
||||
} |
||||
|
||||
// TODO: optimise
|
||||
@Override |
||||
public Mono<Response<ServiceInstance>> choose(Request request) { |
||||
if (instances.isEmpty()) { |
||||
log.warn("No servers available for service: " + this.serviceId); |
||||
return Mono.just(new EmptyResponse()); |
||||
} |
||||
if (instances.size() == 1 || instances.get(0).getConnectionCount() < instances |
||||
.get(1) |
||||
.getConnectionCount()) { |
||||
instances.get(0).addConnection(); |
||||
return Mono.just(new DefaultResponse(instances.get(0))); |
||||
} |
||||
Collections.shuffle(instances); |
||||
|
||||
instances.get(1).addConnection(); |
||||
return Mono.just(new DefaultResponse(instances.get(1))); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,65 @@
@@ -0,0 +1,65 @@
|
||||
package org.springframework.cloud.loadbalancer.core; |
||||
|
||||
import java.util.Random; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
|
||||
import org.apache.commons.logging.Log; |
||||
import org.apache.commons.logging.LogFactory; |
||||
import reactor.core.publisher.Mono; |
||||
|
||||
import org.springframework.beans.factory.ObjectProvider; |
||||
import org.springframework.cloud.client.ServiceInstance; |
||||
import org.springframework.cloud.client.loadbalancer.reactive.DefaultResponse; |
||||
import org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse; |
||||
import org.springframework.cloud.client.loadbalancer.reactive.Request; |
||||
import org.springframework.cloud.client.loadbalancer.reactive.Response; |
||||
|
||||
/** |
||||
* @author Olga Maciaszek-Sharma |
||||
*/ |
||||
public class RoundRobinListLoadBalancer implements ReactorServiceInstanceLoadBalancer { |
||||
|
||||
private static final Log log = LogFactory.getLog(RoundRobinLoadBalancer.class); |
||||
|
||||
private final AtomicInteger position; |
||||
|
||||
private final ObjectProvider<ServiceInstanceListSupplier<ServiceInstance>> serviceInstanceListSupplier; |
||||
|
||||
private final String serviceId; |
||||
|
||||
public RoundRobinListLoadBalancer(String serviceId, |
||||
ObjectProvider<ServiceInstanceListSupplier<ServiceInstance>> serviceInstanceListSupplier) { |
||||
this(serviceId, serviceInstanceListSupplier, new Random().nextInt(1000)); |
||||
} |
||||
|
||||
public RoundRobinListLoadBalancer(String serviceId, |
||||
ObjectProvider<ServiceInstanceListSupplier<ServiceInstance>> serviceInstanceListSupplier, |
||||
int seedPosition) { |
||||
this.serviceId = serviceId; |
||||
this.serviceInstanceListSupplier = serviceInstanceListSupplier; |
||||
this.position = new AtomicInteger(seedPosition); |
||||
} |
||||
|
||||
@Override |
||||
// see original
|
||||
// https://github.com/Netflix/ocelli/blob/master/ocelli-core/
|
||||
// src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java
|
||||
public Mono<Response<ServiceInstance>> choose(Request request) { |
||||
// TODO: move supplier to Request?
|
||||
ServiceInstanceListSupplier<ServiceInstance> supplier = this.serviceInstanceListSupplier.getIfAvailable(); |
||||
return supplier.get() |
||||
.next() |
||||
.map(instances -> { |
||||
if (instances.isEmpty()) { |
||||
log.warn("No servers available for service: " + this.serviceId); |
||||
return new EmptyResponse(); |
||||
} |
||||
// TODO: enforce order?
|
||||
int pos = Math.abs(this.position.incrementAndGet()); |
||||
|
||||
ServiceInstance instance = instances.get(pos % instances.size()); |
||||
|
||||
return new DefaultResponse(instance); |
||||
}); |
||||
} |
||||
} |
@ -0,0 +1,16 @@
@@ -0,0 +1,16 @@
|
||||
package org.springframework.cloud.loadbalancer.core; |
||||
|
||||
import java.util.List; |
||||
import java.util.function.Supplier; |
||||
|
||||
import reactor.core.publisher.Flux; |
||||
|
||||
import org.springframework.cloud.client.ServiceInstance; |
||||
|
||||
/** |
||||
* @author Olga Maciaszek-Sharma |
||||
*/ |
||||
public interface ServiceInstanceListSupplier<T extends ServiceInstance> extends Supplier<Flux<List<T>>> { |
||||
|
||||
String getServiceId(); |
||||
} |
Loading…
Reference in new issue