微服务发现组件Eureka Client源码解读篇
上一篇实战篇讲到Eureka具有如下功能:
获取注册表信息
服务注册
服务续约
服务下线(服务销毁)
那么它是如何实现?接下来通过源码解读的方式一一道来。
由于Eureka客户端代码较多,单独成篇,此篇主要是关于客户端源码的解读,下一篇文章讲解服务器端源码。
Eureka是由Netflix公司开发并开源的微服务组件之一,目前最新版本是2.X,网飞出于商业的考虑,该组件不再增加新功能,不会有更新的版本发布。不过,目前的版本所提供的功能已经足够生产环境的实际使用。
下图是整个Eureka Client的生命周期,分为应用启动阶段、应用执行阶段和应用销毁阶段。
先从总的看Eureka Client的功能,核心类DiscoveryClient的Javadoc是这么说的,这个类负责与Eureka Server进行交互,有服务注册、服务续约、服务下线、查询服务的功能,那么就从这个类展开,看是如何实现这些功能
[code] * The class that is instrumental for interactions with <tt>Eureka Server</tt>. * * <p> * <tt>Eureka Client</tt> is responsible for a) <em>Registering</em> the * instance with <tt>Eureka Server</tt> b) <em>Renewal</em>of the lease with * <tt>Eureka Server</tt> c) <em>Cancellation</em> of the lease from * <tt>Eureka Server</tt> during shutdown * <p> * d) <em>Querying</em> the list of services/instances registered with * <tt>Eureka Server</tt> * <p>
DiscoveryClient构造函数中执行了拉取注册表信息、服务注册、服务续约等操作,先从构造函数开始看起,可以看到有备用注册中心backupRegistryProvider,这样能实现网络中断时,从备用的本地注册中心拉取数据,以免影响应用提供服务。fetchRegistryGeneration作为版本的标志,防止因为多个线程拉取注册表,出现脏读,数据过期现象,保证每次拉取的都是最新版本的数据,AtomicLong类型也是出于并发安全的考虑。然后就是调用拉取注册表、服务注册和服务需要功能,下面进入到具体的实现方法进行讲解。
[code]@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } // 获取备用注册中心 this.backupRegistryProvider = backupRegistryProvider; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); // 拉取标志,用来确认是否拉取的为最新版本 fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } try { // default size of 2 - 1 each for heartbeat and cacheRefresh // 定义定时器线程池,大小为2,一个用于发送心跳,一个用于缓存刷新 scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); // 定义心跳发送线程池 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // 定义缓存刷新线程池 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // 初始化Eureka Client和Eureka Sever进行HTTP交互的Jersey客户端 eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransp 4000 ort, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } // 通过方法fetchRegistry()拉取注册表信息 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { // 从备用注册中心拉取注册表信息 fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { // 服务注册 if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch // 在初始化定时任务时完成服务续约,服务续约也是定时任务 initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); }
1)拉取注册表信息功能通过调用方法DiscoveryClient#fetchRegistry(boolean forceFullRegistryFetch)实现,可以看到有两种方式拉取代码,一种是全量拉取getAndStoreFullRegistry(),一种是增量拉取getAndUpdateDelta(Applications applications),增量拉取可以节省网络带宽。接下来两种方法都跟进去看下
[code]private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications Applications applications = getApplications(); if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); //全量拉取注册表信息 getAndStoreFullRegistry(); } else { //增量式拉取注册信息表 getAndUpdateDelta(applications); } //计算应用集合一致性哈希码 applications.setAppsHashCode(applications.getReconcileHashCode()); //打印注册表上所有服务实例的总数量 logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } // Notify about cache refresh before updating the instance remote status // 更新远程实例之前推送缓存刷新事件 onCacheRefreshed(); // Update remote status based on refreshed data held in the cache // 基于缓存中被刷新的数据更新远程实例状态 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; }
全量拉取:一般在第一次拉取的时候,进行注册表信息的全量拉取,即调用方法:eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get())
[code]private void getAndStoreFullRegistry() throws Throwable { // 获取拉取的注册表的版本,防止拉取版本落后 long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : // 从Eureka Server拉取注册表信息 eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); // 获取成功 if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { // apps中含有注册表里所有的服务实例信息 apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); // 检查fetchRegistryGeneration的版本是否发生改变,没有的话证明此次拉取是最新 } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { // 为防止同一服务的多个实例在启动时接收流量(receive traffic),筛选出状态为UP的实例,打乱其顺序 localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }
增量式拉取注册表信息,调用方法eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()),如果拉取失败则会全量拉取的方式拉取。此处会比较自定义的一致性哈希码,格式是${status}_${count}_,表明count个实例的状态为status,如果不一致会调用全量拉取方法获取全量数据确保Eureka Client和Eureka Server之间注册表数据一致。
[code]private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; // 从Eureka Server中增量式拉取注册表信息 EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } // 获取增量拉取失败 if (delta == null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); // 全量拉取注册信息 getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { // 更新本地缓存 updateDelta(delta); // 计算应用集合一致性哈希码 // appsHashCode = ${status}_${count}_,如 UP_10_DOWN_5_,10个UP实例,5个DOWN实例 reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason // 比较应用集合一致性哈希码,如果不一致可以认为本次增量式拉取数据已脏,用全量拉取的方式更新本地注册表信息 if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }
更新本地缓存DiscoveryClient#updateDelta(Application delta),根据变更类型更新本地缓存,对于类型为ADDED和MODIFIED的服务实例信息添加到本地注册表,对于类型为DELETE的服务实例信息从本地注册表中删除。
[code]private void updateDelta(Applications delta) { int deltaCount = 0; for (Application app : delta.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { Applications applications = getApplications(); String instanceRegion = instanceRegionChecker.getInstanceRegion(instance); if (!instanceRegionChecker.isLocalRegion(instanceRegion)) { Applications remoteApps = remoteRegionVsApps.get(instanceRegion); if (null == remoteApps) { remoteApps = new Applications(); remoteRegionVsApps.put(instanceRegion, remoteApps); } applications = remoteApps; } ++deltaCount; // 变更类型为ADDED时,将实例信息添加到本地注册表 if (ActionType.ADDED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion); // 添加到本地注册表中 applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); // 变更类型为MODIFIED时,将实例信息添加到本地注册表 } else if (ActionType.MODIFIED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Modified instance {} to the existing apps ", instance.getId()); // 添加到本地注册表中 applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } else if (ActionType.DELETED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp != null) { logger.debug("Deleted instance {} to the existing apps ", instance.getId()); existingApp.removeInstance(instance); /* * We find all instance list from application(The status of instance status is not only the status is UP but also other status) * if instance list is empty, we remove the application. */ if (existingApp.getInstancesAsIsFromEureka().isEmpty()) { // 从本地注册表中剔除 applications.removeApplication(existingApp); } } } } } logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount); getApplications().setVersion(delta.getVersion()); getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); for (Applications applications : remoteRegionVsApps.values()) { applications.setVersion(delta.getVersion()); applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } }
2)服务注册(DiscoveryClient#register),通过调用eurekaTransport.registrationClient.register(instanceInfo)方法,将服务实例元数据发送到Eureka Server中请求服务注册,如果返回状态码204表示注册成功。
[code]boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { // Eureka Client将服务实例元数据(instanceInfo)发送到Eureka Server中请求服务注册 httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } // 返回204则表示服务注册成功 return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); }
3)服务续约(DiscoveryClient#initScheduledTasks()),提交发送心跳任务,任务执行方式为延时执行并且不循环,定时循环逻辑由TimedSupervisorTask提供实现
[code]private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); // 缓存定时器 scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { // 发送心跳定时器 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer // 心跳定时器 scheduler.schedule( // TimeSupervisorTask负责提 21f17 供定时任务的功能 new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, // HeartbeatThread实现Runnable接口,实现心跳发送任务 new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
TimeSupervisorTask#run()
[code]public void run() { Future<?> future = null; try { // 执行任务 future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); // 等待任务执行结果 future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout // 执行完成,设置下次任务执行时间间隔 delay.set(timeoutMillis); threadPoolLevelGauge.set((long) executor.getActiveCount()); successCounter.increment(); } catch (TimeoutException e) { logger.warn("task supervisor timed out", e); timeoutCounter.increment(); long currentDelay = delay.get(); long newDelay = Math.min(maxDelay, currentDelay * 2); delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, reject the task", e); } else { logger.warn("task supervisor rejected the task", e); } rejectedCounter.increment(); } catch (Throwable e) { if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, can't accept the task"); } else { logger.warn("task supervisor threw an exception", e); } throwableCounter.increment(); } finally { if (future != null) { future.cancel(true); } // 如果定时任务服务未关闭,定义下一次任务 if (!scheduler.isShutdown()) { scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } }
心跳任务实现线程,实现Runnable接口
[code]private class HeartbeatThread implements Runnable { public void run() { // renew()为具体实现方法 if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } }
具体实现方法HeartbeatThread#renew(),调用方法eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null)发送心跳到Eureka Server中维持租约
[code]boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { // 调用HTTP发送心跳到Eureka Server中维持租约 httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); // 如果Eureka Server中不存在该应用实例 if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); // 重新注册 boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } // 续约成功 return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }
4)服务下线DiscoveryClient#shutdown(),在关闭Eureka Client时注销状态监听器,关闭Jersy客户端,取消定时任务,关闭相关Monitor
[code]@PreDestroy @Override public synchronized void shutdown() { // 同步方法 if (isShutdown.compareAndSet(false, true)) { // 此处为原子操作,确保只执行一次 logger.info("Shutting down DiscoveryClient ..."); if (statusChangeListener != null && applicationInfoManager != null) { // 注销状态监听器 applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); } // 取消定时任务 cancelScheduledTasks(); // If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) { // 服务下线 applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); unregister(); } if (eurekaTransport != null) { // 关闭Jersy客户端 eurekaTransport.shutdown(); } // 关闭Monitor heartbeatStalenessMonitor.shutdown(); registryStalenessMonitor.shutdown(); logger.info("Completed shut down of DiscoveryClient"); } }
注销服务DiscoveryClient#unregister(),通过调用方法eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId()) 向Eureka Server注销自己。
[code]void unregister() { // It can be null if shouldRegisterWithEureka == false if(eurekaTransport != null && eurekaTransport.registrationClient != null) { try { logger.info("Unregistering ..."); EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId()); logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode()); } catch (Exception e) { logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e); } } }
至此,就完成对Eureka Client服务注册,拉取注册表信息,服务续约和服务下线的源码解读。现在是不是对Eureka Client的代码结构更清晰。下一篇文章对解读Eureka Server源码部分。
- 微服务发现组件Eureka Server源码解读篇
- Dubbo学习之路(五):服务消费者发现和订阅服务源码解读
- SpringCloud组件:Eureka的服务发现与消费
- Spring-Cloud-Eureka服务注册发现中心server+client案列模拟说明
- SpringCloud组件 & 源码剖析:Eureka服务注册方式流程全面分析
- Spring Cloud Netflix Eureka组件服务注册及发现源码浅析
- 深度剖析服务发现组件Netflix Eureka
- SpringCloud 服务发现组件 Eureka
- 服务发现组件SpringCloud Eureka
- SpringCloud组件之(Eureka)的注册服务与发现服务的实践
- 03.Spring Cloud学习笔记之服务注册与服务发现组件Eureka
- spring cloud服务发现组件Eureka详解
- 二、SpringCloud学习 之 服务发现组件Eureka项目创建
- 【SpringCloud Eureka源码】从Eureka Client发起注册请求到Eureka Server处理的整个服务注册过程(上)
- 深度剖析服务发现组件Eureka
- Spring Cloud 组件搭建(一)Eureka服务发现
- Spring-cloud & Netflix 源码解析:Eureka 服务注册发现接口 ****
- 一起来学Spring Cloud | 第二章:服务注册和发现组件 (Eureka)
- SpringCloud(三):服务发现组件Eureka
- SpringCloud——服务注册与发现Eureka以及注册源码解析