您的位置:首页 > 编程语言 > Java开发

Spring Cloud Netflix Eureka组件服务注册及发现源码浅析

2018-03-28 23:00 1181 查看

Spring Cloud简介

微服务这个概念已经深入人心,是最近几年的最热门技术话题之一,Spring Cloud是最流行的开源微服务框架。

Spring Cloud是一系列框架的有序集合。它利用Spring Boot的开发便利性巧妙地简化了分布式系统基础设施的开发,

如服务发现注册、配置中心、消息总线、负载均衡、断路器、数据监控等,都可以用Spring Boot的开发风格做到一键启动和部署。

Spring并没有重复制造轮子,它只是将目前各家公司开发的比较成熟、经得起实际考验的服务框架组合起来,

通过Spring Boot风格进行再封装屏蔽掉了复杂的配置和实现原理,最终给开发者留出了一套简单易懂、易部署和易维护的分布式系统开发工具包。

Eureka Demo

在分析源码先搭建一个最简单的微服务实例,该Demo分为三个实例:

eureka-server
Eureka注册中心

service-server
服务提供者

service-client
服务消费者

pom.xml文件统一配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.0.RELEASE</version>
<relativePath/>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>service-server</artifactId>

<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Brixton.SR4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

</project>


Eureka注册中心

application.yml

spring:
application:
name: eureka

server:
port: 8761

management:
port: 8001

eureka:
instance:
hostname: localhost
preferIpAddress: true
client:
registerWithEureka: false   # 本身为注册中心不用向Eureka注册服务
fetchRegistry: false        # 不拉取服务实例列表

# ------------------------------------------------
# 集群模式使用
#    serviceUrl:
#      defaultZone: http://localhost:8762/eureka/ # ------------------------------------------------
server:
waitTimeInMsWhenSyncEmpty: 0

endpoints:
shutdown:
enabled: true     #启用shutdown
sensitive: false  #禁用密码验证


Application.java

package cn.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@EnableEurekaServer
@SpringBootApplication
public class EurekaApplication {

public static void main(String[] args) throws Exception {
SpringApplication.run(EurekaApplication.class, args);
}

}


服务提供者

application.yml

spring:
application:
name: service-server

server:
port: 8003
management:
port: 9003

eureka:
client:
serviceUrl:
# 若Eureka服务注册中心为集群则用 , 隔开
defaultZone: http://localhost:8761/eureka/ instance:
metadataMap:
instanceId: dev
server:
waitTimeInMsWhenSyncEmpty: 0


Application.java

package cn.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication
@EnableDiscoveryClient
public class Application {

public static void main(String[] args) throws Exception {
SpringApplication.run(Application.class, args);
}

}


为了节约服务资源,服务提供者若也可将eureka.client.fetchRegistry设成false,不拉取服务实例

服务消费者

application.yml

spring:
application:
name: service-client

server:
port: 8004
management:
port: 9004

eureka:
client:
healthcheck:
enabled: true
serviceUrl:
defaultZone: http://localhost:8761/eureka/ instance:
metadataMap:
instanceId: dev
server:
waitTimeInMsWhenSyncEmpty: 0
ribbon:
ServerListRefreshInterval:  5000


Application.java

package cn.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication
@EnableDiscoveryClient
public class Application {

public static void main(String[] args) throws Exception {
SpringApplication.run(Application.class, args);
}

}


服务启动

将以上三个服务启动

先启动注册中心,其它两个实例也类似的启动

cd eureka-service
mvn package
cd target
java -jar eureka-server-1.4.0.RELEASE


本Demo已上传至github:

https://github.com/yiliangz/spring-cloud-demo

Eureka的几个概念

Register
:服务注册

Renew
:服务续约,即常说的heartbeat, 默认在

Fetch Registries
:获取注册列表信息

Cancel
:服务下线

Eviction
服务剔除

服务注册

服务实例

Eureka是一个以
REST
为基础的服务注册中心,

客户端的对象方法栈调用如下:

EurekaClientAutoConfiguration#eurekaClient()

CloudEurekaClient#CloudEurekaClient()

DiscoveryClient#DiscoveryClient() -> initScheduledTasks()

InstanceInfoReplicator#start() -> run()

discoveryClient#register()

AbstractJerseyEurekaHttpClient#register()

服务注册的启动首先从EurekaClientAutoConfiguration的eurekaClient方法开始

org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration


@Configuration
@ConditionalOnRefreshScope
protected static class RefreshableEurekaClientConfiguration {
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config, EurekaInstanceConfig instance) {
manager.getInfo(); // force initialization
//调用CloudEurekaClient的构造方法
return new CloudEurekaClient(manager, config, this.optionalArgs,this.context);
}

@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(
EurekaInstanceConfig config) {
//从配置文件初始化实例信息
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}

}

public class CloudEurekaClient extends DiscoveryClient {
// 继承自DiscoveryClient
// ...

public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, ApplicationContext context) {

//调用DiscoveryClient的方法
this(applicationInfoManager, config, null, context);

}

// ...
}


DiscoveryClient是Eureka的核心类, 定义了心跳检测和获取实例的调度任务

心跳检测由
eureka.client.registryFetchIntervalSeconds
参数配置,默认为30s

com.netflix.discovery.DiscoveryClient


@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, DiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {

//定义一个任务调度线程池
scheduler = Executors.newScheduledThreadPool(3,...);

//心跳检测
heartbeatExecutor = new ThreadPoolExecutor(...);     // use direct handoff

//获取服务实例
cacheRefreshExecutor = new ThreadPoolExecutor(...);  // use direct handoff
// ...

//初始上面定义的三个任务调度器
initScheduledTasks();

// ...

}

//初始化调度任务
private void initScheduledTasks() {
//...
if (clientConfig.shouldRegisterWithEureka()) {

if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}

//此处getInitialInstanceInfoReplicationIntervalSeconds默认为40s
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}


scheduler的实现类为ScheduledThreadPoolExecutor, 其schedule()方法执行的调度任务只会执行一次

com.netflix.discovery.InstanceInfoReplicator


class InstanceInfoReplicator implements Runnable {

public void start(int initialDelayMs) {
//
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty();  // for initial register
//InstanceInfoReplicator实现了Runnale
//所以此处把this传递进去就是执行此对象的run()方法
//schedule方法只会执行一次
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}

public void run() {
try {
discoveryClient.refreshInstanceInfo();

Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
//调用discoveryClient的register方法
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
}
}
}


DiscoveryClient除了注册还包括心跳和更新本地的服务实例

com.netflix.discovery.DiscoveryClient#register


boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse<Void> httpResponse;
try {
//此处使用了装饰器模式,实际执行register方法的是AbstractJerseyEurekaHttpClient类
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}


此处为真正发起服务注册REST的地方, 由源码可以知道REST访问使用到了sun公司的jersey

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient


public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
//使用jersey客户端进行restful访问
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}


服务注册、续约(心跳)、下线、剔除 都在AbstractJerseyEurekaHttpClient

InstanceStatus的状态枚举如下:

public enum InstanceStatus {
UP,       // 在用状态,可以接受服务请求
DOWN,     // 不在用, 使用/healthcheck访问实例状态会失败
STARTING, // 启动中,不可用
OUT_OF_SERVICE, // 服务不可用
UNKNOWN;

public static InstanceStatus toEnum(String s) {
for (InstanceStatus e : InstanceStatus.values()) {
if (e.name().equalsIgnoreCase(s)) {
return e;
}
}
return UNKNOWN;
}
}


注册的REST请求详情如下:

- 注册url: http://${eurekaIP}:8761/eureka/apps/SERVICE-SERVER

- type: POST

- 参数: instanceInfo

参数instanceInfo如下图所示(点击放大)



Eureka注册中心

上面分析的是关于客户端方面的注册动作,由上面我们可知服务实例会向Eureka注册中心发送一个rest注册请求,而Eureka注册中心接收服务实例注册的rest请求的方法在

com.netflix.eureka.resources.ApplicationResource#addInstance


private final PeerAwareInstanceRegistry registry;
private final ResponseCache responseCache;

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {

logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);

// ...
//调用PeerAwareInstanceRegistryImpl的register方法
registry.register(info, "true".equals(isReplication));

return Response.status(204).build();  // 204 to be backwards compatible

}


由于各节点之间的信息同步是用异步方式的, 所以节点之间的信息不能保证每时刻都一致

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl


public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;

if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
//super(父类)为AbstractInstanceRegistry
super.register(info, leaseDuration, isReplication);

//将注册信息同步至其它eureka注册中心
replicateToPeers(Action.Register,info.getAppName(),info.getId(),info,null,isReplication);

}


注册实例的真正存储是一个Map,这个Map的key为服务的AppName, value为该AppName的实例集合Map,

实例集合的key为注册服务的实例id, value为 Lease, Lease的概念为租期, 租期到期的话则该服务实例会被过期剔除,

续期(心跳)可配置在以下参数:

-
eureka.instance.leaseExpirationDurationInSeconds


服务过期时间配置,超过这个时间没有接收到心跳EurekaServer就会将这个实例剔除(默认90s)

-
eureka.instance.leaseRenewalIntervalInSeconds


服务刷新时间配置,每隔这个时间会主动心跳一次(默认30s)

-- com.netflix.eureka.registry.AbstractInstanceRegistry

/**
* 注册中心真正存储服务实例信息的是一个ConcurrentHashMap
*/
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
//key为appName, 也就是以spring.application.name的大写字符
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
//如果该AppName的实例集合不存在
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap =
new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// ...

gMap.put(registrant.getId(), lease);
}
// ...
}


下面再来查阅节点同步的逻辑, 从代码分析可见此处主要是防止重复操作,

每个节点的真正同步代码在replicateInstanceActionsToPeers

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl


private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
//...
// 如果是从其它节点同步过来的则不会在此再将操作信息同步至其它, 避免循环同步
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}

//
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
// 由于自身节点的操作已在上面的代码中进行 ,所以如果是自身节点, 则不再作重复操作
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
//将操作同步, 包括注册,续约(心跳),服务下线等都在此方法
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}


服务发现

服务消费者更新服务实例

服务发现也即服务实例拉取, 由于服务发现的逻辑与服务注册类似,这里只作简单分析

服务实例拉取就是一个线程调度任务, 拉取时间间隔可以在

eureka.client.registryFetchIntervalSeconds
配置, 默认为30s;

com.netflix.discovery.DiscoveryClient#initScheduledTasks

if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//TimedSupervisorTask继承了TimerTask, 为一个调度任务
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
//CacheRefreshThread会调用run()方法
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}

class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
//CacheRefreshThread实际执行了fetchRegistry方法
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

try {

if (clientConfig.shouldDisableDelta()|| …)){
//全量拉取
getAndStoreFullRegistry();
} else {
//增量拉取
getAndUpdateDelta(applications);
}

}
// ...
return true;
}

/**
* 此处仅贴出全量拉取代码, 增量拉取的代码自行参阅
*/
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();

logger.info("Getting all instance registry info from the eureka server");

Applications apps = null;
EurekaHttpResponse<Applications> httpResponse =clientConfig.getRegistryRefreshSingleVipAddress() == null

? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())

: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(),
remoteRegionsRef.get());

if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}

logger.info("The response status is {}", httpResponse.getStatusCode());

}


最后向Eureka发送一个查询服务实例的REST请求

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient


private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
ClientResponse response = null;
String regionsParamValue = null;
try {
WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
if (regions != null && regions.length > 0) {
regionsParamValue = StringUtil.join(regions);
webResource = webResource.queryParam("regions", regionsParamValue);
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

Applications applications = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
applications = response.getEntity(Applications.class);
}
return anEurekaHttpResponse(response.getStatus(), Applications.class)
.headers(headersOf(response))
.entity(applications)
.build();
}
}


Eureka注册中心返回服务注册实例

Eureka注册中心对服务拉取的处理, 可以看到返回服务实例不是从此前分析过的服务实例注册存储的类型为ConcurrentHashMap的registry,

返回服务实例信息的是一个responseCache, 有一个定期任务会将registry的信息更新至responseCache, 该定期时间可配置在
eureka.server.responseCacheUpdateInvervalMs
,

定期任务的执行在
com.netflix.eureka.registry.ResponseCacheImpl#ResponseCacheImpl


接收rest请求在
com.netflix.eureka.resources.ApplicationsResource#getContainers


private final PeerAwareInstanceRegistry registry;
//实际返回服务注册实例数据的缓存
private final ResponseCache responseCache;

@Inject
ApplicationsResource(EurekaServerContext eurekaServer) {
this.serverConfig = eurekaServer.getServerConfig();
this.registry = eurekaServer.getRegistry();
//服务缓存从是从registry里取
this.responseCache = registry.getResponseCache();
}

@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@Nullable @QueryParam("regions") String regionsStr) {

Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
// ...
Response response;
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
//从responseCache取返回数据
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey)).build();
}
return response;
}


总结

本文浅度剖析了Eureka服务注册发现的源码,可以看出服务注册和发现的核心是用任务线程池实现,任务调度器使用REST与Eureka注册中心进行通信。值得注意的是Spring Cloud Netflix组件为了性能的考虑大量使用了缓存,在实际生产上使用的话服务下线会使得消费不能立即感应导致调用了已下线的服务实例,解决此问题需清楚组件的各个任务调度和时间间隔,根据项目需要调整时间间隔。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