Browse Source

feat: expand the list by weights to implement weighted load balancing (#1140)

tls-versions
jizhuozhi 2 years ago committed by GitHub
parent
commit
6955fdcc7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 47
      docs/src/main/asciidoc/spring-cloud-commons.adoc
  2. 26
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java
  3. 41
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/WeightFunction.java
  4. 141
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/WeightedServiceInstanceListSupplier.java
  5. 4
      spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilderTests.java
  6. 190
      spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/WeightedServiceInstanceListSupplierTests.java

47
docs/src/main/asciidoc/spring-cloud-commons.adoc

@ -928,6 +928,53 @@ to `false`. @@ -928,6 +928,53 @@ to `false`.
WARNING: Although the basic, non-cached, implementation is useful for prototyping and testing, it's much less efficient than the cached versions, so we recommend always using the cached version in production. If the caching is already done by the `DiscoveryClient` implementation, for example `EurekaDiscoveryClient`, the load-balancer caching should be disabled to prevent double caching.
=== Weighted Load-Balancing
To enable weighted load-balancing, we provide the `WeightedServiceInstanceListSupplier`. We use `WeightFunction` to calculate the weight of each instance.
By default, we try to read and parse the weight from the metadata map (the key is `weight`).
If the weight is not specified in the metadata map, we default the weight of this instance to be 1.
You could use this sample configuration to set it up:
[[weighted-custom-loadbalancer-configuration]]
[source,java,indent=0]
----
public class CustomLoadBalancerConfiguration {
@Bean
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder()
.withDiscoveryClient()
.withWeighted()
.withCaching()
.build(context);
}
}
----
NOTE: You can also customize the weight calculation logic by providing `WeightFunction`.
You can use this sample configuration to make all instances have a random weight:
[[random-weight-weighted-custom-loadbalancer-configuration]]
[source,java,indent=0]
----
public class CustomLoadBalancerConfiguration {
@Bean
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder()
.withDiscoveryClient()
.withWeighted(instance -> ThreadLocalRandom.current().nextInt(1, 101))
.withCaching()
.build(context);
}
}
----
=== Zone-Based Load-Balancing
To enable zone-based load-balancing, we provide the `ZonePreferenceServiceInstanceListSupplier`.

26
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java

