Browse Source

Lazy fills service instances to reduce max latency (#1165)

pull/1108/merge
jizhuozhi 2 years ago committed by GitHub
parent
commit
07f7328ca2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 137
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/LazyWeightedServiceInstanceList.java
  2. 40
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/WeightedServiceInstanceListSupplier.java
  3. 140
      spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/LazyWeightedServiceInstanceListTest.java

137
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/LazyWeightedServiceInstanceList.java

@ -0,0 +1,137 @@ @@ -0,0 +1,137 @@
/*
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.loadbalancer.core;
import java.util.AbstractList;
import java.util.List;
import org.springframework.cloud.client.ServiceInstance;
/**
* A {@link List} implementation that lazily fills weighted {@link ServiceInstance}
* objects.
*
* @author Zhuozhi Ji
* @see WeightedServiceInstanceListSupplier
*/
class LazyWeightedServiceInstanceList extends AbstractList<ServiceInstance> {
private final List<ServiceInstance> instances;
private final int[] weights;
private SmoothServiceInstanceSelector selector;
private volatile int position;
/* for testing */ ServiceInstance[] expanded;
private final Object expandingLock = new Object();
LazyWeightedServiceInstanceList(List<ServiceInstance> instances, int[] weights) {
this.instances = instances;
this.weights = weights;
this.init();
}
private void init() {
// Calculate the greatest common divisor (GCD) of weights, max weight and the
// total number of elements after expansion.
int greatestCommonDivisor = 0;
int max = 0;
int total = 0;
for (int weight : weights) {
greatestCommonDivisor = greatestCommonDivisor(greatestCommonDivisor, weight);
max = Math.max(max, weight);
total += weight;
}
selector = new SmoothServiceInstanceSelector(instances, weights, max, greatestCommonDivisor);
position = 0;
expanded = new ServiceInstance[total / greatestCommonDivisor];
}
@Override
public ServiceInstance get(int index) {
if (index < position) {
return expanded[index];
}
synchronized (expandingLock) {
for (; position <= index && position < expanded.length; position++) {
expanded[position] = selector.next();
}
}
return expanded[index];
}
@Override
public int size() {
return expanded.length;
}
static int greatestCommonDivisor(int a, int b) {
int r;
while (b != 0) {
r = a % b;
a = b;
b = r;
}
return a;
}
static class SmoothServiceInstanceSelector {
final List<ServiceInstance> instances;
final int[] weights;
final int maxWeight;
final int gcd;
int position;
int currentWeight;
SmoothServiceInstanceSelector(List<ServiceInstance> instances, int[] weights, int maxWeight, int gcd) {
this.instances = instances;
this.weights = weights;
this.maxWeight = maxWeight;
this.gcd = gcd;
this.currentWeight = 0;
}
ServiceInstance next() {
// The weight of all instances is greater than 0, so it must be able to exit
// the loop.
while (true) {
for (int picked = position; picked < weights.length; picked++) {
if (weights[picked] > currentWeight) {
position = picked + 1;
return instances.get(picked);
}
}
position = 0;
currentWeight = currentWeight + gcd;
if (currentWeight >= maxWeight) {
currentWeight = 0;
}
}
}
}
}

40
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/WeightedServiceInstanceListSupplier.java

@ -16,8 +16,6 @@ @@ -16,8 +16,6 @@
package org.springframework.cloud.loadbalancer.core;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -86,33 +84,7 @@ public class WeightedServiceInstanceListSupplier extends DelegatingServiceInstan @@ -86,33 +84,7 @@ public class WeightedServiceInstanceListSupplier extends DelegatingServiceInstan
}
}).toArray();
// Calculate the greatest common divisor (GCD) of weights and the total number of
// elements after expansion.
int gcd = 0;
int total = 0;
for (int weight : weights) {
gcd = greatestCommonDivisor(gcd, weight);
total += weight;
}
// Because scaling by the gcd does not affect the distribution,
// we can reduce memory usage by this way.
List<ServiceInstance> newInstances = new ArrayList<>(total / gcd);
// use iterator for some implementation of the List that not supports
// RandomAccess, but `weights` is supported, so use a local variable `i`
// to get the current position.
int i = 0;
for (ServiceInstance instance : instances) {
int weight = weights[i] / gcd;
for (int j = 0; j < weight; j++) {
newInstances.add(instance);
}
i++;
}
Collections.shuffle(newInstances);
return newInstances;
return new LazyWeightedServiceInstanceList(instances, weights);
}
static int metadataWeightFunction(ServiceInstance serviceInstance) {
@ -128,14 +100,4 @@ public class WeightedServiceInstanceListSupplier extends DelegatingServiceInstan @@ -128,14 +100,4 @@ public class WeightedServiceInstanceListSupplier extends DelegatingServiceInstan
return DEFAULT_WEIGHT;
}
static int greatestCommonDivisor(int a, int b) {
int r;
while (b != 0) {
r = a % b;
a = b;
b = r;
}
return a;
}
}

