您的位置:首页 > 其它

Eureka 源码解析 —— 应用实例注册发现(六)之全量获取

2017-12-14 17:37 639 查看
摘要: 原创出处 http://www.iocoder.cn/Eureka/instance-registry-fetch-all/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Eureka 1.8.X 版本

1. 概述

2. Eureka-Client 发起全量获取
2.1 初始化全量获取

2.2 定时获取

2.3 刷新注册信息缓存

2.4 发起获取注册信息

3. Eureka-Server 接收全量获取

3.1 接收全量获取请求

3.2 响应缓存 ResponseCache

3.3 缓存读取

3.4 主动过期读写缓存

3.5 被动过期读写缓存

3.6 定时刷新只读缓存

1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 获取全量注册信息的过程

FROM 《深度剖析服务发现组件Netflix Eureka》



Eureka-Client 获取注册信息,分成全量获取增量获取。默认配置下,Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,而后每 30增量获取刷新本地缓存( 非“正常”情况下会是全量获取 )。

本文重点在于全量获取

推荐 Spring Cloud 书籍

请支持正版。下载盗版,等于主动编写低级 BUG

程序猿DD —— 《Spring Cloud微服务实战》

周立 —— 《Spring Cloud与Docker微服务架构实战》

两书齐买,京东包邮。

2. Eureka-Client 发起全量获取

本小节调用关系如下:



2.1 初始化全量获取

Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,首先代码如下:

// DiscoveryClient.java/* Applications 在本地的缓存*/private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,                    Provider<BackupRegistry> backupRegistryProvider) {                         // ... 省略无关代码         // 【3.2.5】初始化应用集合在本地的缓存    localRegionApps.set(new Applications());         // ... 省略无关代码              // 【3.2.12】从 Eureka-Server 拉取注册信息    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {        fetchRegistryFromBackup();    }          // ... 省略无关代码       }
com.netflix.discovery.shared.Applications
,注册的应用集合。较为容易理解,点击 链接 链接查看带中文注释的类,这里就不啰嗦了。Applications 与 InstanceInfo 类关系如下:



配置
eureka.shouldFetchRegistry = true
,开启从 Eureka-Server 获取注册信息。默认值:
true


调用
#fetchRegistry(false)
方法,从 Eureka-Server 全量获取注册信息,在 「2.4 发起获取注册信息」 详细解析。

2.2 定时获取

Eureka-Client 在初始化过程中,创建获取注册信息线程,固定间隔向 Eureka-Server 发起获取注册信息( fetch ),刷新本地注册信息缓存。实现代码如下:

// DiscoveryClient.javaDiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,               Provider<BackupRegistry> backupRegistryProvider) {    // ... 省略无关代码                   // 【3.2.9】初始化线程池    // default size of 2 - 1 each for heartbeat and cacheRefresh    scheduler = Executors.newScheduledThreadPool(2,         new ThreadFactoryBuilder()                 .setNameFormat("DiscoveryClient-%d")                 .setDaemon(true)                 .build());        cacheRefreshExecutor = new ThreadPoolExecutor(         1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,         new SynchronousQueue<Runnable>(),         new ThreadFactoryBuilder()                 .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")                 .setDaemon(true)                 .build()     );  // use direct handoff        // ... 省略无关代码        // 【3.2.14】初始化定时任务    initScheduledTasks();        // ... 省略无关代码}private void initScheduledTasks() {    // 向 Eureka-Server 心跳(续租)执行器    if (clientConfig.shouldFetchRegistry()) {       // registry cache refresh timer       int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();       int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();       scheduler.schedule(               new TimedSupervisorTask(                       "cacheRefresh",                       scheduler,                       cacheRefreshExecutor,                       registryFetchIntervalSeconds,                       TimeUnit.SECONDS,                       expBackOffBound,                       new CacheRefreshThread()               ),               registryFetchIntervalSeconds, TimeUnit.SECONDS);     }     // ... 省略无关代码}
初始化定时任务代码,和续租的定时任务代码类似,在 《Eureka 源码解析 —— 应用实例注册发现(二)之续租
有详细解析,这里不重复分享。

com.netflix.discovery.DiscoveryClient.CacheRefreshThread
,注册信息缓存刷新任务,实现代码如下:

class CacheRefreshThread implements Runnable {    public void run() {        refreshRegistry();    }}
调用
#refreshRegistry(false)
方法,刷新注册信息缓存,在 「2.3 刷新注册信息缓存」 详细解析。

2.3 刷新注册信息缓存