@ -50,6 +50,7 @@ import org.springframework.web.util.UriComponentsBuilder; @@ -50,6 +50,7 @@ import org.springframework.web.util.UriComponentsBuilder;
* @author Olga Maciaszek-Sharma
* @author Zhiguo Chen
* @author Sabyasachi Bhattacharya
* @author Zhuozhi Ji
*/
public final class ServiceInstanceListSupplierBuilder {
@ -111,6 +112,31 @@ public final class ServiceInstanceListSupplierBuilder { @@ -111,6 +112,31 @@ public final class ServiceInstanceListSupplierBuilder {
return this;
}
/**
* Adds a {@link WeightedServiceInstanceListSupplier} to the
* {@link ServiceInstanceListSupplier} hierarchy.
* @return the {@link ServiceInstanceListSupplierBuilder} object
*/
public ServiceInstanceListSupplierBuilder withWeighted() {
DelegateCreator creator = (context, delegate) -> new WeightedServiceInstanceListSupplier(delegate);
this.creators.add(creator);
return this;
}
/**
* Adds a {@link WeightedServiceInstanceListSupplier} that uses user-provided
* {@link WeightFunction} instance to the {@link ServiceInstanceListSupplier}
* hierarchy.
* @param weightFunction a user-provided {@link WeightFunction} instance
* @return the {@link ServiceInstanceListSupplierBuilder} object
*/
public ServiceInstanceListSupplierBuilder withWeighted(WeightFunction weightFunction) {
DelegateCreator creator = (context, delegate) -> new WeightedServiceInstanceListSupplier(delegate,
weightFunction);
this.creators.add(creator);
return this;
}
/**
* Adds a {@link HealthCheckServiceInstanceListSupplier} to the
* {@link ServiceInstanceListSupplier} hierarchy.

41
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/WeightFunction.java

@ -0,0 +1,41 @@ @@ -0,0 +1,41 @@
/*
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.loadbalancer.core;
import org.springframework.cloud.client.ServiceInstance;
/**
* Represents a function that calculate the weight of the given service instance.
*
* <p>
* This is a functional interface whose functional method is
* {@link #apply(ServiceInstance)}.
*
* @author Zhuozhi Ji
* @see java.util.function.ToIntFunction
*/
@FunctionalInterface
public interface WeightFunction {
/**
* Applies this function to the given service instance.
* @param instance the service instance
* @return the weight of service instance
*/
int apply(ServiceInstance instance);
}

141
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/WeightedServiceInstanceListSupplier.java

@ -0,0 +1,141 @@ @@ -0,0 +1,141 @@
/*
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.loadbalancer.core;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import org.springframework.cloud.client.ServiceInstance;
/**
* A {@link ServiceInstanceListSupplier} implementation that uses weights to expand the
* instances provided by delegate.
*
* @author Zhuozhi Ji
*/
public class WeightedServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
private static final Log LOG = LogFactory.getLog(WeightedServiceInstanceListSupplier.class);
static final String METADATA_WEIGHT_KEY = "weight";
static final int DEFAULT_WEIGHT = 1;
private final WeightFunction weightFunction;
public WeightedServiceInstanceListSupplier(ServiceInstanceListSupplier delegate) {
super(delegate);
this.weightFunction = WeightedServiceInstanceListSupplier::metadataWeightFunction;
}
public WeightedServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, WeightFunction weightFunction) {
super(delegate);
this.weightFunction = weightFunction;
}
@Override
public Flux<List<ServiceInstance>> get() {
return delegate.get().map(this::expandByWeight);
}
private List<ServiceInstance> expandByWeight(List<ServiceInstance> instances) {
if (instances.size() == 0) {
return instances;
}
int[] weights = instances.stream().mapToInt(instance -> {
try {
int weight = weightFunction.apply(instance);
if (weight <= 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"The weight of the instance %s should be a positive integer, but it got %d, using %d as default",
instance.getInstanceId(), weight, DEFAULT_WEIGHT));
}
return DEFAULT_WEIGHT;
}
return weight;
}
catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"Exception occurred during apply weight function to instance %s, using %d as default",
instance.getInstanceId(), DEFAULT_WEIGHT), e);
}
return DEFAULT_WEIGHT;
}
}).toArray();
// Calculate the greatest common divisor (GCD) of weights and the total number of
// elements after expansion.
int gcd = 0;
int total = 0;
for (int weight : weights) {
gcd = greatestCommonDivisor(gcd, weight);
total += weight;
}
// Because scaling by the gcd does not affect the distribution,
// we can reduce memory usage by this way.
List<ServiceInstance> newInstances = new ArrayList<>(total / gcd);
// use iterator for some implementation of the List that not supports
// RandomAccess, but `weights` is supported, so use a local variable `i`
// to get the current position.
int i = 0;
for (ServiceInstance instance : instances) {
int weight = weights[i] / gcd;
for (int j = 0; j < weight; j++) {
newInstances.add(instance);
}
i++;
}
Collections.shuffle(newInstances);
return newInstances;
}
static int metadataWeightFunction(ServiceInstance serviceInstance) {
Map<String, String> metadata = serviceInstance.getMetadata();
if (metadata != null) {
String weightValue = metadata.get(METADATA_WEIGHT_KEY);
if (weightValue != null) {
return Integer.parseInt(weightValue);
}
}
// using default weight when metadata is missing or
// weight is not specified
return DEFAULT_WEIGHT;
}
static int greatestCommonDivisor(int a, int b) {
int r;
while (b != 0) {
r = a % b;
a = b;
b = r;
}
return a;
}
}

4
spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilderTests.java