140
spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/LazyWeightedServiceInstanceListTest.java

@ -0,0 +1,140 @@ @@ -0,0 +1,140 @@
/*
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.loadbalancer.core;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
import static java.util.stream.Collectors.summingInt;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link LazyWeightedServiceInstanceList}.
*
* @author Zhuozhi Ji
*/
class LazyWeightedServiceInstanceListTest {
@Test
void shouldCreateListWithSizeEqualToSumofRatio() {
List<ServiceInstance> serviceInstances = new ArrayList<>();
int[] weights = new int[10];
for (int i = 0; i < 10; i++) {
int weight = (1 << i) * 100;
weights[i] = weight;
serviceInstances.add(serviceInstance("test-" + i, buildWeightMetadata(weight)));
}
int total = Arrays.stream(weights).sum() / 100;
List<ServiceInstance> list = new LazyWeightedServiceInstanceList(serviceInstances, weights);
assertThat(list).hasSize(total);
}
@Test
void shouldFillListWithAllNullElementsIfNotAccessed() {
List<ServiceInstance> serviceInstances = new ArrayList<>();
int[] weights = new int[10];
for (int i = 0; i < 10; i++) {
int weight = (1 << i) * 100;
weights[i] = weight;
serviceInstances.add(serviceInstance("test-" + i, buildWeightMetadata(weight)));
}
LazyWeightedServiceInstanceList list = new LazyWeightedServiceInstanceList(serviceInstances, weights);
for (int i = 0; i < list.size(); i++) {
assertThat(list.expanded[i]).isNull();
}
}
@Test
void shouldFillAllElementsIfGreaterPositionAccessed() {
List<ServiceInstance> serviceInstances = new ArrayList<>();
int[] weights = new int[10];
for (int i = 0; i < 10; i++) {
int weight = (1 << i) * 100;
weights[i] = weight;
serviceInstances.add(serviceInstance("test-" + i, buildWeightMetadata(weight)));
}
LazyWeightedServiceInstanceList list = new LazyWeightedServiceInstanceList(serviceInstances, weights);
list.get(list.size() - 1);
for (int i = 0; i < list.size(); i++) {
assertThat(list.expanded[i]).isNotNull();
}
}
@Test
void shouldFillAllElementsCorrectlyIfConcurrentRandomAccess() throws InterruptedException {
List<ServiceInstance> serviceInstances = new ArrayList<>();
int[] weights = new int[10];
for (int i = 0; i < 10; i++) {
int weight = 1 << i;
weights[i] = weight;
serviceInstances.add(serviceInstance("test-" + i, buildWeightMetadata(weight)));
}
Random random = new Random();
int processors = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 1, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
LazyWeightedServiceInstanceList list = new LazyWeightedServiceInstanceList(serviceInstances, weights);
CountDownLatch countDownLatch = new CountDownLatch(list.size());
for (int i = 0; i < list.size(); i++) {
int p = random.nextInt(list.size());
executor.execute(() -> {
list.get(p);
countDownLatch.countDown();
});
}
countDownLatch.await();
// make sure all instances are expanded
list.get(list.size() - 1);
Map<String, Integer> counter = Arrays.stream(list.expanded)
.collect(Collectors.groupingBy(ServiceInstance::getInstanceId, summingInt(e -> 1)));
for (int i = 0; i < 10; i++) {
assertThat(counter).containsEntry(serviceInstances.get(i).getInstanceId(), weights[i]);
}
}
private ServiceInstance serviceInstance(String instanceId, Map<String, String> metadata) {
return new DefaultServiceInstance(instanceId, "test", "localhost", 8080, false, metadata);
}
private Map<String, String> buildWeightMetadata(Object weight) {
Map<String, String> metadata = new HashMap<>();
metadata.put("weight", weight.toString());
return metadata;
}
}
Loading…
Cancel
Save