SpringCloud系列:Eureka源码分析(二)
2018-01-17 00:00
786 查看
摘要: 前面文章介绍了SpringCloud的具体用法,对其具体运行原理没有做过多的解释,本章开始介绍SpringCloud各个组件的源码及运行原理。首先介绍发现注册中心Eureka源码及设计原理
一、概述
前面文章我们介绍了Eureka作为服务注册中心、消费端的用法,从服务注册中心、服务提供者、服务消费者这三个主要元素来说,后两者,也就是Eureka客户端,在整个运行机制中是大部分通信行为的主动发起者,而注册中心主要是处理请求的接收者。所以,我们可以从Eureka的客户端作为入口看看它是如何完成这些主动通信行为的。
二、分析
服务注册
在理解了多个服务注册中心信息的加载后,我们再回头看看
在上面的函数中,我们可以看到关键的判断依据
从源码中,我们就可以发现,“服务获取”相对于“服务续约”更为独立,“服务续约”与“服务注册”在同一个if逻辑中,这个不难理解,服务注册到Eureka Server后,自然需要一个心跳去续约,防止被剔除,所以他们肯定是成对出现的。从源码中,我们可以清楚看到了,对于服务续约相关的时间控制参数:
而“服务获取”的逻辑在独立的一个if判断中,其判断依据就是我们之前所提到的
服务注册中心处理
通过上面的源码分析,可以看到所有的交互都是通过REST的请求来发起的。下面我们来看看服务注册中心对这些请求的处理。Eureka Server对于各类REST请求的定义都位于:
以“服务注册”请求为例:
在对注册信息进行了一大堆校验之后,会调用
在注册函数中,先调用
服务端的请求接收都非常类似,对于其他的服务端处理,这里不再展开。
一、概述
前面文章我们介绍了Eureka作为服务注册中心、消费端的用法,从服务注册中心、服务提供者、服务消费者这三个主要元素来说,后两者,也就是Eureka客户端,在整个运行机制中是大部分通信行为的主动发起者,而注册中心主要是处理请求的接收者。所以,我们可以从Eureka的客户端作为入口看看它是如何完成这些主动通信行为的。
二、分析
服务注册
在理解了多个服务注册中心信息的加载后,我们再回头看看
DiscoveryClient类是如何实现“服务注册”行为的,通过查看它的构造类,可以找到它调用了下面这个函数:
/** * Initializes all scheduled tasks. */ private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs); // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
在上面的函数中,我们可以看到关键的判断依据
if (clientConfig.shouldRegisterWithEureka())。在该分支内,创建了一个
InstanceInfoReplicator类的实例,它会执行一个定时任务,查看该类的
run()函数了解该任务做了什么工作:
public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
discoveryClient.register();这一行,真正触发调用注册的地方就在这里,注册操作也是通过REST请求的方式进行的。同时,这里我们也能看到发起注册请求的时候,传入了一个
com.netflix.appinfo.InstanceInfo对象,该对象就是注册时候客户端给服务端的服务的元数据。
服务获取与服务续约
顺着上面的思路,我们继续来看DiscoveryClient的
initScheduledTasks函数,不难发现在其中还有两个定时任务,分别是“服务获取”和“服务续约”:
// new method (resolve from primary servers for read) // Configure new transport layer (candidate for injecting in the future) if (clientConfig.shouldFetchRegistry()) { EurekaHttpClientFactory newQueryClientFactory = null; EurekaHttpClient newQueryClient = null; try { newQueryClientFactory = EurekaHttpClients.queryClientFactory( eurekaTransport.bootstrapResolver, eurekaTransport.transportClientFactory, clientConfig, transportConfig, applicationInfoManager.getInfo(), applicationsSource ); newQueryClient = newQueryClientFactory.newClient(); } catch (Exception e) { logger.warn("Transport initialization failure", e); } eurekaTransport.queryClientFactory = newQueryClientFactory; eurekaTransport.queryClient = newQueryClient; }
从源码中,我们就可以发现,“服务获取”相对于“服务续约”更为独立,“服务续约”与“服务注册”在同一个if逻辑中,这个不难理解,服务注册到Eureka Server后,自然需要一个心跳去续约,防止被剔除,所以他们肯定是成对出现的。从源码中,我们可以清楚看到了,对于服务续约相关的时间控制参数:
eureka.instance.lease-renewal-interval-in-seconds=30 eureka.instance.lease-expiration-duration-in-seconds=90
而“服务获取”的逻辑在独立的一个if判断中,其判断依据就是我们之前所提到的
eureka.client.fetch-registry=true参数,它默认是为true的,大部分情况下我们不需要关心。为了定期的更新客户端的服务清单,以保证服务访问的正确性,“服务获取”的请求不会只限于服务启动,而是一个定时执行的任务,从源码中我们可以看到任务运行中的
registryFetchIntervalSeconds参数对应
eureka.client.registry-fetch-interval-seconds=30配置参数,它默认为30秒。
服务注册中心处理
通过上面的源码分析,可以看到所有的交互都是通过REST的请求来发起的。下面我们来看看服务注册中心对这些请求的处理。Eureka Server对于各类REST请求的定义都位于:
com.netflix.eureka.resources包下。
以“服务注册”请求为例:
@POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // validate that the instanceinfo contains all the necessary required fields ... // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase( serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put( AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }
在对注册信息进行了一大堆校验之后,会调用
org.springframework.cloud.netflix.eureka.server.InstanceRegistry对象中的
register(InstanceInfo info, int leaseDuration, boolean isReplication)函数来进行服务注册:
public void register(InstanceInfo info, int leaseDuration, boolean isReplication) { if (log.isDebugEnabled()) { log.debug("register " + info.getAppName() + ", vip " + info.getVIPAddress() + ", leaseDuration " + leaseDuration + ", isReplication " + isReplication); } this.ctxt.publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication)); super.register(info, leaseDuration, isReplication); }
在注册函数中,先调用
publishEvent函数,将该新服务注册的事件传播出去,然后调用
com.netflix.eureka.registry.AbstractInstanceRegistry父类中的注册实现,将
InstanceInfo中的元数据信息存储在一个
ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>对象中,它是一个两层Map结构,第一层的key存储服务名:
InstanceInfo中的
appName属性,第二层的key存储实例名:
InstanceInfo中的
instanceId属性。
服务端的请求接收都非常类似,对于其他的服务端处理,这里不再展开。
相关文章推荐
- Spring Cloud源码分析(一)Eureka
- Spring Cloud源码分析(一)Eureka
- spring-cloud-eureka (三) 注册中心源码分析
- Spring Cloud - Eureka Client源码分析
- spring-cloud-eureka (二) Client - Server 接口交互(消息发送)源码分析
- Spring Cloud分布式微服务云架构源码分析 — Eureka
- Spring Cloud Netflix Eureka client源码分析
- Spring Cloud源码分析(二)Ribbon
- spring-cloud系列 | eureka注册中心搭建
- Spring Cloud源码分析(二)Ribbon(续)
- spring cloud服务发现和注册源码分析
- spring cloud 入门实践系列 - eureka
- Spring原理与源码分析系列(二)- Spring IoC容器启动过程分析(上)
- Spring基础系列-AOP源码分析
- 【Spring源码分析系列】加载Bean
- Spring原理与源码分析系列(五)- Spring IoC源码分析(下)