Compare commits

...

2 Commits

Author SHA1 Message Date
Olga Maciaszek-Sharma 52c5b9e264 Switch ServiceInstanceListSupplier to use Flux. Add implementation with 5 years ago
Olga Maciaszek-Sharma 3c310a819d Draft POC. 5 years ago
  1. 14
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ConnectionTrackingServiceInstance.java
  2. 48
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java
  3. 73
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/PowerOfTwoChoicesPOCLoadBalancer.java
  4. 65
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/RoundRobinListLoadBalancer.java
  5. 16
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplier.java

14
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ConnectionTrackingServiceInstance.java

@ -0,0 +1,14 @@ @@ -0,0 +1,14 @@
package org.springframework.cloud.loadbalancer.core;
import org.springframework.cloud.client.ServiceInstance;
/**
* @author Olga Maciaszek-Sharma
*/
public interface ConnectionTrackingServiceInstance extends ServiceInstance {
int getConnectionCount();
void addConnection();
}

48
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java

@ -0,0 +1,48 @@ @@ -0,0 +1,48 @@
package org.springframework.cloud.loadbalancer.core;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.core.env.Environment;
import static org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory.PROPERTY_NAME;
/**
* @author Olga Maciaszek-Sharma
*/
public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstanceListSupplier<ConnectionTrackingServiceInstance> {
private final DiscoveryClient delegate;
private final String serviceId;
public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate,
Environment environment) {
this.delegate = delegate;
this.serviceId = environment.getProperty(PROPERTY_NAME);
}
@Override
public Flux<List<ConnectionTrackingServiceInstance>> get() {
//FIXME: sensible defaults + config
return Flux.just(getInstances())
.delayElements(Duration.ofMinutes(5));
}
private List<ConnectionTrackingServiceInstance> getInstances() {
return this.delegate
.getInstances(this.serviceId)
.stream()
// switch to a more sensible conversion
.map(serviceInstance -> (ConnectionTrackingServiceInstance) serviceInstance)
.collect(Collectors.toList());
}
public String getServiceId() {
return this.serviceId;
}
}

73
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/PowerOfTwoChoicesPOCLoadBalancer.java

@ -0,0 +1,73 @@ @@ -0,0 +1,73 @@
package org.springframework.cloud.loadbalancer.core;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
/**
* @author Olga Maciaszek-Sharma
*/
public class PowerOfTwoChoicesPOCLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private static final Log log = LogFactory.getLog(RoundRobinLoadBalancer.class);
private final ObjectProvider<ServiceInstanceListSupplier<ConnectionTrackingServiceInstance>> serviceInstanceListSupplier;
private final String serviceId;
private List<ConnectionTrackingServiceInstance> instances;
public PowerOfTwoChoicesPOCLoadBalancer(String serviceId,
ObjectProvider<ServiceInstanceListSupplier<ConnectionTrackingServiceInstance>> serviceInstanceListSupplier) {
this.serviceId = serviceId;
this.serviceInstanceListSupplier = serviceInstanceListSupplier;
resetInstances();
}
// private void resetInstances() {
// Schedulers.fromExecutorService(Executors.newSingleThreadScheduledExecutor())
// .schedulePeriodically(() -> instances = serviceInstanceListSupplier
// // maybe we don't have to block at all?
// // TODO: sensible interval defaults + config
// .getIfAvailable().get().next().block(), 0, 10, TimeUnit.MINUTES);
// }
private void resetInstances() {
serviceInstanceListSupplier.getIfAvailable()
.get()
.subscribe(connectionTrackingServiceInstances -> instances = connectionTrackingServiceInstances);
}
// TODO: optimise
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
if (instances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return Mono.just(new EmptyResponse());
}
if (instances.size() == 1 || instances.get(0).getConnectionCount() < instances
.get(1)
.getConnectionCount()) {
instances.get(0).addConnection();
return Mono.just(new DefaultResponse(instances.get(0)));
}
Collections.shuffle(instances);
instances.get(1).addConnection();
return Mono.just(new DefaultResponse(instances.get(1)));
}
}

65
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/RoundRobinListLoadBalancer.java

@ -0,0 +1,65 @@ @@ -0,0 +1,65 @@
package org.springframework.cloud.loadbalancer.core;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
/**
* @author Olga Maciaszek-Sharma
*/
public class RoundRobinListLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private static final Log log = LogFactory.getLog(RoundRobinLoadBalancer.class);
private final AtomicInteger position;
private final ObjectProvider<ServiceInstanceListSupplier<ServiceInstance>> serviceInstanceListSupplier;
private final String serviceId;
public RoundRobinListLoadBalancer(String serviceId,
ObjectProvider<ServiceInstanceListSupplier<ServiceInstance>> serviceInstanceListSupplier) {
this(serviceId, serviceInstanceListSupplier, new Random().nextInt(1000));
}
public RoundRobinListLoadBalancer(String serviceId,
ObjectProvider<ServiceInstanceListSupplier<ServiceInstance>> serviceInstanceListSupplier,
int seedPosition) {
this.serviceId = serviceId;
this.serviceInstanceListSupplier = serviceInstanceListSupplier;
this.position = new AtomicInteger(seedPosition);
}
@Override
// see original
// https://github.com/Netflix/ocelli/blob/master/ocelli-core/
// src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java
public Mono<Response<ServiceInstance>> choose(Request request) {
// TODO: move supplier to Request?
ServiceInstanceListSupplier<ServiceInstance> supplier = this.serviceInstanceListSupplier.getIfAvailable();
return supplier.get()
.next()
.map(instances -> {
if (instances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
}
// TODO: enforce order?
int pos = Math.abs(this.position.incrementAndGet());
ServiceInstance instance = instances.get(pos % instances.size());
return new DefaultResponse(instance);
});
}
}

16
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplier.java

@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
package org.springframework.cloud.loadbalancer.core;
import java.util.List;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;
import org.springframework.cloud.client.ServiceInstance;
/**
* @author Olga Maciaszek-Sharma
*/
public interface ServiceInstanceListSupplier<T extends ServiceInstance> extends Supplier<Flux<List<T>>> {
String getServiceId();
}
Loading…
Cancel
Save