【一起学源码-微服务】Ribbon 源码三:Ribbon与Eureka整合原理分析
2020-01-07 09:43
441 查看
前言
前情回顾
上一篇讲了Ribbon的初始化过程,从
LoadBalancerAutoConfiguration到
RibbonAutoConfiguration再到
RibbonClientConfiguration,我们找到了
ILoadBalancer默认初始化的对象等。
本讲目录
这一讲我们会进一步往下探究Ribbon和Eureka是如何结合的。
通过上一讲
ILoadBalancer我们已经可以拿到一个服务所有的服务节点信息了,这里面是怎么把服务的名称转化为对应的具体host请求信息的呢?
通过这一讲 我们来一探究竟
目录如下:
- EurekaClientAutoConfiguration.getLoadBalancer()回顾
- 再次梳理Ribbon初始化过程
- ServerList实现类初始化过程
- getUpdatedListOfServers()获取注册表列表分析
- ribbon如何更新自己保存的注册表信息?
说明
原创不易,如若转载 请标明来源!
博客地址:一枝花算不算浪漫
微信公众号:壹枝花算不算浪漫
源码阅读
EurekaClientAutoConfiguration.getLoadBalancer()回顾
上一讲我们已经深入的讲解过
getLoadBalancer()方法的实现,每个serviceName都对应一个自己的SpringContext上下文信息,然后通过
ILoadBalancer.class从上下文信息中获取默认的LoadBalancer:
ZoneAwareLoadBalancer, 我们看下这个类的构造函数:
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) { super(clientConfig, rule, ping, serverList, filter, serverListUpdater); }
继续跟父类
DynamicServerListLoadBalancer的初始化方法:
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { volatile ServerList<T> serverListImpl; volatile ServerListFilter<T> filter; public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) { super(clientConfig, rule, ping); this.serverListImpl = serverList; this.filter = filter; this.serverListUpdater = serverListUpdater; if (filter instanceof AbstractServerListFilter) { ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats()); } restOfInit(clientConfig); } void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList() this.setEnablePrimingConnections(false); enableAndInitLearnNewServersFeature(); updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections() .primeConnections(getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); } @VisibleForTesting public void updateListOfServers() { List<T> servers = new ArrayList<T>(); if (serverListImpl != null) { servers = serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); if (filter != null) { servers = filter.getFilteredListOfServers(servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); } } updateAllServerList(servers); } }
构造方法中有个
restOfInit()方法,进去后又会有
updateListOfServers()方法,看方法名就知道这个肯定是和server注册表相关的,继续往后看,
servers = serverListImpl.getUpdatedListOfServers();,这里直接调用
getUpdatedListOfServers()就获取到了所有的注册表信息。
可以看到
ServerList有四个实现类,这个到底是该调用哪个实现类的
getUpdatedListOfServers()方法呢?接着往下看。
再次梳理Ribbon初始化过程
第二讲我们已经见过Ribbon的初始化过程,并画了图整理,这里针对于之前的图再更新一下:
这里主要是增加了
RibbonEurekaAutoConfiguration和
EurekaRibbonClientConfiguration两个配置类的初始化。
ServerList实现类初始化过程
上面已经梳理过
Ribbon初始化的过程,其中在
EurekaRibbonClientConfiguration会初始化
RibbonServerList,代码如下:
@Configuration public class EurekaRibbonClientConfiguration { @Bean @ConditionalOnMissingBean public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) { if (this.propertiesFactory.isSet(ServerList.class, serviceId)) { return this.propertiesFactory.get(ServerList.class, config, serviceId); } DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList( config, eurekaClientProvider); DomainExtractingServerList serverList = new DomainExtractingServerList( discoveryServerList, config, this.approximateZoneFromHostname); return serverList; } }
这里实际的
ServerList实际就是
DiscoveryEnabledNIWSServerList,我们看下这个类:
public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{ } public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware { }
所以可以看出来
ServerList实际就是在这里进行初始化的,上面那个
serverListImpl.getUpdatedListOfServers();即为调用
DiscoveryEnabledNIWSServerList.getUpdatedListOfServers()方法了,继续往下看。
getUpdatedListOfServers()获取注册表分析
直接看
DiscoveryEnabledNIWSServerList.getUpdatedListOfServers()源代码:
@Override public List<DiscoveryEnabledServer> getUpdatedListOfServers(){ return obtainServersViaDiscovery(); } private List<DiscoveryEnabledServer> obtainServersViaDiscovery() { List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>(); if (eurekaClientProvider == null || eurekaClientProvider.get() == null) { logger.warn("EurekaClient has not been initialized yet, returning an empty list"); return new ArrayList<DiscoveryEnabledServer>(); } EurekaClient eurekaClient = eurekaClientProvider.get(); if (vipAddresses!=null){ for (String vipAddress : vipAddresses.split(",")) { // if targetRegion is null, it will be interpreted as the same region of client List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion); for (InstanceInfo ii : listOfInstanceInfo) { if (ii.getStatus().equals(InstanceStatus.UP)) { if(shouldUseOverridePort){ if(logger.isDebugEnabled()){ logger.debug("Overriding port on client name: " + clientName + " to " + overridePort); } // copy is necessary since the InstanceInfo builder just uses the original reference, // and we don't want to corrupt the global eureka copy of the object which may be // used by other clients in our system InstanceInfo copy = new InstanceInfo(ii); if(isSecure){ ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build(); }else{ ii = new InstanceInfo.Builder(copy).setPort(overridePort).build(); } } DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr); des.setZone(DiscoveryClient.getZone(ii)); serverList.add(des); } } if (serverList.size()>0 && prioritizeVipAddressBasedServers){ break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers } } } return serverList; }
看到这里代码就已经很明显了,我们来解读下这段代码:
- 通过eurekaClientProvider获取对应EurekaClient
- 通过vipAdress(实际就是serviceName)获取对应注册表集合信息
- 将注册信息组装成
DiscoveryEnabledServer
列表
再回到
DynamicServerListLoadBalancer.updateListOfServers()中,这里获取到对应的DiscoveryEnabledServer list后调用
updateAllServerList()方法,一路跟踪这里最终会调用
BaseLoadBalancer.setServersList()
public class BaseLoadBalancer extends AbstractLoadBalancer implements PrimeConnections.PrimeConnectionListener, IClientConfigAware { @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL) protected volatile List<Server> allServerList = Collections .synchronizedList(new ArrayList<Server>()); public void setServersList(List lsrv) { Lock writeLock = allServerLock.writeLock(); logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name); ArrayList<Server> newServers = new ArrayList<Server>(); writeLock.lock(); try { ArrayList<Server> allServers = new ArrayList<Server>(); for (Object server : lsrv) { if (server == null) { continue; } if (server instanceof String) { server = new Server((String) server); } if (server instanceof Server) { logger.debug("LoadBalancer [{}]: addServer [{}]", name, ((Server) server).getId()); allServers.add((Server) server); } else { throw new IllegalArgumentException( "Type String or Server expected, instead found:" + server.getClass()); } } boolean listChanged = false; if (!allServerList.equals(allServers)) { listChanged = true; if (changeListeners != null && changeListeners.size() > 0) { List<Server> oldList = ImmutableList.copyOf(allServerList); List<Server> newList = ImmutableList.copyOf(allServers); for (ServerListChangeListener l: changeListeners) { try { l.serverListChanged(oldList, newList); } catch (Exception e) { logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e); } } } } if (isEnablePrimingConnections()) { for (Server server : allServers) { if (!allServerList.contains(server)) { server.setReadyToServe(false); newServers.add((Server) server); } } if (primeConnections != null) { primeConnections.primeConnectionsAsync(newServers, this); } } // This will reset readyToServe flag to true on all servers // regardless whether // previous priming connections are success or not allServerList = allServers; if (canSkipPing()) { for (Server s : allServerList) { s.setAlive(true); } upServerList = allServerList; } else if (listChanged) { forceQuickPing(); } } finally { writeLock.unlock(); } } }
这个过程最后用一张图总结为:
ribbon如何更新自己保存的注册表信息?
上面已经讲了 Ribbon是如何通过serviceName拉取到注册表的,我们知道EurekaClient默认是30s拉取一次注册表信息的,因为Ribbon要关联注册表信息,那么Ribbon该如何更新自己存储的注册表信息呢?
继续回到
DynamicSeverListLoadBalancer.restOfInit()方法中:
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { protected volatile ServerListUpdater serverListUpdater; void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList() this.setEnablePrimingConnections(false); enableAndInitLearnNewServersFeature(); updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections() .primeConnections(getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); } public void enableAndInitLearnNewServersFeature() { LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName()); serverListUpdater.start(updateAction); } }
重点查看
enableAndInitLearnNewServersFeature()方法,从名字我们就可以看出来这意思为激活和初始化学习新服务的功能,这里实际上就启动
serverListUpdater中的一个线程。
在最上面Ribbon初始化的过程中我们知道,在
RibbonClientConfiguration中默认初始化的
ServerListUpdater为
PollingServreListUpdater,我们继续跟这个类的start方法:
@Override public synchronized void start(final UpdateAction updateAction) { if (isActive.compareAndSet(false, true)) { final Runnable wrapperRunnable = new Runnable() { @Override public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS ); } else { logger.info("Already active, no-op"); } }
这里只要是执行
updateAction.doUpdate();,然后后面启动了一个调度任务,默认30s执行一次。
继续往后跟
doUpdate()方法:
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() { @Override public void doUpdate() { updateListOfServers(); } }; }
这里又调用了之前通过serviceName获取对应注册服务列表的方法了。
总结到一张图如下:
总结
本文主要是重新梳理了Ribbon的初始化过程,主要是几个Configure初始化的过程,然后是Ribbon与Eureka的整合,这里也涉及到了注册表的更新逻辑。
看到这里真是被Spring的各种AutoConfigure绕晕了,哈哈,但是最后分析完 还是觉得挺清晰的,对于复杂的业务画张流程图还挺容易理解的。
申明
本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!
感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫
相关文章推荐
- 【一起学源码-微服务】Ribbon 源码二:通过Debug找出Ribbon初始化流程及ILoadBalancer原理分析
- 【一起学源码-微服务】Nexflix Eureka 源码九:服务续约源码分析
- 【一起学源码-微服务】Nexflix Eureka 源码八:EurekaClient注册表抓取 精妙设计分析!
- 【一起学源码-微服务】Nexflix Eureka 源码十二:EurekaServer集群模式源码分析
- 【一起学源码-微服务】Nexflix Eureka 源码三:EurekaServer启动之EurekaServer上下文EurekaClient创建
- 【一起学源码-微服务】Nexflix Eureka 源码十:服务下线及实例摘除,一个client下线到底多久才会被其他实例感知?
- Android源码解析之新进程中启动自定义服务过程(startService)的原理分析
- SpringCloud组件 & 源码剖析:Eureka服务注册方式流程全面分析
- 【一起学源码-微服务】Nexflix Eureka 源码六:在眼花缭乱的代码中,EurekaClient是如何注册的?
- Spring Cloud分布式微服务云架构源码分析 — Eureka
- 【springcloud】1.微服务之springcloud-》eureka源码分析之请叫我灵魂画师。。。
- 【一起学源码-微服务】Nexflix Eureka 源码十一:EurekaServer自我保护机制竟然有这么多Bug?
- 【一起学源码-微服务】Nexflix Eureka 源码五:EurekaClient启动要经历哪些艰难险阻?
- Eureka服务续约(Renew)源码分析
- 微服务之SpringCloud实战(四):SpringCloud Eureka源码分析
- Spring&WEB整合原理及源码分析
- Netflix源码解析之Ribbon:负载均衡器通过Eureka获取动态后端服务列表