Browse Source

spring cloud 2021.x 源码阅读, OpenFeign + LoadBalancer + Nacos Discovery

hekai-study-v2021.x
Kai 11 months ago
parent
commit
ecaf9f2958
  1. 24
      pom.xml
  2. 5
      spring-cloud-context/src/main/java/org/springframework/cloud/context/named/ClientFactoryObjectProvider.java
  3. 25
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/annotation/LoadBalancerClientConfiguration.java
  4. 6
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/blocking/client/BlockingLoadBalancerClient.java
  5. 4
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/config/LoadBalancerAutoConfiguration.java
  6. 39
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/CachingServiceInstanceListSupplier.java
  7. 22
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java
  8. 23
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/RoundRobinLoadBalancer.java
  9. 9
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java
  10. 5
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/support/LoadBalancerClientFactory.java

24
pom.xml

@ -62,10 +62,10 @@ @@ -62,10 +62,10 @@
<groupId>io.spring.javaformat</groupId>
<artifactId>spring-javaformat-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-checkstyle-plugin</artifactId>-->
<!-- </plugin>-->
<plugin>
<groupId>org.basepom.maven</groupId>
<artifactId>duplicate-finder-maven-plugin</artifactId>
@ -73,14 +73,14 @@ @@ -73,14 +73,14 @@
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
</plugins>
</reporting>
<!-- <reporting>-->
<!-- <plugins>-->
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-checkstyle-plugin</artifactId>-->
<!-- </plugin>-->
<!-- </plugins>-->
<!-- </reporting>-->
<profiles>
<profile>

5
spring-cloud-context/src/main/java/org/springframework/cloud/context/named/ClientFactoryObjectProvider.java

