您的位置:首页 > 编程语言 > Java开发

客户端负载均衡 Spring Cloud Ribbon

2017-09-08 15:12 1191 查看

1.基本思路



       客户端从服务中心获取服务提供者信息,当客户端使用服务名调用服务时,拦截器帮你把服务名替换为具体的服务地址,服务地址的选择是由路由规则先定。默认是线性轮询。

2.参数配置

       主配置类:RibbonClientConfiguration,其中 IClientConfig 的默认实现类是DefaultClientConfigImpl, IRule的默认实现类是ZoneAvoidanceRule,实现按区域过滤服务,然后轮询获得一个服务

@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}


IPing的默认实现类是DummyPing(无任何操作,直接返回true),ServerList<Server>的默认实现类是ConfigurationBasedServerList等等。其中还包含一个PropertiesFactory

public PropertiesFactory() {
classToProperty.put(ILoadBalancer.class, "NFLoadBalancerClassName");
classToProperty.put(IPing.class, "NFLoadBalancerPingClassName");
classToProperty.put(IRule.class, "NFLoadBalancerRuleClassName");
classToProperty.put(ServerList.class, "NIWSServerListClassName");
classToProperty.put(ServerListFilter.class, "NIWSServerListFilterClassName");
}

定义了通过配置文件进行配置<clientName>.ribbon.<key>=<value> 如

my-servername.ribbon.NFLoadBalancerPingClassName=com.netflix.loadbalancer.PingUrl

3.与Eureka结合

   当spring cloud 中引人ribbon 和Eureka依赖时,会触发Eureka对Ribbon的自动化配置,
配置类是EurekaRibbonClientConfiguration
ServerList的维护机制实现将被DomainExtractingServerList覆盖,IPing的实现将被NIWSDiscoveryPing覆盖。

@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
return this.propertiesFactory.get(IPing.class, config, serviceId);
}
NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
ping.initWithNiwsConfig(config);
return ping;
}

@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}


4.重试机制

    Spring Cloud Eureka实现的服务治理机制强调CAP原理中的AP,即可用性和可靠性(分区容错性),与zookeeper这类强调CP(一致性、可靠性)的服务治理框架最大的区别就是,Eureka为了实现更高的服务可用性,牺牲了一定的一致性,在极端情况下它宁愿接受故障的实例也不要丢掉健康的实例,比如当服务中心的网络发生故障断开时,由于服务实例无法维持续约心跳,在强调AP的服务治理中,将会把所有服务实例都剔除掉,而Eureka则会因触发保护机制,注册中心将会保留此时的所有节点,即使其中有部分故障节点。所以引人重试机制是非常重要的。重试机制是使用的Spring
retry来实现的。

   spring.cloud.loadbalancer.retry.enabled=true

   hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=10000 如果配置了断路器,则断路器的超时时间一定要大于Ribbon的超时时间,否则不会触发重试

   my-servername.ribbon.ConnectTimeout=200

   my-servername.ribbon.ReadTimeout=1000

   my-servername.ribbon.OkToRetryOnAllOperations=true 读所有的请求都进行重试

   my-servername.ribbon.MaxAutoRetriesNextServer=2  切换实例的重试次数

   my-servername.ribbon.MaxAutoRetries=1 对当前实例的重试次数

   当访问到故障的服务时,它会再尝试访问一次当前实例(由MaxAutoRetries配置),如果不行,就换一个实例进行访问,如果还是不行,再换一个实例访问(由MaxAutoRetriesNextServer配置),如果依然不行,则返回失败信息。

5.负载均衡策略

   


  IRule接口:根据key从负责均衡器中

public interface IRule{
/*
* choose one alive server from lb.allServers or
* lb.upServers according to key
*
* @return choosen Server object. NULL is returned if none
*  server is available
*/

public Server choose(Object key);

public void setLoadBalancer(ILoadBalancer lb);

public ILoadBalancer getLoadBalancer();
}


 RandomRule:随机获取一个可用服务 核心代码如下

List<Server> upList = lb.getReachableServers();
List<Server> allList = lb.getAllServers();

int serverCount = allList.size();
if (serverCount == 0) {
/*
* No servers. End regardless of pass, because subsequent passes
* only get more restrictive.
*/
return null;
}

int index = rand.nextInt(serverCount);
server = upList.get(index);

if (server == null) {
/*
* The only time this should happen is if the server list were
* somehow trimmed. This is a transient condition. Retry after
* yielding.
*/
Thread.yield();
continue;
}

if (server.isAlive()) {
return (server);
}

感觉这里写错了。应该是allList.get(index)
int index = rand.nextInt(serverCount);
server = upList.get(index);


RoundRobinRule:轮询所有服务,直到找到可用服务(最大重试10次)

while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();

if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}

int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);

if (server == null) {
/* Transient. */
Thread.yield();
continue;
}

if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}

// Next.
server = null;
}


RetryRule:使用RoundRobinRule获取服务,获取失败则重试,默认超时时间500ms

IRule subRule = new RoundRobinRule();

long requestTime = System.currentTimeMillis();
long deadline = requestTime + maxRetryMillis;

Server answer = null;

answer = subRule.choose(key);

if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {

InterruptTask task = new InterruptTask(deadline
- System.currentTimeMillis());

while (!Thread.interrupted()) {
answer = subRule.choose(key);

if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
/* pause and retry hoping it's transient */
Thread.yield();
} else {
break;
}
}

task.cancel();


InterruptTask 会在超时后进行线程中断,这样就跳出while循环

public void run() {
if ((target != null) && (target.isAlive())) {
target.interrupt();
}
}


      WeightedResponseTimeRule:基于响应时间的权重获取服务,刚开始没有权重值则使用轮询法(继承RoundRobinRule)。

    设置权重计算任务默认30秒重新计算一次

void initialize(ILoadBalancer lb) {
if (serverWeightTimer != null) {
serverWeightTimer.cancel();
}
serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
+ name, true);
serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
serverWeightTaskTimerInterval);
// do a initial run
ServerWeight sw = new ServerWeight();
sw.maintainWeights();

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
logger
.info("Stopping NFLoadBalancer-serverWeightTimer-"
+ name);
serverWeightTimer.cancel();
}
}));
}

   权重区间法,随机一个区间内的值,找到大于等于随机值的点。比如A的权重为10,B的权重为30,C的权重为40,D的权重为20,则A的区间是0到10,B的区间是11到40,C的区间是41到80,D的区间值是81到100。随机0到100区间的值,比如是50,落入C区间,则选择C服务。

int serverIndex = 0;

// last one in the list is the sum of all weights
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// No server has been hit yet and total weight is not initialized
// fallback to use round robin
if (maxTotalWeight < 0.001d) {
server =  super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
double randomWeight = random.nextDouble() * maxTotalWeight;
// pick the server index based on the randomIndex
int n = 0;
for (Double d : currentWeights) {
if (d >= randomWeight) {
serverIndex = n;
break;
} else {
n++;
}
}

server = allList.get(serverIndex);
}


     PredicateBasedRule:根据过滤条件过滤服务后,再轮询过滤后的服务。

@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}


    ZoneAvoidanceRule:组合了两个过滤条件,是PredicateBasedRule的实现。

private CompositePredicate compositePredicate;

public ZoneAvoidanceRule() {
super();
ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: