From ecaf9f29588efd037976c54142a61d30ba43ed7a Mon Sep 17 00:00:00 2001 From: Kai Date: Fri, 27 Oct 2023 14:59:01 +0800 Subject: [PATCH] =?UTF-8?q?spring=20cloud=202021.x=20=E6=BA=90=E7=A0=81?= =?UTF-8?q?=E9=98=85=E8=AF=BB,=20OpenFeign=20+=20LoadBalancer=20+=20Nacos?= =?UTF-8?q?=20Discovery?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 24 ++++++------ .../named/ClientFactoryObjectProvider.java | 5 ++- .../LoadBalancerClientConfiguration.java | 25 +++++++++++- .../client/BlockingLoadBalancerClient.java | 6 ++- .../config/LoadBalancerAutoConfiguration.java | 4 +- .../CachingServiceInstanceListSupplier.java | 39 +++++++++++-------- ...veryClientServiceInstanceListSupplier.java | 22 ++++++----- .../core/RoundRobinLoadBalancer.java | 23 +++++++++-- .../ServiceInstanceListSupplierBuilder.java | 9 ++++- .../support/LoadBalancerClientFactory.java | 5 ++- 10 files changed, 115 insertions(+), 47 deletions(-) diff --git a/pom.xml b/pom.xml index b3d59a49..7b2d17fa 100644 --- a/pom.xml +++ b/pom.xml @@ -62,10 +62,10 @@ io.spring.javaformat spring-javaformat-maven-plugin - - org.apache.maven.plugins - maven-checkstyle-plugin - + + + + org.basepom.maven duplicate-finder-maven-plugin @@ -73,14 +73,14 @@ - - - - org.apache.maven.plugins - maven-checkstyle-plugin - - - + + + + + + + + diff --git a/spring-cloud-context/src/main/java/org/springframework/cloud/context/named/ClientFactoryObjectProvider.java b/spring-cloud-context/src/main/java/org/springframework/cloud/context/named/ClientFactoryObjectProvider.java index 2ce04041..b04b3902 100644 --- a/spring-cloud-context/src/main/java/org/springframework/cloud/context/named/ClientFactoryObjectProvider.java +++ b/spring-cloud-context/src/main/java/org/springframework/cloud/context/named/ClientFactoryObjectProvider.java @@ -15,7 +15,7 @@ */ package org.springframework.cloud.context.named; - +// comment-by-kai 源码阅读 spring cloud 2021.x import java.util.Iterator; import java.util.Spliterator; import java.util.function.Consumer; @@ -36,10 +36,13 @@ class ClientFactoryObjectProvider implements ObjectProvider { private final NamedContextFactory clientFactory; + // 服务的名称 private final String name; + // 固定为ServiceInstanceListSupplier private final Class type; + // 隔离provider,provider只返回clientFactory中指定name,type的数据 private ObjectProvider provider; ClientFactoryObjectProvider(NamedContextFactory clientFactory, String name, Class type) { diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/annotation/LoadBalancerClientConfiguration.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/annotation/LoadBalancerClientConfiguration.java index 805c5ba3..8daf95c4 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/annotation/LoadBalancerClientConfiguration.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/annotation/LoadBalancerClientConfiguration.java @@ -15,7 +15,7 @@ */ package org.springframework.cloud.loadbalancer.annotation; - +// comment-by-kai 源码阅读 spring cloud 2021.x import reactor.util.retry.RetrySpec; import org.springframework.boot.autoconfigure.AutoConfigureAfter; @@ -63,20 +63,28 @@ public class LoadBalancerClientConfiguration { private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 193827465; + // 首先,在前文可以看到在 LoadBalancerAutoConfiguration 配置中,配置了 LoadBalancerClientFactory ,其实例 bean 存在于父容器中, + // 而当前配置类 LoadBalancerClientConfiguration 通过构造函数会当成默认配置类注册到 NamedContextFactory 的子容器中,每个子容器都拥有。 + // 只有明白了这一点,才能够理解容器的隔离。 @Bean - @ConditionalOnMissingBean + @ConditionalOnMissingBean // ConditionalOnMissingBean 说明可以自己实现这个bean,来覆盖默认的配置。 public ReactorLoadBalancer reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) { + // name表示服务名称,比如stock-service + // LoadBalancerClientFactory bean的配置:参考代码 LoadBalancerAutoConfiguration String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); return new RoundRobinLoadBalancer( loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name); } + // ============================= Reactive Retry ============================= // + @Configuration(proxyBeanMethods = false) @ConditionalOnReactiveDiscoveryEnabled @Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER) public static class ReactiveSupportConfiguration { + // ReactiveDiscoveryClient + DefaultConfigurationCondition @Bean @ConditionalOnBean(ReactiveDiscoveryClient.class) @ConditionalOnMissingBean @@ -86,6 +94,7 @@ public class LoadBalancerClientConfiguration { return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context); } + // ReactiveDiscoveryClient + ZonePreferenceConfigurationCondition @Bean @ConditionalOnBean(ReactiveDiscoveryClient.class) @ConditionalOnMissingBean @@ -134,11 +143,17 @@ public class LoadBalancerClientConfiguration { } + // ============================= Blocking Retry ============================= // + @Configuration(proxyBeanMethods = false) @ConditionalOnBlockingDiscoveryEnabled @Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER + 1) public static class BlockingSupportConfiguration { + // 真正获取 ServiceInstance 列表的 由 ServiceInstanceListSupplier 提供, + // 这里可以看到可以由 DiscoveryClient 提供。这样就已经跟注册中心客户端联系上了。 + + // DiscoveryClient + DefaultConfigurationCondition @Bean @ConditionalOnBean(DiscoveryClient.class) @ConditionalOnMissingBean @@ -148,6 +163,7 @@ public class LoadBalancerClientConfiguration { return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching().build(context); } + // DiscoveryClient + ZonePreferenceConfigurationCondition @Bean @ConditionalOnBean(DiscoveryClient.class) @ConditionalOnMissingBean @@ -190,6 +206,8 @@ public class LoadBalancerClientConfiguration { } + // ============================= Blocking Retry ============================= // + @Configuration(proxyBeanMethods = false) @ConditionalOnBlockingDiscoveryEnabled @ConditionalOnClass(RetryTemplate.class) @@ -208,6 +226,8 @@ public class LoadBalancerClientConfiguration { } + // ============================= Reactive Retry ============================= // + @Configuration(proxyBeanMethods = false) @ConditionalOnReactiveDiscoveryEnabled @Conditional(ReactiveOnAvoidPreviousInstanceAndRetryEnabledCondition.class) @@ -226,6 +246,7 @@ public class LoadBalancerClientConfiguration { } + // ============================= 条件类 ============================= // static final class BlockingOnAvoidPreviousInstanceAndRetryEnabledCondition extends AllNestedConditions { private BlockingOnAvoidPreviousInstanceAndRetryEnabledCondition() { diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/blocking/client/BlockingLoadBalancerClient.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/blocking/client/BlockingLoadBalancerClient.java index 8cc4bdcc..2a7d3b41 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/blocking/client/BlockingLoadBalancerClient.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/blocking/client/BlockingLoadBalancerClient.java @@ -15,7 +15,7 @@ */ package org.springframework.cloud.loadbalancer.blocking.client; - +// comment-by-kai 源码阅读 spring cloud 2021.x import java.io.IOException; import java.net.URI; import java.util.Set; @@ -168,10 +168,14 @@ public class BlockingLoadBalancerClient implements LoadBalancerClient { @Override public ServiceInstance choose(String serviceId, Request request) { + // ReactiveLoadBalancer 可以是 RoundRobinLoadBalancer,RandomLoadBalancer + // loadBalancerClientFactory继承NamedContextFactory,使用子容器实现隔离 ReactiveLoadBalancer loadBalancer = loadBalancerClientFactory.getInstance(serviceId); if (loadBalancer == null) { return null; } + + // loadBalancer.choose,返回NacosServiceInstance Response loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block(); if (loadBalancerResponse == null) { return null; diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/config/LoadBalancerAutoConfiguration.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/config/LoadBalancerAutoConfiguration.java index e3f7519d..12abb64a 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/config/LoadBalancerAutoConfiguration.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/config/LoadBalancerAutoConfiguration.java @@ -15,7 +15,7 @@ */ package org.springframework.cloud.loadbalancer.config; - +// comment-by-kai 源码阅读 spring cloud 2021.x import java.util.Collections; import java.util.List; @@ -48,6 +48,7 @@ public class LoadBalancerAutoConfiguration { private final ObjectProvider> configurations; + // 1. Spring 的特性,对于 Configuration ,其构造函数只有一个参数的时候,是可以自动注入的 public LoadBalancerAutoConfiguration(ObjectProvider> configurations) { this.configurations = configurations; } @@ -62,6 +63,7 @@ public class LoadBalancerAutoConfiguration { @Bean public LoadBalancerClientFactory loadBalancerClientFactory(LoadBalancerClientsProperties properties) { LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory(properties); + // 【重要】这一步将配置类注入到 NamedContextFactory 中,实现个性化配置。 clientFactory.setConfigurations(this.configurations.getIfAvailable(Collections::emptyList)); return clientFactory; } diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/CachingServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/CachingServiceInstanceListSupplier.java index 2554f766..e59085c3 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/CachingServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/CachingServiceInstanceListSupplier.java @@ -15,7 +15,7 @@ */ package org.springframework.cloud.loadbalancer.core; - +// comment-by-kai 源码阅读 spring cloud 2021.x import java.util.List; import org.apache.commons.logging.Log; @@ -52,21 +52,28 @@ public class CachingServiceInstanceListSupplier extends DelegatingServiceInstanc @SuppressWarnings("unchecked") public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) { super(delegate); - this.serviceInstances = CacheFlux.lookup(key -> { - // TODO: configurable cache name - Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME); - if (cache == null) { - if (log.isErrorEnabled()) { - log.error("Unable to find cache: " + SERVICE_INSTANCE_CACHE_NAME); - } - return Mono.empty(); - } - List list = cache.get(key, List.class); - if (list == null || list.isEmpty()) { - return Mono.empty(); - } - return Flux.just(list).materialize().collectList(); - }, delegate.getServiceId()).onCacheMissResume(delegate.get().take(1)) + // lookup的第一个入参: + this.serviceInstances = CacheFlux + .lookup(key -> { + // TODO: configurable cache name + Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME); + if (cache == null) { + if (log.isErrorEnabled()) { + log.error("Unable to find cache: " + SERVICE_INSTANCE_CACHE_NAME); + } + return Mono.empty(); + } + List list = cache.get(key, List.class); + if (list == null || list.isEmpty()) { + return Mono.empty(); + } + return Flux.just(list).materialize().collectList(); + }, delegate.getServiceId() + ) + // 【重要】cache miss时,获取原始数据。 + // delegate 可以是 DiscoveryClientServiceInstanceListSupplier + .onCacheMissResume(delegate.get().take(1)) + // 同时写入缓存 .andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize().doOnNext(instances -> { Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME); if (cache == null) { diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java index e1f33998..5356a84a 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java @@ -15,7 +15,7 @@ */ package org.springframework.cloud.loadbalancer.core; - +// comment-by-kai 源码阅读 spring cloud 2021.x import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -74,14 +74,18 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate, Environment environment) { this.serviceId = environment.getProperty(PROPERTY_NAME); resolveTimeout(environment); - this.serviceInstances = Flux - .defer(() -> delegate.getInstances(serviceId).collectList().flux().timeout(timeout, Flux.defer(() -> { - logTimeout(); - return Flux.just(new ArrayList<>()); - })).onErrorResume(error -> { - logException(error); - return Flux.just(new ArrayList<>()); - })); + this.serviceInstances = Flux.defer( + () -> delegate.getInstances(serviceId).collectList() + .flux() + .timeout(timeout, Flux.defer(() -> { + logTimeout(); + return Flux.just(new ArrayList<>()); + })) + .onErrorResume(error -> { + logException(error); + return Flux.just(new ArrayList<>()); + }) + ); } @Override diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/RoundRobinLoadBalancer.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/RoundRobinLoadBalancer.java index e4c5b1ea..d1ab7e87 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/RoundRobinLoadBalancer.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/RoundRobinLoadBalancer.java @@ -13,9 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.cloud.loadbalancer.core; - +// comment-by-kai 源码阅读 spring cloud 2021.x import java.util.List; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; @@ -46,6 +45,11 @@ public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalance final String serviceId; + /** + * 从LoadBalancerClientFactory中获取ServiceInstanceListSupplier类型的 bean + * 实际类型 ClientFactoryObjectProvider 是 getLazyProvider 内的封装类 + * @see org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory + */ ObjectProvider serviceInstanceListSupplierProvider; /** @@ -77,21 +81,33 @@ public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalance // https://github.com/Netflix/ocelli/blob/master/ocelli-core/ // src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java public Mono> choose(Request request) { + // 获取supplier,调用get方法获取List + // 可以是 CachingServiceInstanceListSupplier 类型 + // 也可以是自定义的ServiceInstanceListSupplier,这取决于调用的上下文。 ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider + // 相当于getOrDefault,实际返回 CachingServiceInstanceListSupplier .getIfAvailable(NoopServiceInstanceListSupplier::new); - return supplier.get(request).next() + return supplier + // get的含义 + .get(request) + // next方法:Flux转化为Mono + .next() + // .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances)); } private Response processInstanceResponse(ServiceInstanceListSupplier supplier, List serviceInstances) { + // 选择ServiceInstance Response serviceInstanceResponse = getInstanceResponse(serviceInstances); + // 回调supplier,告知最终结果 if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) { ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer()); } return serviceInstanceResponse; } + // 轮询算法:在多个instances中选择一个 private Response getInstanceResponse(List instances) { if (instances.isEmpty()) { if (log.isWarnEnabled()) { @@ -100,6 +116,7 @@ public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalance return new EmptyResponse(); } + // Ignore the sign bit, this allows pos to loop sequentially from 0 to // Integer.MAX_VALUE int pos = this.position.incrementAndGet() & Integer.MAX_VALUE; diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java index 20891426..6fc8ee83 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java @@ -15,7 +15,7 @@ */ package org.springframework.cloud.loadbalancer.core; - +// comment-by-kai 源码阅读 spring cloud 2021.x import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -55,6 +55,7 @@ public final class ServiceInstanceListSupplierBuilder { private static final Log LOG = LogFactory.getLog(ServiceInstanceListSupplierBuilder.class); + // DiscoveryClientServiceInstanceListSupplier, 分为Blocking, Reactive两种. private Creator baseCreator; private DelegateCreator cachingCreator; @@ -75,6 +76,7 @@ public final class ServiceInstanceListSupplierBuilder { LOG.warn("Overriding a previously set baseCreator with a blocking DiscoveryClient baseCreator."); } this.baseCreator = context -> { + // 构造Blocking DiscoveryClient DiscoveryClient discoveryClient = context.getBean(DiscoveryClient.class); return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment()); @@ -93,6 +95,9 @@ public final class ServiceInstanceListSupplierBuilder { LOG.warn("Overriding a previously set baseCreator with a ReactiveDiscoveryClient baseCreator."); } this.baseCreator = context -> { + // 构造Reactive DiscoveryClient + // 可以是ReactiveCompositeDiscoveryClient, 包含一个 NacosReactiveDiscoveryClient + SimpleReactiveDiscoveryClient + // 【重要】这里Nacos完成和LoadBalancer的集成 ReactiveDiscoveryClient discoveryClient = context.getBean(ReactiveDiscoveryClient.class); return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment()); @@ -225,6 +230,7 @@ public final class ServiceInstanceListSupplierBuilder { LOG.warn( "Overriding a previously set cachingCreator with a CachingServiceInstanceListSupplier-based cachingCreator."); } + // 启用Cache机制,使用CachingServiceInstanceListSupplier包装原始的ServiceInstanceListSupplier this.cachingCreator = (context, delegate) -> { ObjectProvider cacheManagerProvider = context .getBeanProvider(LoadBalancerCacheManager.class); @@ -276,6 +282,7 @@ public final class ServiceInstanceListSupplierBuilder { public ServiceInstanceListSupplier build(ConfigurableApplicationContext context) { Assert.notNull(baseCreator, "A baseCreator must not be null"); + // TODO context为什么是stock-service隔离的 ? ServiceInstanceListSupplier supplier = baseCreator.apply(context); for (DelegateCreator creator : creators) { diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/support/LoadBalancerClientFactory.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/support/LoadBalancerClientFactory.java index e8b4db64..5c839cfc 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/support/LoadBalancerClientFactory.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/support/LoadBalancerClientFactory.java @@ -15,7 +15,7 @@ */ package org.springframework.cloud.loadbalancer.support; - +// comment-by-kai 源码阅读 spring cloud 2021.x import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -64,7 +64,10 @@ public class LoadBalancerClientFactory extends NamedContextFactory