@ -15,7 +15,7 @@ @@ -15,7 +15,7 @@
*/
package org.springframework.cloud.context.named;
// comment-by-kai 源码阅读 spring cloud 2021.x
import java.util.Iterator;
import java.util.Spliterator;
import java.util.function.Consumer;
@ -36,10 +36,13 @@ class ClientFactoryObjectProvider<T> implements ObjectProvider<T> { @@ -36,10 +36,13 @@ class ClientFactoryObjectProvider<T> implements ObjectProvider<T> {
private final NamedContextFactory<?> clientFactory;
// 服务的名称
private final String name;
// 固定为ServiceInstanceListSupplier
private final Class<T> type;
// 隔离provider,provider只返回clientFactory中指定name,type的数据
private ObjectProvider<T> provider;
ClientFactoryObjectProvider(NamedContextFactory<?> clientFactory, String name, Class<T> type) {

25
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/annotation/LoadBalancerClientConfiguration.java

@ -15,7 +15,7 @@ @@ -15,7 +15,7 @@
*/
package org.springframework.cloud.loadbalancer.annotation;
// comment-by-kai 源码阅读 spring cloud 2021.x
import reactor.util.retry.RetrySpec;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
@ -63,20 +63,28 @@ public class LoadBalancerClientConfiguration { @@ -63,20 +63,28 @@ public class LoadBalancerClientConfiguration {
private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 193827465;
// 首先,在前文可以看到在 LoadBalancerAutoConfiguration 配置中,配置了 LoadBalancerClientFactory ,其实例 bean 存在于父容器中,
// 而当前配置类 LoadBalancerClientConfiguration 通过构造函数会当成默认配置类注册到 NamedContextFactory 的子容器中,每个子容器都拥有。
// 只有明白了这一点,才能够理解容器的隔离。
@Bean
@ConditionalOnMissingBean
@ConditionalOnMissingBean // ConditionalOnMissingBean 说明可以自己实现这个bean,来覆盖默认的配置。
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
// name表示服务名称,比如stock-service
// LoadBalancerClientFactory bean的配置:参考代码 LoadBalancerAutoConfiguration
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RoundRobinLoadBalancer(
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
// ============================= Reactive Retry ============================= //
@Configuration(proxyBeanMethods = false)
@ConditionalOnReactiveDiscoveryEnabled
@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)
public static class ReactiveSupportConfiguration {
// ReactiveDiscoveryClient + DefaultConfigurationCondition
@Bean
@ConditionalOnBean(ReactiveDiscoveryClient.class)
@ConditionalOnMissingBean
@ -86,6 +94,7 @@ public class LoadBalancerClientConfiguration { @@ -86,6 +94,7 @@ public class LoadBalancerClientConfiguration {
return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context);
}
// ReactiveDiscoveryClient + ZonePreferenceConfigurationCondition
@Bean
@ConditionalOnBean(ReactiveDiscoveryClient.class)
@ConditionalOnMissingBean
@ -134,11 +143,17 @@ public class LoadBalancerClientConfiguration { @@ -134,11 +143,17 @@ public class LoadBalancerClientConfiguration {
}
// ============================= Blocking Retry ============================= //
@Configuration(proxyBeanMethods = false)
@ConditionalOnBlockingDiscoveryEnabled
@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER + 1)
public static class BlockingSupportConfiguration {
// 真正获取 ServiceInstance 列表的 由 ServiceInstanceListSupplier 提供,
// 这里可以看到可以由 DiscoveryClient 提供。这样就已经跟注册中心客户端联系上了。
// DiscoveryClient + DefaultConfigurationCondition
@Bean
@ConditionalOnBean(DiscoveryClient.class)
@ConditionalOnMissingBean
@ -148,6 +163,7 @@ public class LoadBalancerClientConfiguration { @@ -148,6 +163,7 @@ public class LoadBalancerClientConfiguration {
return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching().build(context);
}
// DiscoveryClient + ZonePreferenceConfigurationCondition
@Bean
@ConditionalOnBean(DiscoveryClient.class)
@ConditionalOnMissingBean
@ -190,6 +206,8 @@ public class LoadBalancerClientConfiguration { @@ -190,6 +206,8 @@ public class LoadBalancerClientConfiguration {
}
// ============================= Blocking Retry ============================= //
@Configuration(proxyBeanMethods = false)
@ConditionalOnBlockingDiscoveryEnabled
@ConditionalOnClass(RetryTemplate.class)
@ -208,6 +226,8 @@ public class LoadBalancerClientConfiguration { @@ -208,6 +226,8 @@ public class LoadBalancerClientConfiguration {
}
// ============================= Reactive Retry ============================= //
@Configuration(proxyBeanMethods = false)
@ConditionalOnReactiveDiscoveryEnabled
@Conditional(ReactiveOnAvoidPreviousInstanceAndRetryEnabledCondition.class)
@ -226,6 +246,7 @@ public class LoadBalancerClientConfiguration { @@ -226,6 +246,7 @@ public class LoadBalancerClientConfiguration {
}
// ============================= 条件类 ============================= //
static final class BlockingOnAvoidPreviousInstanceAndRetryEnabledCondition extends AllNestedConditions {
private BlockingOnAvoidPreviousInstanceAndRetryEnabledCondition() {

6
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/blocking/client/BlockingLoadBalancerClient.java

@ -15,7 +15,7 @@ @@ -15,7 +15,7 @@
*/
package org.springframework.cloud.loadbalancer.blocking.client;
// comment-by-kai 源码阅读 spring cloud 2021.x
import java.io.IOException;
import java.net.URI;
import java.util.Set;
@ -168,10 +168,14 @@ public class BlockingLoadBalancerClient implements LoadBalancerClient { @@ -168,10 +168,14 @@ public class BlockingLoadBalancerClient implements LoadBalancerClient {
@Override
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
// ReactiveLoadBalancer 可以是 RoundRobinLoadBalancer,RandomLoadBalancer
// loadBalancerClientFactory继承NamedContextFactory,使用子容器实现隔离
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
if (loadBalancer == null) {
return null;
}
// loadBalancer.choose,返回NacosServiceInstance
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
if (loadBalancerResponse == null) {
return null;

4
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/config/LoadBalancerAutoConfiguration.java

@ -15,7 +15,7 @@ @@ -15,7 +15,7 @@
*/
package org.springframework.cloud.loadbalancer.config;
// comment-by-kai 源码阅读 spring cloud 2021.x
import java.util.Collections;
import java.util.List;
@ -48,6 +48,7 @@ public class LoadBalancerAutoConfiguration { @@ -48,6 +48,7 @@ public class LoadBalancerAutoConfiguration {
private final ObjectProvider<List<LoadBalancerClientSpecification>> configurations;
// 1. Spring 的特性,对于 Configuration ,其构造函数只有一个参数的时候,是可以自动注入的
public LoadBalancerAutoConfiguration(ObjectProvider<List<LoadBalancerClientSpecification>> configurations) {
this.configurations = configurations;
}
@ -62,6 +63,7 @@ public class LoadBalancerAutoConfiguration { @@ -62,6 +63,7 @@ public class LoadBalancerAutoConfiguration {
@Bean
public LoadBalancerClientFactory loadBalancerClientFactory(LoadBalancerClientsProperties properties) {
LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory(properties);
// 【重要】这一步将配置类注入到 NamedContextFactory 中,实现个性化配置。
clientFactory.setConfigurations(this.configurations.getIfAvailable(Collections::emptyList));
return clientFactory;
}

39
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/CachingServiceInstanceListSupplier.java

@ -15,7 +15,7 @@ @@ -15,7 +15,7 @@
*/
package org.springframework.cloud.loadbalancer.core;
// comment-by-kai 源码阅读 spring cloud 2021.x
import java.util.List;
import org.apache.commons.logging.Log;
@ -52,21 +52,28 @@ public class CachingServiceInstanceListSupplier extends DelegatingServiceInstanc @@ -52,21 +52,28 @@ public class CachingServiceInstanceListSupplier extends DelegatingServiceInstanc
@SuppressWarnings("unchecked")
public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
super(delegate);
this.serviceInstances = CacheFlux.lookup(key -> {
// TODO: configurable cache name
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
if (cache == null) {
if (log.isErrorEnabled()) {
log.error("Unable to find cache: " + SERVICE_INSTANCE_CACHE_NAME);
}
return Mono.empty();
}
List<ServiceInstance> list = cache.get(key, List.class);
if (list == null || list.isEmpty()) {
return Mono.empty();
}
return Flux.just(list).materialize().collectList();
}, delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
// lookup的第一个入参:
this.serviceInstances = CacheFlux
.lookup(key -> {
// TODO: configurable cache name
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
if (cache == null) {
if (log.isErrorEnabled()) {
log.error("Unable to find cache: " + SERVICE_INSTANCE_CACHE_NAME);
}
return Mono.empty();
}
List<ServiceInstance> list = cache.get(key, List.class);
if (list == null || list.isEmpty()) {
return Mono.empty();
}
return Flux.just(list).materialize().collectList();
}, delegate.getServiceId()
)
// 【重要】cache miss时,获取原始数据。
// delegate 可以是 DiscoveryClientServiceInstanceListSupplier
.onCacheMissResume(delegate.get().take(1))
// 同时写入缓存
.andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize().doOnNext(instances -> {
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
if (cache == null) {

22
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java

@ -15,7 +15,7 @@ @@ -15,7 +15,7 @@
*/
package org.springframework.cloud.loadbalancer.core;
// comment-by-kai 源码阅读 spring cloud 2021.x
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@ -74,14 +74,18 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan @@ -74,14 +74,18 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan
public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate, Environment environment) {
this.serviceId = environment.getProperty(PROPERTY_NAME);
resolveTimeout(environment);
this.serviceInstances = Flux
.defer(() -> delegate.getInstances(serviceId).collectList().flux().timeout(timeout, Flux.defer(() -> {
logTimeout();
return Flux.just(new ArrayList<>());
})).onErrorResume(error -> {
logException(error);
return Flux.just(new ArrayList<>());
}));
this.serviceInstances = Flux.defer(
() -> delegate.getInstances(serviceId).collectList()
.flux()
.timeout(timeout, Flux.defer(() -> {
logTimeout();
return Flux.just(new ArrayList<>());
}))
.onErrorResume(error -> {
logException(error);
return Flux.just(new ArrayList<>());
})
);
}
@Override

23
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/RoundRobinLoadBalancer.java

@ -13,9 +13,8 @@ @@ -13,9 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.loadbalancer.core;
// comment-by-kai 源码阅读 spring cloud 2021.x
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
@ -46,6 +45,11 @@ public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalance @@ -46,6 +45,11 @@ public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalance
final String serviceId;
/**
* 从LoadBalancerClientFactory中获取ServiceInstanceListSupplier类型的 bean
* 实际类型 ClientFactoryObjectProvider getLazyProvider 内的封装类
* @see org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory
*/
ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
/**
@ -77,21 +81,33 @@ public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalance @@ -77,21 +81,33 @@ public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalance
// https://github.com/Netflix/ocelli/blob/master/ocelli-core/
// src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java
public Mono<Response<ServiceInstance>> choose(Request request) {
// 获取supplier,调用get方法获取List<ServiceInstance>
// 可以是 CachingServiceInstanceListSupplier 类型
// 也可以是自定义的ServiceInstanceListSupplier,这取决于调用的上下文。
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
// 相当于getOrDefault,实际返回 CachingServiceInstanceListSupplier
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
return supplier
// get的含义
.get(request)
// next方法:Flux转化为Mono
.next()
//
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
List<ServiceInstance> serviceInstances) {
// 选择ServiceInstance
Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
// 回调supplier,告知最终结果
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
// 轮询算法:在多个instances中选择一个
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
@ -100,6 +116,7 @@ public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalance @@ -100,6 +116,7 @@ public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalance
return new EmptyResponse();
}
// Ignore the sign bit, this allows pos to loop sequentially from 0 to
// Integer.MAX_VALUE
int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;

9
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java

@ -15,7 +15,7 @@ @@ -15,7 +15,7 @@
*/
package org.springframework.cloud.loadbalancer.core;
// comment-by-kai 源码阅读 spring cloud 2021.x
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@ -55,6 +55,7 @@ public final class ServiceInstanceListSupplierBuilder { @@ -55,6 +55,7 @@ public final class ServiceInstanceListSupplierBuilder {
private static final Log LOG = LogFactory.getLog(ServiceInstanceListSupplierBuilder.class);
// DiscoveryClientServiceInstanceListSupplier, 分为Blocking, Reactive两种.
private Creator baseCreator;
private DelegateCreator cachingCreator;
@ -75,6 +76,7 @@ public final class ServiceInstanceListSupplierBuilder { @@ -75,6 +76,7 @@ public final class ServiceInstanceListSupplierBuilder {
LOG.warn("Overriding a previously set baseCreator with a blocking DiscoveryClient baseCreator.");
}
this.baseCreator = context -> {
// 构造Blocking DiscoveryClient
DiscoveryClient discoveryClient = context.getBean(DiscoveryClient.class);
return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment());
@ -93,6 +95,9 @@ public final class ServiceInstanceListSupplierBuilder { @@ -93,6 +95,9 @@ public final class ServiceInstanceListSupplierBuilder {
LOG.warn("Overriding a previously set baseCreator with a ReactiveDiscoveryClient baseCreator.");
}
this.baseCreator = context -> {
// 构造Reactive DiscoveryClient
// 可以是ReactiveCompositeDiscoveryClient, 包含一个 NacosReactiveDiscoveryClient + SimpleReactiveDiscoveryClient
// 【重要】这里Nacos完成和LoadBalancer的集成
ReactiveDiscoveryClient discoveryClient = context.getBean(ReactiveDiscoveryClient.class);
return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment());
@ -225,6 +230,7 @@ public final class ServiceInstanceListSupplierBuilder { @@ -225,6 +230,7 @@ public final class ServiceInstanceListSupplierBuilder {
LOG.warn(
"Overriding a previously set cachingCreator with a CachingServiceInstanceListSupplier-based cachingCreator.");
}
// 启用Cache机制,使用CachingServiceInstanceListSupplier包装原始的ServiceInstanceListSupplier
this.cachingCreator = (context, delegate) -> {
ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
.getBeanProvider(LoadBalancerCacheManager.class);
@ -276,6 +282,7 @@ public final class ServiceInstanceListSupplierBuilder { @@ -276,6 +282,7 @@ public final class ServiceInstanceListSupplierBuilder {
public ServiceInstanceListSupplier build(ConfigurableApplicationContext context) {
Assert.notNull(baseCreator, "A baseCreator must not be null");
// TODO context为什么是stock-service隔离的 ?
ServiceInstanceListSupplier supplier = baseCreator.apply(context);
for (DelegateCreator creator : creators) {

5
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/support/LoadBalancerClientFactory.java

@ -15,7 +15,7 @@ @@ -15,7 +15,7 @@
*/
package org.springframework.cloud.loadbalancer.support;
// comment-by-kai 源码阅读 spring cloud 2021.x
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -64,7 +64,10 @@ public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerC @@ -64,7 +64,10 @@ public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerC
this(null);
}
// 构造器调用:org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration
public LoadBalancerClientFactory(LoadBalancerClientsProperties properties) {
// 这里注意看 LoadBalancerClientConfiguration 被当成了默认配置类注入到 NamedContextFactory 中。也就是说每个子容器都会有这个配置类。
// 根据构造器 NamedContextFactory 会构造一个 PropertySource,key=loadbalancer.client.name,value = ?
super(LoadBalancerClientConfiguration.class, NAMESPACE, PROPERTY_NAME);
this.properties = properties;
}

Loading…
Cancel
Save