调用
#refreshRegistry(false)
方法,刷新注册信息缓存,实现代码如下:

// DiscoveryClient.java  1: void refreshRegistry() {  2:     try {  3:         // TODO 芋艿:TODO[0009]:RemoteRegionRegistry  4:         boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();  5:   6:         boolean remoteRegionsModified = false;  7:         // This makes sure that a dynamic change to remote regions to fetch is honored.  8:         String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();  9:         if (null != latestRemoteRegions) { 10:             String currentRemoteRegions = remoteRegionsToFetch.get(); 11:             if (!latestRemoteRegions.equals(currentRemoteRegions)) { 12:                 // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync 13:                 synchronized (instanceRegionChecker.getAzToRegionMapper()) { 14:                     if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) { 15:                         String[] remoteRegions = latestRemoteRegions.split(","); 16:                         remoteRegionsRef.set(remoteRegions); 17:                         instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions); 18:                         remoteRegionsModified = true; 19:                     } else { 20:                         logger.info("Remote regions to fetch modified concurrently," + 21:                                 " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions); 22:                     } 23:                 } 24:             } else { 25:                 // Just refresh mapping to reflect any DNS/Property change 26:                 instanceRegionChecker.getAzToRegionMapper().refreshMapping(); 27:             } 28:         } 29:  30:         boolean success = fetchRegistry(remoteRegionsModified); 31:         if (success) { 32:             // 设置 注册信息的应用实例数 33:             registrySize = localRegionApps.get().size(); 34:             // 设置 最后获取注册信息时间 35:             lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); 36:         } 37:  38:         // 打印日志 39:         if (logger.isDebugEnabled()) { 40:             StringBuilder allAppsHashCodes = new StringBuilder(); 41:             allAppsHashCodes.append("Local region apps hashcode: "); 42:             allAppsHashCodes.append(localRegionApps.get().getAppsHashCode()); 43:             allAppsHashCodes.append(", is fetching remote regions? "); 44:             allAppsHashCodes.append(isFetchingRemoteRegionRegistries); 45:             for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) { 46:                 allAppsHashCodes.append(", Remote region: "); 47:                 allAppsHashCodes.append(entry.getKey()); 48:                 allAppsH
1fe95
ashCodes.append(" , apps hashcode: "); 49:                 allAppsHashCodes.append(entry.getValue().getAppsHashCode()); 50:             } 51:             logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", 52:                     allAppsHashCodes.toString()); 53:         } 54:     } catch (Throwable e) { 55:         logger.error("Cannot fetch registry from server", e); 56:     }         57: }
第 3 至 28 行 :TODO[0009]:RemoteRegionRegistry

第 30 行 :调用
#fetchRegistry(false)
方法,从 Eureka-Server 获取注册信息,在 「2.4 发起获取注册信息」 详细解析。

第 31 至 36 行 :获取注册信息成功,设置注册信息的应用实例数,最后获取注册信息时间。变量代码如下:

/*** 注册信息的应用实例数*/private volatile int registrySize = 0;/*** 最后成功从 Eureka-Server 拉取注册信息时间戳*/private volatile long lastSuccessfulRegistryFetchTimestamp = -1;
第 38 至 53 行 :打印调试日志。

第 54 至 56 行 :打印异常日志。

2.4 发起获取注册信息

调用
#fetchRegistry(false)
方法,从 Eureka-Server 获取注册信息( 根据条件判断,可能是全量,也可能是增量 ),实现代码如下:

1: private boolean fetchRegistry(boolean forceFullRegistryFetch) { 2:     Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); 3:  4:     try { 5:         // 获取 本地缓存的注册的应用实例集合 6:         // If the delta is disabled or if it is the first time, get all 7:         // applications 8:         Applications applications = getApplications(); 9: 10:         // 全量获取11:         if (clientConfig.shouldDisableDelta() // 禁用增量获取12:                 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))13:                 || forceFullRegistryFetch14:                 || (applications == null) // 空15:                 || (applications.getRegisteredApplications().size() == 0) // 空16:                 || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta17:         {18:             logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());19:             logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());20:             logger.info("Force full registry fetch : {}", forceFullRegistryFetch);21:             logger.info("Application is null : {}", (applications == null));22:             logger.info("Registered Applications size is zero : {}",23:                     (applications.getRegisteredApplications().size() == 0));24:             logger.info("Application version is -1: {}", (applications.getVersion() == -1));25:             // 执行 全量获取26:             getAndStoreFullRegistry();27:         } else {28:             // 执行 增量获取29:             getAndUpdateDelta(applications);30:         }31:         // 设置 应用集合 hashcode32:         applications.setAppsHashCode(applications.getReconcileHashCode());33:         // 打印 本地缓存的注册的应用实例数量34:         logTotalInstances();35:     } catch (Throwable e) {36:         logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);37:         return false;38:     } finally {39:         if (tracer != null) {40:             tracer.stop();41:         }42:     }43: 44:     // Notify about cache refresh before updating the instance remote status45:     onCacheRefreshed();46: 47:     // Update remote status based on refreshed data held in the cache48:     updateInstanceRemoteStatus();49: 50:     // registry was fetched successfully, so return true51:     return true;52: }
第 5 至 8 行 :获取本地缓存的注册的应用实例集合,实现代码如下:

public Applications getApplications() {   return localRegionApps.get();}
第 10 至 26 行 :全量获取注册信息。

第 11 行 :配置
eureka.disableDelta = true
,禁用增量获取注册信息。默认值:
false


第 12 行 :只获得一个
vipAddress
对应的应用实例们的注册信息。

第 13 行 :方法参数
forceFullRegistryFetch
强制全量获取注册信息。

第 14 至 15 行 :本地缓存为空。

第 25 至 26 行 :调用
#getAndStoreFullRegistry()
方法,全量获取注册信息,并设置到本地缓存。下文详细解析。

第 27 至 30 行 :增量获取注册信息,并刷新本地缓存,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

第 31 至 32 行 :计算应用集合
hashcode
。该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

第 33 至 34 行 :打印调试日志,输出本地缓存的注册的应用实例数量。实现代码如下:

private void logTotalInstances() {   if (logger.isDebugEnabled()) {       int totInstances = 0;       for (Application application : getApplications().getRegisteredApplications()) {           totInstances += application.getInstancesAsIsFromEureka().size();       }       logger.debug("The total number of all instances in the client now is {}", totInstances);   }}
第 44 至 45 行 :触发 CacheRefreshedEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。

#onCacheRefreshed()
方法,实现代码如下:

/** * Eureka 事件监听器 */private final CopyOnWriteArraySet<EurekaEventListener> eventListeners = new CopyOnWriteArraySet<>();protected void onCacheRefreshed() {    fireEvent(new CacheRefreshedEvent());}protected void fireEvent(final EurekaEvent event) {    for (EurekaEventListener listener : eventListeners) {        listener.onEvent(event);    }}
x

笔者的YY :你可以实现自定义的事件监听器监听 CacheRefreshedEvent 事件,以达到持久化最新的注册信息到存储器( 例如,本地文件 ),通过这样的方式,配合实现 BackupRegistry 接口读取存储器。BackupRegistry 接口调用如下:

// 【3.2.12】从 Eureka-Server 拉取注册信息if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {    fetchRegistryFromBackup();}
第47 至 48 行 :更新本地缓存的当前应用实例在 Eureka-Server 的状态。

1: private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;  2:  3: private synchronized void updateInstanceRemoteStatus() { 4:     // Determine this instance's status for this app and set to UNKNOWN if not found 5:     InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null; 6:     if (instanceInfo.getAppName() != null) { 7:         Application app = getApplication(instanceInfo.getAppName()); 8:         if (app != null) { 9:             InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());10:             if (remoteInstanceInfo != null) {11:                 currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();12:             }13:         }14:     }15:     if (currentRemoteInstanceStatus == null) {16:         currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;17:     }18: 19:     // Notify if status changed20:     if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {21:         onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);22:         lastRemoteInstanceStatus = currentRemoteInstanceStatus;23:     }24: }
第 4 至 14 行 :从注册信息中获取当前应用在 Eureka-Server 的状态。

第 19 至 23 行 :对比本地缓存最新的的当前应用实例在 Eureka-Server 的状态,若不同,更新本地缓存( 注意,只更新该缓存变量,不更新本地当前应用实例的状态(
instanceInfo.status
)
),触发 StatusChangeEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。
#onRemoteStatusChanged(...)
实现代码如下:

protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {   fireEvent(new StatusChangeEvent(oldStatus, newStatus));}
Eureka-Client 本地应用实例与 Eureka-Server 的该应用实例状态不同的原因,因为应用实例的覆盖状态,在 《Eureka 源码解析 —— 应用实例注册发现 (八)之覆盖状态》 有详细解析。

2.4.1 全量获取注册信息,并设置到本地缓存

调用
#getAndStoreFullRegistry()
方法,全量获取注册信息,并设置到本地缓存。下实现代码如下:

1: private void getAndStoreFullRegistry() throws Throwable { 2:     long currentUpdateGeneration = fetchRegistryGeneration.get(); 3:  4:     logger.info("Getting all instance registry info from the eureka server"); 5:  6:     // 全量获取注册信息 7:     Applications apps = null; 8:     EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null 9:             ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())10:             : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());11:     if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {12:         apps = httpResponse.getEntity();13:     }14:     logger.info("The response status is {}", httpResponse.getStatusCode());15: 16:     // 设置到本地缓存17:     if (apps == null) {18:         logger.error("The application is null for some reason. Not storing this information");19:     } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {20:         localRegionApps.set(this.filterAndShuffle(apps));21:         logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());22:     } else {23:         logger.warn("Not updating applications as another thread is updating it already");24:     }25: }
第 6 至 14 行 :全量获取注册信息,实现代码如下:

// AbstractJerseyEurekaHttpClient.java@Overridepublic EurekaHttpResponse<Applications> getApplications(String... regions) {   return getApplicationsInternal("apps/", regions);}private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {   ClientResponse response = null;   String regionsParamValue = null;   try {       WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);       if (regions != null && regions.length > 0) {           regionsParamValue = StringUtil.join(regions);           webResource = webResource.queryParam("regions", regionsParamValue);       }       Builder requestBuilder = webResource.getRequestBuilder();       addExtraHeaders(requestBuilder);       response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON       Applications applications = null;       if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {           applications = response.getEntity(Applications.class);       }       return anEurekaHttpResponse(response.getStatus(), Applications.class)               .headers(headersOf(response))               .entity(applications)               .build();   } finally {       if (logger.isDebugEnabled()) {           logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",                   serviceUrl, urlPath,                   regionsParamValue == null ? "" : "regions=" + regionsParamValue,                   response == null ? "N/A" : response.getStatus()           );       }       if (response != null) {           response.close();       }   }}
调用
AbstractJerseyEurekaHttpClient#getApplications(...)
方法,GET 请求 Eureka-Server 的
apps/
接口,参数为
regions
,返回格式为 JSON ,实现全量获取注册信息

第 16 至 24 行 :设置到本地注册信息缓存

第 19 行 :TODO[0025] :并发更新的情况???

第 20 行 :调用
#filterAndShuffle(...)
方法,根据配置
eureka.shouldFilterOnlyUpInstances = true
( 默认值 :
true
) 过滤只保留状态为开启( UP )的应用实例,并随机打乱应用实例顺序。打乱后,实现调用应用服务的随机性。代码比较易懂,点击链接查看方法实现。

3. Eureka-Server 接收全量获取

3.1 接收全量获取请求

com.netflix.eureka.resources.ApplicationsResource
,处理所有应用的请求操作的 Resource ( Controller )。

接收全量获取请求,映射
ApplicationsResource#getContainers()
方法,实现代码如下:

1: @GET 2: public Response getContainers(@PathParam("version") String version, 3:                               @HeaderParam(HEADER_ACCEPT) String acceptHeader, 4:                               @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, 5:                               @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, 6:                               @Context UriInfo uriInfo, 7:                               @Nullable @QueryParam("regions") String regionsStr) { 8:     // TODO[0009]:RemoteRegionRegistry 9:     boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();10:     String[] regions = null;11:     if (!isRemoteRegionRequested) {12:         EurekaMonitors.GET_ALL.increment();13:     } else {14:         regions = regionsStr.toLowerCase().split(",");15:         Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.16:         EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();17:     }18: 19:     // 判断是否可以访问20:     // Check if the server allows the access to the registry. The server can21:     // restrict access if it is not22:     // ready to serve traffic depending on various reasons.23:     if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {24:         return Response.status(Status.FORBIDDEN).build();25:     }26: 27:     // API 版本28:     CurrentRequestVersion.set(Version.toEnum(version));29: 30:     // 返回数据格式31:     KeyType keyType = Key.KeyType.JSON;32:     String returnMediaType = MediaType.APPLICATION_JSON;33:     if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {34:         keyType = Key.KeyType.XML;35:         returnMediaType = MediaType.APPLICATION_XML;36:     }37: 38:     // 响应缓存键( KEY )39:     Key cacheKey = new Key(Key.EntityType.Application,40:             ResponseCacheImpl.ALL_APPS,41:             keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions42:     );43: 44:     //45:     Response response;46:     if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {47:         response = Response.ok(responseCache.getGZIP(cacheKey))48:                 .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)49:                 .header(HEADER_CONTENT_TYPE, returnMediaType)50:                 .build();51:     } else {52:         response = Response.ok(responseCache.get(cacheKey))53:                 .build();54:     }55:     return response;56: }
第 8 至 17 行 :TODO[0009]:RemoteRegionRegistry

第 19 至 25 行 :Eureka-Server 启动完成,但是未处于就绪( Ready )状态,不接受请求全量应用注册信息的请求,例如,Eureka-Server 启动时,未能从其他 Eureka-Server 集群的节点获取到应用注册信息。

第 27 至 28 行 :设置 API 版本号。默认最新 API 版本为 V2。实现代码如下:

public enum Version {    V1, V2;    public static Version toEnum(String v) {        for (Version version : Version.values()) {            if (version.name().equalsIgnoreCase(v)) {                return version;            }        }        //Defaults to v2        return V2;    }}
第 30 至 36 行 :设置返回数据格式,默认 JSON 。

第 38 至 42 行 :创建响应缓存( ResponseCache ) 的键( KEY ),在 「3.2.1 缓存键」详细解析。

第 44 至 55 行 :从响应缓存读取全量注册信息,在 「3.3 缓存读取」详细解析。

3.2 响应缓存 ResponseCache

com.netflix.eureka.registry.ResponseCache
,响应缓存接口,接口代码如下:

public interface ResponseCache {    String get(Key key);        byte[] getGZIP(Key key);        void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);    AtomicLong getVersionDelta();        AtomicLong getVersionDeltaWithRegions();}
其中,
#getVersionDelta()
#getVersionDeltaWithRegions()
已经废弃。这里保留的原因主要是考虑兼容性。判断依据来自如下代码:

// Applications.java@Deprecatedpublic void setVersion(Long version) {   this.versionDelta = version;}// AbstractInstanceRegistry.javapublic Applications getApplicationDeltas() {    // ... 省略其它无关代码    apps.setVersion(responseCache.getVersionDelta().get()); // 唯一调用到 ResponseCache#getVersionDelta() 方法的地方    // ... 省略其它无关代码}
#get()
:获得缓存。

#getGZIP()
:获得缓存,并 GZIP 。

#invalidate()
:过期缓存。

3.2.1 缓存键

com.netflix.eureka.registry.Key
,缓存键。实现代码如下:

public class Key {    public enum KeyType {        JSON, XML    }    /     * An enum to define the entity that is stored in this cache for this key.     /    public enum EntityType {        Application, VIP, SVIP    }    /      实体名     /    private final String entityName;    /      TODO[0009]:RemoteRegionRegistry     /    private final String[] regions;    /      请求参数类型     /    private final KeyType requestType;    /      请求 API 版本号     /    private final Version requestVersion;    /      hashKey     /    private final String hashKey;    /      实体类型           {@link EntityType}     /    private final EntityType entityType;    /      {@link EurekaAccept}     */    private final EurekaAccept eurekaAccept;        public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {        this.regions = regions;        this.entityType = entityType;        this.entityName = entityName;        this.requestType = type;        this.requestVersion = v;        this.eurekaAccept = eurekaAccept;        hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")                + requestType.name() + requestVersion.name() + this.eurekaAccept.name();    }        public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {        this.regions = regions;        this.entityType = entityType;        this.entityName = entityName;        this.requestType = type;        this.requestVersion = v;        this.eurekaAccept = eurekaAccept;        hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")                + requestType.name() + requestVersion.name() + this.eurekaAccept.name();    }        @Override    public int hashCode() {        String hashKey = getHashKey();        return hashKey.hashCode();    }    @Override    public boolean equals(Object other) {        if (other instanceof Key) {            return getHashKey().equals(((Key) other).getHashKey());        } else {            return false;        }    }    }

3.2.2 响应缓存实现类

com.netflix.eureka.registry.ResponseCacheImpl
,响应缓存实现类。

在 ResponseCacheImpl 里,将缓存拆分成两层 :

只读缓存(
readOnlyCacheMap
)

固定过期 + 固定大小读写缓存(
readWriteCacheMap
)

默认配置下,缓存读取策略如下:



缓存过期策略如下:

应用实例注册、下线、过期时,只只只过期
readWriteCacheMap


readWriteCacheMap
写入一段时间( 可配置 )后自动过期。

定时任务对比
readWriteCacheMap
readOnlyCacheMap
的缓存值,若不一致,以前者为主。通过这样的方式,实现了
readOnlyCacheMap
的定时过期。

注意:应用实例注册、下线、过期时,不会很快刷新到
readWriteCacheMap
缓存里。默认配置下,最大延迟在 30 秒。

为什么可以使用缓存?

CAP 的选择上,Eureka 选择了 AP ,不同于 Zookeeper 选择了 CP 。

推荐阅读:

《为什么不应该使用ZooKeeper做服务发现》

《Spring Cloud Netflix Eureka源码导读与原理分析》「4. 作为服务注册中心,Eureka比Zookeeper好在哪里」

3.3 缓存读取

调用
ResponseCacheImpl#get(...)
方法(
#getGzip(...)
类似 ),读取缓存,实现代码如下:

1: private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>(); 2:  3: private final LoadingCache<Key, Value> readWriteCacheMap; 4:  5: public String get(final Key key) { 6:     return get(key, shouldUseReadOnlyResponseCache); 7: } 8:  9: String get(final Key key, boolean useReadOnlyCache) {10:     Value payload = getValue(key, useReadOnlyCache);11:     if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {12:         return null;13:     } else {14:         return payload.getPayload();15:     }16: }17: 18: Value getValue(final Key key, boolean useReadOnlyCache) {19:     Value payload = null;20:     try {21:         if (useReadOnlyCache) {22:             final Value currentPayload = readOnlyCacheMap.get(key);23:             if (currentPayload != null) {24:                 payload = currentPayload;25:             } else {26:                 payload = readWriteCacheMap.get(key);27:                 readOnlyCacheMap.put(key, payload);28:             }29:         } else {30:             payload = readWriteCacheMap.get(key);31:         }32:     } catch (Throwable t) {33:         logger.error("Cannot get value for key :" + key, t);34:     }35:     return payload;36: }
第 5 至 7 行 :调用
#get(key, useReadOnlyCache)
方法,读取缓存。其中
shouldUseReadOnlyResponseCache
通过配置
eureka.shouldUseReadOnlyResponseCache = true
(默认值 :
true
) 开启只读缓存。如果你对数据的一致性有相对高的要求,可以关闭这个开关,当然因为少了
readOnlyCacheMap
,性能会有一定的下降。

第 9 至 16 行 :调用
getValue(key, useReadOnlyCache)
方法,读取缓存。从
readOnlyCacheMap
readWriteCacheMap
变量可以看到缓存值的类为
com.netflix.eureka.registry.ResponseCacheImpl.Value
,实现代码如下:

public class Value {   /**    * 原始值    */   private final String payload;   /**    * GZIP 压缩后的值    */   private byte[] gzipped;   public Value(String payload) {       this.payload = payload;       if (!EMPTY_PAYLOAD.equals(payload)) {           // ... 省略 GZIP 压缩代码           gzipped = bos.toByteArray();       } else {           gzipped = null;       }   }   public String getPayload() {       return payload;   }   public byte[] getGzipped() {       return gzipped;   }}
第 21 至 31 行 :读取缓存。

第 21 至 28 行 :先读取
readOnlyCacheMap
。读取不到,读取
readWriteCacheMap
,并设置到
readOnlyCacheMap


第 29 至 31 行 :读取
readWriteCacheMap


readWriteCacheMap
实现代码如下:

this.readWriteCacheMap =      CacheBuilder.newBuilder().initialCapacity(1000)              .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)              .removalListener(new RemovalListener<Key, Value>() {                  @Override                  public void onRemoval(RemovalNotification<Key, Value> notification) {                      // TODO[0009]:RemoteRegionRegistry                      Key removedKey = notification.getKey();                      if (removedKey.hasRegions()) {                          Key cloneWithNoRegions = removedKey.cloneWithoutRegions();                          regionSpecificKeys.remove(cloneWithNoRegions, removedKey);                      }                  }              })              .build(new CacheLoader<Key, Value>() {                  @Override                  public Value load(Key key) throws Exception {                      // // TODO[0009]:RemoteRegionRegistry                      if (key.hasRegions()) {                          Key cloneWithNoRegions = key.cloneWithoutRegions();                          regionSpecificKeys.put(cloneWithNoRegions, key);                      }                      Value value = generatePayload(key);                      return value;                  }              });
readWriteCacheMap
最大缓存数量为 1000 。

调用
#generatePayload(key)
方法,生成缓存值。

#generatePayload(key)
方法,实现代码如下:

1: private Value generatePayload(Key key) { 2:     Stopwatch tracer = null; 3:     try { 4:         String payload; 5:         switch (key.getEntityType()) { 6:             case Application: 7:                 boolean isRemoteRegionRequested = key.hasRegions(); 8:  9:                 if (ALL_APPS.equals(key.getName())) {10:                     if (isRemoteRegionRequested) { // TODO[0009]:RemoteRegionRegistry11:                         tracer = serializeAllAppsWithRemoteRegionTimer.start();12:                         payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));13:                     } else {14:                         tracer = serializeAllAppsTimer.start();15:                         payload = getPayLoad(key, registry.getApplications());16:                     }17:                 } else if (ALL_APPS_DELTA.equals(key.getName())) {18:                     // ... 省略增量获取相关的代码19:                  } else {20:                     tracer = serializeOneApptimer.start();21:                     payload = getPayLoad(key, registry.getApplication(key.getName()));22:                 }23:                 break;24:             // ... 省略部分代码 25:         }26:         return new Value(payload);27:     } finally {28:         if (tracer != null) {29:             tracer.stop();30:         }31:     }32: }
第 10 至 12 行 :TODO[0009]:RemoteRegionRegistry

第 13 至 16 行 :调用
AbstractInstanceRegistry#getApplications()
方法,获得注册的应用集合。后调用
#getPayLoad()
方法,将注册的应用集合转换成缓存值。�� 这两个方法代码较多,下面详细解析。

第 17 至 18 行 :获取增量注册信息的缓存值,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

3.3.1 获得注册的应用集合

调用
AbstractInstanceRegistry#getApplications()
方法,获得注册的应用集合,实现代码如下:

1: // AbstractInstanceRegistry.java 2:  3: private static final String[] EMPTY_STR_ARRAY = new String[0]; 4:  5: public Applications getApplications() { 6:    boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion(); 7:    if (disableTransparentFallback) { // TODO[0009]:RemoteRegionRegistry 8:        return getApplicationsFromLocalRegionOnly(); 9:    } else {10:        return getApplicationsFromAllRemoteRegions();  // Behavior of falling back to remote region can be disabled.11:    }12: }13: 14: public Applications getApplicationsFromLocalRegionOnly() {15:    return getApplicationsFromMultipleRegions(EMPTY_STR_ARRAY);16: }
第 6 至 8 行 :TODO[0009]:RemoteRegionRegistry

第 9 至 16 行 :调用
#getApplicationsFromMultipleRegions(...)
方法,获得注册的应用集合,实现代码如下:

1: public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) { 2:     // TODO[0009]:RemoteRegionRegistry 3:     boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0; 4:     logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}", 5:             includeRemoteRegion, Arrays.toString(remoteRegions)); 6:     if (includeRemoteRegion) { 7:         GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment(); 8:     } else { 9:         GET_ALL_CACHE_MISS.increment();10:     }11:     // 获得获得注册的应用集合12:     Applications apps = new Applications();13:     apps.setVersion(1L);14:     for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {15:         Application app = null;16: 17:         if (entry.getValue() != null) {18:             for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {19:                 Lease<InstanceInfo> lease = stringLeaseEntry.getValue();20:                 if (app == null) {21:                     app = new Application(lease.getHolder().getAppName());22:                 }23:                 app.addInstance(decorateInstanceInfo(lease));24:             }25:         }26:         if (app != null) {27:             apps.addApplication(app);28:         }29:     }30:     // TODO[0009]:RemoteRegionRegistry31:     if (includeRemoteRegion) {32:         for (String remoteRegion : remoteRegions) {33:             RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);34:             if (null != remoteRegistry) {35:                 Applications remoteApps = remoteRegistry.getApplications();36:                 for (Application application : remoteApps.getRegisteredApplications()) {37:                     if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {38:                         logger.info("Application {}  fetched from the remote region {}",39:                                 application.getName(), remoteRegion);40: 41:                         Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());42:                         if (appInstanceTillNow == null) {43:                             appInstanceTillNow = new Application(application.getName());44:                             apps.addApplication(appInstanceTillNow);45:                         }46:                         for (InstanceInfo instanceInfo : application.getInstances()) {47:                             appInstanceTillNow.addInstance(instanceInfo);48:                         }49:                     } else {50:                         logger.debug("Application {} not fetched from the remote region {} as there exists a "51:                                         + "whitelist and this app is not in the whitelist.",52:                                 application.getName(), remoteRegion);53:                     }54:                 }55:             } else {56:                 logger.warn("No remote registry available for the remote region {}", remoteRegion);57:             }58:         }59:     }60:     // 设置 应用集合 hashcode61:     apps.setAppsHashCode(apps.getReconcileHashCode());62:     return apps;63: }
第 2 至 第 10 行 :TODO[0009]:RemoteRegionRegistry

第 11 至 29 行 :获得获得注册的应用集合。

第 30 至 59 行 :TODO[0009]:RemoteRegionRegistry

第 61 行 :计算应用集合
hashcode
。该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

3.3.2 转换成缓存值

调用
#getPayLoad()
方法,将注册的应用集合转换成缓存值,实现代码如下:

/* Generate pay load with both JSON and XML formats for all applications.*/private String getPayLoad(Key key, Applications apps) {   // 获得编码器   EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());   String result;   try {       // 编码       result = encoderWrapper.encode(apps);   } catch (Exception e) {       logger.error("Failed to encode the payload for all apps", e);       return "";   }   if(logger.isDebugEnabled()) {       logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());   }   return result;}

3.4 主动过期读写缓存

应用实例注册、下线、过期时,调用
ResponseCacheImpl#invalidate()
方法,主动过期读写缓存(
readWriteCacheMap
),实现代码如下:

public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {   for (Key.KeyType type : Key.KeyType.values()) {       for (Version v : Version.values()) {           invalidate(                   new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),                   new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),                   new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),                   new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),                   new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),                   new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)           );           if (null != vipAddress) {               invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));           }           if (null != secureVipAddress) {               invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));           }       }   }}
调用
#invalidate(keys)
方法,逐个过期每个缓存键值,实现代码如下:

public void invalidate(Key... keys) {   for (Key key : keys) {       logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());       // 过期读写缓存       readWriteCacheMap.invalidate(key);       // TODO[0009]:RemoteRegionRegistry       Collection<Key> keysWithRegions = regionSpecificKeys.get(key);       if (null != keysWithRegions && !keysWithRegions.isEmpty()) {           for (Key keysWithRegion : keysWithRegions) {               logger.debug("Invalidating the response cache key : {} {} {} {} {}",                       key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());               readWriteCacheMap.invalidate(keysWithRegion);           }       }   }}

3.5 被动过期读写缓存

读写缓存(
readWriteCacheMap
) 写入后,一段时间自动过期,实现代码如下:

expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds())
配置
eureka.responseCacheAutoExpirationInSeconds
,设置写入过期时长。默认值 :180 秒。

3.6 定时刷新只读缓存

定时任务对比
readWriteCacheMap
readOnlyCacheMap
的缓存值,若不一致,以前者为主。通过这样的方式,实现了
readOnlyCacheMap
的定时过期。实现代码如下:

1: ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) { 2:     // ... 省略无关代码  3:  4:     long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs(); 5:     // ... 省略无关代码 6:  7:     if (shouldUseReadOnlyResponseCache) { 8:         timer.schedule(getCacheUpdateTask(), 9:                 new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)10:                         + responseCacheUpdateIntervalMs),11:                 responseCacheUpdateIntervalMs);12:     }13: 14:     // ... 省略无关代码15: }16: 17: private TimerTask getCacheUpdateTask() {18:     return new TimerTask() {19:         @Override20:         public void run() {21:             logger.debug("Updating the client cache from response cache");22:             for (Key key : readOnlyCacheMap.keySet()) { // 循环 readOnlyCacheMap 的缓存键23:                 if (logger.isDebugEnabled()) {24:                     Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};25:                     logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);26:                 }27:                 try {28:                     CurrentRequestVersion.set(key.getVersion());29:                     Value cacheValue = readWriteCacheMap.get(key);30:                     Value currentCacheValue = readOnlyCacheMap.get(key);31:                     if (cacheValue != currentCacheValue) { // 不一致时,进行替换32:                         readOnlyCacheMap.put(key, cacheValue);33:                     }34:                 } catch (Throwable th) {35:                     logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);36:                 }37:             }38:         }39:     };40: }
第 7 至 12 行 :初始化定时任务。配置
eureka.responseCacheUpdateIntervalMs
,设置任务执行频率,默认值 :30 * 1000 毫秒。

第 17 至 39 行 :创建定时任务。
第 22 行 :循环
readOnlyCacheMap
的缓存键。为什么不循环
readWriteCacheMap
readOnlyCacheMap
的缓存过期依赖
readWriteCacheMap
,因此缓存键会更多。

第 28 行 至 33 行 :对比
readWriteCacheMap
readOnlyCacheMap
的缓存值,若不一致,以前者为主。通过这样的方式,实现了
readOnlyCacheMap
的定时过期。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: