Spring Cloud Netflix Eureka组件服务注册及发现源码浅析
2018-03-28 23:00
1181 查看
Spring Cloud简介
微服务这个概念已经深入人心,是最近几年的最热门技术话题之一,Spring Cloud是最流行的开源微服务框架。Spring Cloud是一系列框架的有序集合。它利用Spring Boot的开发便利性巧妙地简化了分布式系统基础设施的开发,
如服务发现注册、配置中心、消息总线、负载均衡、断路器、数据监控等,都可以用Spring Boot的开发风格做到一键启动和部署。
Spring并没有重复制造轮子,它只是将目前各家公司开发的比较成熟、经得起实际考验的服务框架组合起来,
通过Spring Boot风格进行再封装屏蔽掉了复杂的配置和实现原理,最终给开发者留出了一套简单易懂、易部署和易维护的分布式系统开发工具包。
Eureka Demo
在分析源码先搭建一个最简单的微服务实例,该Demo分为三个实例:eureka-serverEureka注册中心
service-server服务提供者
service-client服务消费者
pom.xml文件统一配置如下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.0.RELEASE</version> <relativePath/> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>service-server</artifactId> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka-server</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-undertow</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Brixton.SR4</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> </project>
Eureka注册中心
application.ymlspring: application: name: eureka server: port: 8761 management: port: 8001 eureka: instance: hostname: localhost preferIpAddress: true client: registerWithEureka: false # 本身为注册中心不用向Eureka注册服务 fetchRegistry: false # 不拉取服务实例列表 # ------------------------------------------------ # 集群模式使用 # serviceUrl: # defaultZone: http://localhost:8762/eureka/ # ------------------------------------------------ server: waitTimeInMsWhenSyncEmpty: 0 endpoints: shutdown: enabled: true #启用shutdown sensitive: false #禁用密码验证
Application.java
package cn.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer; @EnableEurekaServer @SpringBootApplication public class EurekaApplication { public static void main(String[] args) throws Exception { SpringApplication.run(EurekaApplication.class, args); } }
服务提供者
application.ymlspring: application: name: service-server server: port: 8003 management: port: 9003 eureka: client: serviceUrl: # 若Eureka服务注册中心为集群则用 , 隔开 defaultZone: http://localhost:8761/eureka/ instance: metadataMap: instanceId: dev server: waitTimeInMsWhenSyncEmpty: 0
Application.java
package cn.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; @SpringBootApplication @EnableDiscoveryClient public class Application { public static void main(String[] args) throws Exception { SpringApplication.run(Application.class, args); } }
为了节约服务资源,服务提供者若也可将eureka.client.fetchRegistry设成false,不拉取服务实例
服务消费者
application.ymlspring: application: name: service-client server: port: 8004 management: port: 9004 eureka: client: healthcheck: enabled: true serviceUrl: defaultZone: http://localhost:8761/eureka/ instance: metadataMap: instanceId: dev server: waitTimeInMsWhenSyncEmpty: 0 ribbon: ServerListRefreshInterval: 5000
Application.java
package cn.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; @SpringBootApplication @EnableDiscoveryClient public class Application { public static void main(String[] args) throws Exception { SpringApplication.run(Application.class, args); } }
服务启动
将以上三个服务启动先启动注册中心,其它两个实例也类似的启动
cd eureka-service mvn package cd target java -jar eureka-server-1.4.0.RELEASE
本Demo已上传至github:
https://github.com/yiliangz/spring-cloud-demo
Eureka的几个概念
Register:服务注册
Renew:服务续约,即常说的heartbeat, 默认在
Fetch Registries:获取注册列表信息
Cancel:服务下线
Eviction服务剔除
服务注册
服务实例
Eureka是一个以REST为基础的服务注册中心,
客户端的对象方法栈调用如下:
EurekaClientAutoConfiguration#eurekaClient()
CloudEurekaClient#CloudEurekaClient()
DiscoveryClient#DiscoveryClient() -> initScheduledTasks()
InstanceInfoReplicator#start() -> run()
discoveryClient#register()
AbstractJerseyEurekaHttpClient#register()
服务注册的启动首先从EurekaClientAutoConfiguration的eurekaClient方法开始
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration
@Configuration @ConditionalOnRefreshScope protected static class RefreshableEurekaClientConfiguration { @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) @org.springframework.cloud.context.config.annotation.RefreshScope public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) { manager.getInfo(); // force initialization //调用CloudEurekaClient的构造方法 return new CloudEurekaClient(manager, config, this.optionalArgs,this.context); } @Bean @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT) public ApplicationInfoManager eurekaApplicationInfoManager( EurekaInstanceConfig config) { //从配置文件初始化实例信息 InstanceInfo instanceInfo = new InstanceInfoFactory().create(config); return new ApplicationInfoManager(config, instanceInfo); } } public class CloudEurekaClient extends DiscoveryClient { // 继承自DiscoveryClient // ... public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, ApplicationContext context) { //调用DiscoveryClient的方法 this(applicationInfoManager, config, null, context); } // ... }
DiscoveryClient是Eureka的核心类, 定义了心跳检测和获取实例的调度任务
心跳检测由
eureka.client.registryFetchIntervalSeconds参数配置,默认为30s
com.netflix.discovery.DiscoveryClient
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { //定义一个任务调度线程池 scheduler = Executors.newScheduledThreadPool(3,...); //心跳检测 heartbeatExecutor = new ThreadPoolExecutor(...); // use direct handoff //获取服务实例 cacheRefreshExecutor = new ThreadPoolExecutor(...); // use direct handoff // ... //初始上面定义的三个任务调度器 initScheduledTasks(); // ... } //初始化调度任务 private void initScheduledTasks() { //... if (clientConfig.shouldRegisterWithEureka()) { if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //此处getInitialInstanceInfoReplicationIntervalSeconds默认为40s instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
scheduler的实现类为ScheduledThreadPoolExecutor, 其schedule()方法执行的调度任务只会执行一次
com.netflix.discovery.InstanceInfoReplicator
class InstanceInfoReplicator implements Runnable { public void start(int initialDelayMs) { // if (started.compareAndSet(false, true)) { instanceInfo.setIsDirty(); // for initial register //InstanceInfoReplicator实现了Runnale //所以此处把this传递进去就是执行此对象的run()方法 //schedule方法只会执行一次 Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { //调用discoveryClient的register方法 discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } } }
DiscoveryClient除了注册还包括心跳和更新本地的服务实例
com.netflix.discovery.DiscoveryClient#register
boolean register() throws Throwable { logger.info(PREFIX + appPathIdentifier + ": registering service..."); EurekaHttpResponse<Void> httpResponse; try { //此处使用了装饰器模式,实际执行register方法的是AbstractJerseyEurekaHttpClient类 httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; }
此处为真正发起服务注册REST的地方, 由源码可以知道REST访问使用到了sun公司的jersey
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient
public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { //使用jersey客户端进行restful访问 Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } }
服务注册、续约(心跳)、下线、剔除 都在AbstractJerseyEurekaHttpClient
InstanceStatus的状态枚举如下:
public enum InstanceStatus { UP, // 在用状态,可以接受服务请求 DOWN, // 不在用, 使用/healthcheck访问实例状态会失败 STARTING, // 启动中,不可用 OUT_OF_SERVICE, // 服务不可用 UNKNOWN; public static InstanceStatus toEnum(String s) { for (InstanceStatus e : InstanceStatus.values()) { if (e.name().equalsIgnoreCase(s)) { return e; } } return UNKNOWN; } }
注册的REST请求详情如下:
- 注册url: http://${eurekaIP}:8761/eureka/apps/SERVICE-SERVER
- type: POST
- 参数: instanceInfo
参数instanceInfo如下图所示(点击放大)
Eureka注册中心
上面分析的是关于客户端方面的注册动作,由上面我们可知服务实例会向Eureka注册中心发送一个rest注册请求,而Eureka注册中心接收服务实例注册的rest请求的方法在com.netflix.eureka.resources.ApplicationResource#addInstance
private final PeerAwareInstanceRegistry registry; private final ResponseCache responseCache; @POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // ... //调用PeerAwareInstanceRegistryImpl的register方法 registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }
由于各节点之间的信息同步是用异步方式的, 所以节点之间的信息不能保证每时刻都一致
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } //super(父类)为AbstractInstanceRegistry super.register(info, leaseDuration, isReplication); //将注册信息同步至其它eureka注册中心 replicateToPeers(Action.Register,info.getAppName(),info.getId(),info,null,isReplication); }
注册实例的真正存储是一个Map,这个Map的key为服务的AppName, value为该AppName的实例集合Map,
实例集合的key为注册服务的实例id, value为 Lease, Lease的概念为租期, 租期到期的话则该服务实例会被过期剔除,
续期(心跳)可配置在以下参数:
-
eureka.instance.leaseExpirationDurationInSeconds
服务过期时间配置,超过这个时间没有接收到心跳EurekaServer就会将这个实例剔除(默认90s)
-
eureka.instance.leaseRenewalIntervalInSeconds
服务刷新时间配置,每隔这个时间会主动心跳一次(默认30s)
-- com.netflix.eureka.registry.AbstractInstanceRegistry /** * 注册中心真正存储服务实例信息的是一个ConcurrentHashMap */ private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>(); public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { //key为appName, 也就是以spring.application.name的大写字符 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); //如果该AppName的实例集合不存在 if (gMap == null) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // ... gMap.put(registrant.getId(), lease); } // ... }
下面再来查阅节点同步的逻辑, 从代码分析可见此处主要是防止重复操作,
每个节点的真正同步代码在replicateInstanceActionsToPeers
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { //... // 如果是从其它节点同步过来的则不会在此再将操作信息同步至其它, 避免循环同步 if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } // for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. // 由于自身节点的操作已在上面的代码中进行 ,所以如果是自身节点, 则不再作重复操作 if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } //将操作同步, 包括注册,续约(心跳),服务下线等都在此方法 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
服务发现
服务消费者更新服务实例
服务发现也即服务实例拉取, 由于服务发现的逻辑与服务注册类似,这里只作简单分析服务实例拉取就是一个线程调度任务, 拉取时间间隔可以在
eureka.client.registryFetchIntervalSeconds配置, 默认为30s;
com.netflix.discovery.DiscoveryClient#initScheduledTasks if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); //TimedSupervisorTask继承了TimerTask, 为一个调度任务 scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, //CacheRefreshThread会调用run()方法 new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } //CacheRefreshThread实际执行了fetchRegistry方法 private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { if (clientConfig.shouldDisableDelta()|| …)){ //全量拉取 getAndStoreFullRegistry(); } else { //增量拉取 getAndUpdateDelta(applications); } } // ... return true; } /** * 此处仅贴出全量拉取代码, 增量拉取的代码自行参阅 */ private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; EurekaHttpResponse<Applications> httpResponse =clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); }
最后向Eureka发送一个查询服务实例的REST请求
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) { ClientResponse response = null; String regionsParamValue = null; try { WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath); if (regions != null && regions.length > 0) { regionsParamValue = StringUtil.join(regions); webResource = webResource.queryParam("regions", regionsParamValue); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); Applications applications = null; if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) { applications = response.getEntity(Applications.class); } return anEurekaHttpResponse(response.getStatus(), Applications.class) .headers(headersOf(response)) .entity(applications) .build(); } }
Eureka注册中心返回服务注册实例
Eureka注册中心对服务拉取的处理, 可以看到返回服务实例不是从此前分析过的服务实例注册存储的类型为ConcurrentHashMap的registry,返回服务实例信息的是一个responseCache, 有一个定期任务会将registry的信息更新至responseCache, 该定期时间可配置在
eureka.server.responseCacheUpdateInvervalMs,
定期任务的执行在
com.netflix.eureka.registry.ResponseCacheImpl#ResponseCacheImpl
接收rest请求在
com.netflix.eureka.resources.ApplicationsResource#getContainers
private final PeerAwareInstanceRegistry registry; //实际返回服务注册实例数据的缓存 private final ResponseCache responseCache; @Inject ApplicationsResource(EurekaServerContext eurekaServer) { this.serverConfig = eurekaServer.getServerConfig(); this.registry = eurekaServer.getRegistry(); //服务缓存从是从registry里取 this.responseCache = registry.getResponseCache(); } @GET public Response getContainers(@PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @Nullable @QueryParam("regions") String regionsStr) { Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); // ... Response response; if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { //从responseCache取返回数据 response = Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { response = Response.ok(responseCache.get(cacheKey)).build(); } return response; }
总结
本文浅度剖析了Eureka服务注册发现的源码,可以看出服务注册和发现的核心是用任务线程池实现,任务调度器使用REST与Eureka注册中心进行通信。值得注意的是Spring Cloud Netflix组件为了性能的考虑大量使用了缓存,在实际生产上使用的话服务下线会使得消费不能立即感应导致调用了已下线的服务实例,解决此问题需清楚组件的各个任务调度和时间间隔,根据项目需要调整时间间隔。相关文章推荐
- 《Spring Cloud Netflix》 -- 服务注册和服务发现-Eureka的常用配置
- 《Spring Cloud Netflix》-- 服务注册和服务发现-Eureka的服务认证和集群
- 《Spring Cloud Netflix》--服务注册和服务发现-Eureka的深入了解
- 服务注册和服务发现-Eureka的服务认证和集群--Spring Cloud Netflix
- 《Spring Cloud Netflix》-- 服务注册和服务发现-Eureka的服务认证和集群
- Spring-cloud & Netflix 源码解析:Eureka 服务注册发现接口 ****
- Spring-cloud & Netflix 源码解析:Eureka 服务注册发现接口 ****
- 《Spring Cloud Netflix》-- 服务注册和服务发现-Eureka的服务认证和集群
- 《Spring Cloud Netflix》 -- 服务注册和服务发现-Eureka 的使用
- 《Spring Cloud Netflix官方文档》1.服务发现:Eureka客户端
- Spring Cloud Netflix Eureka: 多网卡环境下Eureka服务注册IP选择问题
- Spring Cloud Spring Boot mybatis 企业分布式微服务云(一)服务注册与发现(Eureka)【Dalston版】
- Spring Cloud Spring Boot mybatis 企业分布式微服务云(一)服务注册与发现(Eureka)【Dalston版】
- (二)SpringBoot+SpringCloud —— 使用Eureka实现服务注册与发现
- Spring Cloud Netflix Eureka: 多网卡环境下Eureka服务注册IP选择问题
- SpringCloud Eureka 服务注册与服务发现
- Spring Cloud Eureka服务注册与发现
- SpringCloud之服务注册与发现Spring Cloud Eureka实例代码
- SpringCloud 学习记录(一):服务注册与发现(eureka+feign)
- SpringCloud之注册与发现服务(Eureka)|第一章-yellowcong