@ -37,9 +37,11 @@ public class ServiceInstanceListSupplierBuilderTests { @@ -37,9 +37,11 @@ public class ServiceInstanceListSupplierBuilderTests {
public void testBuilder() {
new ApplicationContextRunner().withUserConfiguration(CacheTestConfig.class).run(context -> {
ServiceInstanceListSupplier supplier = ServiceInstanceListSupplier.builder().withDiscoveryClient()
.withHealthChecks().withCaching().build(context);
.withHealthChecks().withWeighted().withCaching().build(context);
assertThat(supplier).isInstanceOf(CachingServiceInstanceListSupplier.class);
DelegatingServiceInstanceListSupplier delegating = (DelegatingServiceInstanceListSupplier) supplier;
assertThat(delegating.getDelegate()).isInstanceOf(WeightedServiceInstanceListSupplier.class);
delegating = (DelegatingServiceInstanceListSupplier) delegating.getDelegate();
assertThat(delegating.getDelegate()).isInstanceOf(HealthCheckServiceInstanceListSupplier.class);
delegating = (DelegatingServiceInstanceListSupplier) delegating.getDelegate();
assertThat(delegating.getDelegate()).isInstanceOf(DiscoveryClientServiceInstanceListSupplier.class);

190
spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/WeightedServiceInstanceListSupplierTests.java

@ -0,0 +1,190 @@ @@ -0,0 +1,190 @@
/*
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.loadbalancer.core;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
import static java.util.stream.Collectors.summingInt;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.springframework.cloud.loadbalancer.core.WeightedServiceInstanceListSupplier.DEFAULT_WEIGHT;
/**
* Tests for {@link WeightedServiceInstanceListSupplier}.
*
* @author Zhuozhi Ji
*/
class WeightedServiceInstanceListSupplierTests {
private final DiscoveryClientServiceInstanceListSupplier delegate = mock(
DiscoveryClientServiceInstanceListSupplier.class);
@Test
void shouldReturnEmptyWhenDelegateReturnedEmpty() {
when(delegate.get()).thenReturn(Flux.just(Collections.emptyList()));
WeightedServiceInstanceListSupplier supplier = new WeightedServiceInstanceListSupplier(delegate);
List<ServiceInstance> serviceInstances = Objects.requireNonNull(supplier.get().blockFirst());
assertThat(serviceInstances).isEmpty();
}
@Test
void shouldBeSameAsWeightsRatioWhenGcdOfWeightsIs1() {
ServiceInstance one = serviceInstance("test-1", buildWeightMetadata(1));
ServiceInstance two = serviceInstance("test-2", buildWeightMetadata(2));
ServiceInstance three = serviceInstance("test-3", buildWeightMetadata(3));
when(delegate.get()).thenReturn(Flux.just(Arrays.asList(one, two, three)));
WeightedServiceInstanceListSupplier supplier = new WeightedServiceInstanceListSupplier(delegate);
List<ServiceInstance> serviceInstances = Objects.requireNonNull(supplier.get().blockFirst());
Map<String, Integer> counter = serviceInstances.stream()
.collect(Collectors.groupingBy(ServiceInstance::getInstanceId, summingInt(e -> 1)));
assertThat(counter).containsEntry("test-1", 1);
assertThat(counter).containsEntry("test-2", 2);
assertThat(counter).containsEntry("test-3", 3);
}
@Test
void shouldBeSameAsWeightsRatioWhenGcdOfWeightsIs10() {
ServiceInstance one = serviceInstance("test-1", buildWeightMetadata(10));
ServiceInstance two = serviceInstance("test-2", buildWeightMetadata(20));
ServiceInstance three = serviceInstance("test-3", buildWeightMetadata(30));
when(delegate.get()).thenReturn(Flux.just(Arrays.asList(one, two, three)));
WeightedServiceInstanceListSupplier supplier = new WeightedServiceInstanceListSupplier(delegate);
List<ServiceInstance> serviceInstances = Objects.requireNonNull(supplier.get().blockFirst());
Map<String, Integer> counter = serviceInstances.stream()
.collect(Collectors.groupingBy(ServiceInstance::getInstanceId, summingInt(e -> 1)));
assertThat(counter).containsEntry("test-1", 1);
assertThat(counter).containsEntry("test-2", 2);
assertThat(counter).containsEntry("test-3", 3);
}
@Test
void shouldUseDefaultWeightWhenWeightNotSpecified() {
ServiceInstance one = serviceInstance("test-1", Collections.emptyMap());
ServiceInstance two = serviceInstance("test-2", Collections.emptyMap());
ServiceInstance three = serviceInstance("test-3", buildWeightMetadata(3));
when(delegate.get()).thenReturn(Flux.just(Arrays.asList(one, two, three)));
WeightedServiceInstanceListSupplier supplier = new WeightedServiceInstanceListSupplier(delegate);
List<ServiceInstance> serviceInstances = Objects.requireNonNull(supplier.get().blockFirst());
Map<String, Integer> counter = serviceInstances.stream()
.collect(Collectors.groupingBy(ServiceInstance::getInstanceId, summingInt(e -> 1)));
assertThat(counter).containsEntry("test-1", DEFAULT_WEIGHT);
assertThat(counter).containsEntry("test-2", DEFAULT_WEIGHT);
assertThat(counter).containsEntry("test-3", 3);
}
@Test
void shouldUseDefaultWeightWhenWeightIsNotNumber() {
ServiceInstance one = serviceInstance("test-1", buildWeightMetadata("Foo"));
ServiceInstance two = serviceInstance("test-2", buildWeightMetadata("Bar"));
ServiceInstance three = serviceInstance("test-3", buildWeightMetadata("Baz"));
when(delegate.get()).thenReturn(Flux.just(Arrays.asList(one, two, three)));
WeightedServiceInstanceListSupplier supplier = new WeightedServiceInstanceListSupplier(delegate);
List<ServiceInstance> serviceInstances = Objects.requireNonNull(supplier.get().blockFirst());
Map<String, Integer> counter = serviceInstances.stream()
.collect(Collectors.groupingBy(ServiceInstance::getInstanceId, summingInt(e -> 1)));
assertThat(counter).containsEntry("test-1", DEFAULT_WEIGHT);
assertThat(counter).containsEntry("test-2", DEFAULT_WEIGHT);
assertThat(counter).containsEntry("test-3", DEFAULT_WEIGHT);
}
@Test
void shouldUseDefaultWeightWhenWeightedFunctionReturnedZero() {
ServiceInstance one = serviceInstance("test-1", Collections.emptyMap());
ServiceInstance two = serviceInstance("test-2", Collections.emptyMap());
ServiceInstance three = serviceInstance("test-3", Collections.emptyMap());
when(delegate.get()).thenReturn(Flux.just(Arrays.asList(one, two, three)));
WeightedServiceInstanceListSupplier supplier = new WeightedServiceInstanceListSupplier(delegate, instance -> 0);
List<ServiceInstance> serviceInstances = Objects.requireNonNull(supplier.get().blockFirst());
Map<String, Integer> counter = serviceInstances.stream()
.collect(Collectors.groupingBy(ServiceInstance::getInstanceId, summingInt(e -> 1)));
assertThat(counter).containsEntry("test-1", DEFAULT_WEIGHT);
assertThat(counter).containsEntry("test-2", DEFAULT_WEIGHT);
assertThat(counter).containsEntry("test-3", DEFAULT_WEIGHT);
}
@Test
void shouldUseDefaultWeightWhenWeightedFunctionReturnedNegative() {
ServiceInstance one = serviceInstance("test-1", Collections.emptyMap());
ServiceInstance two = serviceInstance("test-2", Collections.emptyMap());
ServiceInstance three = serviceInstance("test-3", Collections.emptyMap());
when(delegate.get()).thenReturn(Flux.just(Arrays.asList(one, two, three)));
WeightedServiceInstanceListSupplier supplier = new WeightedServiceInstanceListSupplier(delegate,
instance -> -1);
List<ServiceInstance> serviceInstances = Objects.requireNonNull(supplier.get().blockFirst());
Map<String, Integer> counter = serviceInstances.stream()
.collect(Collectors.groupingBy(ServiceInstance::getInstanceId, summingInt(e -> 1)));
assertThat(counter).containsEntry("test-1", DEFAULT_WEIGHT);
assertThat(counter).containsEntry("test-2", DEFAULT_WEIGHT);
assertThat(counter).containsEntry("test-3", DEFAULT_WEIGHT);
}
@Test
void shouldUseDefaultWeightWhenWeightedFunctionThrownException() {
ServiceInstance one = serviceInstance("test-1", Collections.emptyMap());
ServiceInstance two = serviceInstance("test-2", Collections.emptyMap());
ServiceInstance three = serviceInstance("test-3", Collections.emptyMap());
when(delegate.get()).thenReturn(Flux.just(Arrays.asList(one, two, three)));
WeightedServiceInstanceListSupplier supplier = new WeightedServiceInstanceListSupplier(delegate, instance -> {
throw new RuntimeException();
});
List<ServiceInstance> serviceInstances = Objects.requireNonNull(supplier.get().blockFirst());
Map<String, Integer> counter = serviceInstances.stream()
.collect(Collectors.groupingBy(ServiceInstance::getInstanceId, summingInt(e -> 1)));
assertThat(counter).containsEntry("test-1", DEFAULT_WEIGHT);
assertThat(counter).containsEntry("test-2", DEFAULT_WEIGHT);
assertThat(counter).containsEntry("test-3", DEFAULT_WEIGHT);
}
private ServiceInstance serviceInstance(String instanceId, Map<String, String> metadata) {
return new DefaultServiceInstance(instanceId, "test", "localhost", 8080, false, metadata);
}
private Map<String, String> buildWeightMetadata(Object weight) {
Map<String, String> metadata = new HashMap<>();
metadata.put("weight", weight.toString());
return metadata;
}
}
Loading…
Cancel
Save