熔断器 Hystrix 源码解析 —— 断路器 HystrixCircuitBreaker
2018-01-06 00:00
691 查看
摘要: 原创出处 http://www.iocoder.cn/Hystrix/circuit-breaker/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Hystrix 1.5.X 版本
1. 概述
2. HystrixCircuitBreaker
3. HystrixCircuitBreaker.Factory
4. HystrixCircuitBreakerImpl
4.1 构造方法
4.2 #subscribeToStream()
4.3 #attemptExecution()
4.4 #markSuccess()
4.5 #markNonSuccess()
4.6 #allowRequest()
4.7 #isOpen()
666. 彩蛋
🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复。甚至不知道如何读源码也可以请教噢。
新的源码解析文章实时收到通知。每周更新一篇左右。
认真的源码交流微信群。
HystrixCircuitBreaker 有三种状态 :
其中,断路器处于
HystrixCircuitBreaker 状态变迁如下图 :
红线 :初始时,断路器处于
周期( 可配,
错误请求占总请求数超过一定比例( 可配,
绿线 :断路器处于
推荐 Spring Cloud 书籍:
请支持正版。下载盗版,等于主动编写低级 BUG 。
程序猿DD —— 《Spring Cloud微服务实战》
周立 —— 《Spring Cloud与Docker微服务架构实战》
两书齐买,京东包邮。
HystrixCircuitBreaker 有两个子类实现 :
NoOpCircuitBreaker :空的断路器实现,用于不开启断路器功能的情况。
HystrixCircuitBreakerImpl :完整的断路器实现。
在 AbstractCommand 创建时,初始化 HystrixCircuitBreaker ,代码如下 :
当
当
创建 HystrixCircuitBreaker 对象,目前只创建 HystrixCircuitBreakerImpl 。
HystrixCircuitBreaker 容器,基于 HystrixCommandKey 维护了 HystrixCircuitBreaker 单例对象 的映射。代码如下 :
整体代码灰常清晰,点击 链接 查看代码。
我们来逐个方法看看 HystrixCircuitBreakerImpl 的具体实现。
Status 枚举类,断路器的三种状态。
第 5 至 7 行 :向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。这里的 Observable 基于 RxJava Window 操作符。
FROM 《ReactiveX文档中文翻译》「Window」
定期将来自原始 Observable 的数据分解为一个 Observable 窗口,发射这些窗口,而不是每次发射一项数据
简单来说,固定间隔,
第 22 行 :判断周期( 可配,
这里要注意下,请求次数统计的是周期内,超过周期的不计算在内。例如说,
第 29 行 :错误请求占总请求数超过一定比例( 可配,
第 37 至 39 行 :满足断路器打开条件,CAS 修改状态(
【补充】第 5 至 7 行 :😈 怕写在上面,大家有压力。Hystrix Metrics 对请求量统计 Observable 使用了两种 RxJava Window 操作符 :
目前该方法有两处调用 :
「4.1 构造方法」,在创建 HystrixCircuitBreakerImpl 时,向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。固定间隔,计算断路器是否要关闭(
「4.4 #markSuccess()」,清空 Hystrix Metrics 对请求量统计 Observable 的统计信息,取消原有订阅,并发起新的订阅。
使用
第 4 至 6 行 :当
第 8 至 10 行 :当
第 12 至 13 行 :断路器打开时间(
第 16 至 28 行 :调用
在当前时间超过断路器打开时间
第 3 行 :使用 CAS 方式,修改断路器状态(
第 6 行 :清空 Hystrix Metrics 对请求量统计 Observable 的统计信息。
第 8 至 14 行 :取消原有订阅,发起新的订阅。
第 16 行 :设置断路器打开时间为"空" 。
如下两处调用了
第 3 行 :使用 CAS 方式,修改断路器状态(
第 5 行 :设置设置断路器打开时间为当前时间。这样,
如下两处调用了
胖友,分享一波朋友圈可好!
本文主要基于 Hystrix 1.5.X 版本
1. 概述
2. HystrixCircuitBreaker
3. HystrixCircuitBreaker.Factory
4. HystrixCircuitBreakerImpl
4.1 构造方法
4.2 #subscribeToStream()
4.3 #attemptExecution()
4.4 #markSuccess()
4.5 #markNonSuccess()
4.6 #allowRequest()
4.7 #isOpen()
666. 彩蛋
🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复。甚至不知道如何读源码也可以请教噢。
新的源码解析文章实时收到通知。每周更新一篇左右。
认真的源码交流微信群。
1. 概述
本文主要分享 断路器 HystrixCircuitBreaker。HystrixCircuitBreaker 有三种状态 :
CLOSED:关闭
OPEN:打开
HALF_OPEN:半开
其中,断路器处于
OPEN状态时,链路处于非健康状态,命令执行时,直接调用回退逻辑,跳过正常逻辑。
HystrixCircuitBreaker 状态变迁如下图 :
红线 :初始时,断路器处于
CLOSED状态,链路处于健康状态。当满足如下条件,断路器从
CLOSED变成
OPEN状态:
周期( 可配,
HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms)内,总请求数超过一定量( 可配,
HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20) 。
错误请求占总请求数超过一定比例( 可配,
HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50%) 。
绿线 :断路器处于
OPEN状态,命令执行时,若当前时间超过断路器开启时间一定时间(
HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds = 5000 ms),断路器变成
HALF_OPEN状态,尝试调用正常逻辑,根据执行是否成功,打开或关闭熔断器【蓝线】。
推荐 Spring Cloud 书籍:
请支持正版。下载盗版,等于主动编写低级 BUG 。
程序猿DD —— 《Spring Cloud微服务实战》
周立 —— 《Spring Cloud与Docker微服务架构实战》
两书齐买,京东包邮。
2. HystrixCircuitBreaker
com.netflix.hystrix.HystrixCircuitBreaker,Hystrix 断路器接口。定义接口如下代码 :
public interface HystrixCircuitBreaker { /** * Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not. It is idempotent and does * not modify any internal state, and takes into account the half-open logic which allows some requests through * after the circuit has been opened * * @return boolean whether a request should be permitted */ boolean allowRequest(); /** * Whether the circuit is currently open (tripped). * * @return boolean state of circuit breaker */ boolean isOpen(); /** * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state. */ void markSuccess(); /** * Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state. */ void markNonSuccess(); /** * Invoked at start of command execution to attempt an execution. This is non-idempotent - it may modify internal * state. */ boolean attemptExecution(); }
#allowRequest()和
#attemptExecution()方法,方法目的基本类似,差别在于当断路器满足尝试关闭条件时,前者不会将断路器不会修改状态(
CLOSE => HALF-OPEN),而后者会。
HystrixCircuitBreaker 有两个子类实现 :
NoOpCircuitBreaker :空的断路器实现,用于不开启断路器功能的情况。
HystrixCircuitBreakerImpl :完整的断路器实现。
在 AbstractCommand 创建时,初始化 HystrixCircuitBreaker ,代码如下 :
/* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { /** * 断路器 */ protected final HystrixCircuitBreaker circuitBreaker; protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) { // ... 省略无关代码 // 初始化 断路器 this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics); // ... 省略无关代码 } private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor, HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { if (enabled) { if (fromConstructor == null) { // get the default implementation of HystrixCircuitBreaker return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics); } else { return fromConstructor; } } else { return new NoOpCircuitBreaker(); } } }
当
HystrixCommandProperties.circuitBreakerEnabled = true时,即断路器功能开启,使用 Factory 获得 HystrixCircuitBreakerImpl 对象。在 「3. HystrixCircuitBreaker.Factory」 详细解析。
当
HystrixCommandProperties.circuitBreakerEnabled = false时,即断路器功能关闭,创建 NoOpCircuitBreaker 对象。另外,NoOpCircuitBreaker 代码简单到脑残,点击 链接 查看实现。
3. HystrixCircuitBreaker.Factory
com.netflix.hystrix.HystrixCircuitBreaker.Factory,HystrixCircuitBreaker 工厂,主要用于:
创建 HystrixCircuitBreaker 对象,目前只创建 HystrixCircuitBreakerImpl 。
HystrixCircuitBreaker 容器,基于 HystrixCommandKey 维护了 HystrixCircuitBreaker 单例对象 的映射。代码如下 :
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
整体代码灰常清晰,点击 链接 查看代码。
4. HystrixCircuitBreakerImpl
com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl,完整的断路器实现。
我们来逐个方法看看 HystrixCircuitBreakerImpl 的具体实现。
4.1 构造方法
构造方法,代码如下 :/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { private final HystrixCommandProperties properties; private final HystrixCommandMetrics metrics; enum Status { CLOSED, OPEN, HALF_OPEN } private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED); private final AtomicLong circuitOpened = new AtomicLong(-1); private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null); protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.properties = properties; this.metrics = metrics; //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur Subscription s = subscribeToStream(); activeSubscription.set(s); } }
Status 枚举类,断路器的三种状态。
status属性,断路器的状态。
circuitOpened属性,断路器打开,即状态变成
OPEN的时间。
activeSubscription属性,基于 Hystrix Metrics 对请求量统计 Observable 的订阅,在 「4.2 #subscribeToStream()」 详细解析。
4.2 #subscribeToStream()
#subscribeToStream()方法,向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。代码如下 :
private Subscription subscribeToStream() { 1: private Subscription subscribeToStream() { 2: /* 3: * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream 4: */ 5: return metrics.getHealthCountsStream() 6: .observe() 7: .subscribe(new Subscriber<HealthCounts>() { 8: @Override 9: public void onCompleted() { 10: 11: } 12: 13: @Override 14: public void onError(Throwable e) { 15: 16: } 17: 18: @Override 19: public void onNext(HealthCounts hc) { 20: System.out.println("totalRequests" + hc.getTotalRequests()); // 芋艿,用于调试 21: // check if we are past the statisticalWindowVolumeThreshold 22: if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { 23: // we are not past the minimum volume threshold for the stat window, 24: // so no change to circuit status. 25: // if it was CLOSED, it stays CLOSED 26: // if it was half-open, we need to wait for a successful command execution 27: // if it was open, we need to wait for sleep window to elapse 28: } else { 29: if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { 30: //we are not past the minimum error threshold for the stat window, 31: // so no change to circuit status. 32: // if it was CLOSED, it stays CLOSED 33: // if it was half-open, we need to wait for a successful command execution 34: // if it was open, we need to wait for sleep window to elapse 35: } else { 36: // our failure rate is too high, we need to set the state to OPEN 37: if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { 38: circuitOpened.set(System.currentTimeMillis()); 39: } 40: } 41: } 42: } 43: }); 44: }
第 5 至 7 行 :向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。这里的 Observable 基于 RxJava Window 操作符。
FROM 《ReactiveX文档中文翻译》「Window」
定期将来自原始 Observable 的数据分解为一个 Observable 窗口,发射这些窗口,而不是每次发射一项数据
简单来说,固定间隔,
#onNext()方法将不断被调用,每次计算断路器的状态。
第 22 行 :判断周期( 可配,
HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms)内,总请求数超过一定量( 可配,
HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20) 。
这里要注意下,请求次数统计的是周期内,超过周期的不计算在内。例如说,
00:00内发起了 N 个请求,
00:11不计算这 N 个请求。
第 29 行 :错误请求占总请求数超过一定比例( 可配,
HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50%) 。
第 37 至 39 行 :满足断路器打开条件,CAS 修改状态(
CLOSED => OPEN),并设置打开时间(
circuitOpened) 。
【补充】第 5 至 7 行 :😈 怕写在上面,大家有压力。Hystrix Metrics 对请求量统计 Observable 使用了两种 RxJava Window 操作符 :
Observable#window(timespan, unit)方法,固定周期( 可配,
HystrixCommandProperties.metricsHealthSnapshotIntervalInMilliseconds = 500 ms),发射 Observable 窗口。点击 BucketedCounterStream 构造方法 查看调用处的代码。
Observable#window(count, skip)方法,每发射一次(
skip) Observable 忽略
count( 可配,
HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20) 个数据项。为什么?答案在第 22 行的代码,周期内达到一定请求量是断路器打开的一个条件。点击 BucketedRollingCounterStream 构造方法 查看调用处的代码。
目前该方法有两处调用 :
「4.1 构造方法」,在创建 HystrixCircuitBreakerImpl 时,向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。固定间隔,计算断路器是否要关闭(
CLOSE)。
「4.4 #markSuccess()」,清空 Hystrix Metrics 对请求量统计 Observable 的统计信息,取消原有订阅,并发起新的订阅。
4.3 #attemptExecution()
如下是AbstractCommand#applyHystrixSemantics(_cmd)方法,对
HystrixCircuitBreakerImpl#attemptExecution方法的调用的代码 :
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // ... 省略无关代码 /* determine if we're allowed to execute */ if (circuitBreaker.attemptExecution()) { // 执行【正常逻辑】 } else { // 执行【回退逻辑】 } }
使用
HystrixCircuitBreakerImpl#attemptExecution方法,判断是否可以执行正常逻辑。
#attemptExecution方法,代码如下 :
1: @Override 2: public boolean attemptExecution() { 3: // 强制 打开 4: if (properties.circuitBreakerForceOpen().get()) { 5: return false; 6: } 7: // 强制 关闭 8: if (properties.circuitBreakerForceClosed().get()) { 9: return true; 10: } 11: // 打开时间为空 12: if (circuitOpened.get() == -1) { 13: return true; 14: } else { 15: // 满足间隔尝试断路器时间 16: if (isAfterSleepWindow()) { 17: //only the first request after sleep window should execute 18: //if the executing command succeeds, the status will transition to CLOSED 19: //if the executing command fails, the status will transition to OPEN 20: //if the executing command gets unsubscribed, the status will transition to OPEN 21: if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { 22: return true; 23: } else { 24: return false; 25: } 26: } else { 27: return false; 28: } 29: } 30: }
第 4 至 6 行 :当
HystrixCommandProperties.circuitBreakerForceOpen = true( 默认值 :
false) 时,即断路器强制打开,返回
false。当该配置接入配置中心后,可以动态实现打开熔断。为什么会有该配置?当 HystrixCircuitBreaker 创建完成后,无法动态切换 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,通过该配置以实现类似效果。
第 8 至 10 行 :当
HystrixCommandProperties.circuitBreakerForceClose = true( 默认值 :
false) 时,即断路器强制关闭,返回
true。当该配置接入配置中心后,可以动态实现关闭熔断。为什么会有该配置?当 HystrixCircuitBreaker 创建完成后,无法动态切换 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,通过该配置以实现类似效果。
第 12 至 13 行 :断路器打开时间(
circuitOpened) 为"空",返回
true。
第 16 至 28 行 :调用
#isAfterSleepWindow()方法,判断是否满足尝试调用正常逻辑的间隔时间。当满足,使用 CAS 方式修改断路器状态(
OPEN => HALF_OPEN),从而保证有且仅有一个线程能够尝试调用正常逻辑。
#isAfterSleepWindow()方法,代码如下 :
private boolean isAfterSleepWindow() { final long circuitOpenTime = circuitOpened.get(); final long currentTime = System.currentTimeMillis(); final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get(); return currentTime > circuitOpenTime + sleepWindowTime; }
在当前时间超过断路器打开时间
HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds( 默认值,
5000 ms),返回
true。
4.4 #markSuccess()
当尝试调用正常逻辑成功时,调用#markSuccess()方法,关闭断路器。代码如下 :
1: @Override 2: public void markSuccess() { 3: if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) { 4: // 清空 Hystrix Metrics 对请求量统计 Observable 的**统计信息** 5: //This thread wins the race to close the circuit - it resets the stream to start it over from 0 6: metrics.resetStream(); 7: // 取消原有订阅 8: Subscription previousSubscription = activeSubscription.get(); 9: if (previousSubscription != null) { 10: previousSubscription.unsubscribe(); 11: } 12: // 发起新的订阅 13: Subscription newSubscription = subscribeToStream(); 14: activeSubscription.set(newSubscription); 15: // 设置断路器打开时间为空 16: circuitOpened.set(-1L); 17: } 18: }
第 3 行 :使用 CAS 方式,修改断路器状态(
HALF_OPEN => CLOSED)。
第 6 行 :清空 Hystrix Metrics 对请求量统计 Observable 的统计信息。
第 8 至 14 行 :取消原有订阅,发起新的订阅。
第 16 行 :设置断路器打开时间为"空" 。
如下两处调用了
#markNonSuccess()方法 :
markEmits
markOnCompleted
4.5 #markNonSuccess()
当尝试调用正常逻辑失败时,调用#markNonSuccess()方法,重新打开断路器。代码如下 :
1: @Override 2: public void markNonSuccess() { 3: if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) { 4: //This thread wins the race to re-open the circuit - it resets the start time for the sleep window 5: circuitOpened.set(System.currentTimeMillis()); 6: } 7: }
第 3 行 :使用 CAS 方式,修改断路器状态(
HALF_OPEN => OPEN)。
第 5 行 :设置设置断路器打开时间为当前时间。这样,
#attemptExecution()过一段时间,可以再次尝试执行正常逻辑。
如下两处调用了
#markNonSuccess()方法 :
handleFallback
unsubscribeCommandCleanup
4.6 #allowRequest()
#allowRequest()和
#attemptExecution()方法,方法目的基本类似,差别在于当断路器满足尝试关闭条件时,前者不会将断路器不会修改状态(
CLOSE => HALF-OPEN),而后者会。点击 链接 查看代码实现。
4.7 #isOpen()
#isOpen()方法,比较简单,点击 链接 查看代码实现。
666. 彩蛋
呼呼,相对比较干净的一篇文章,满足。胖友,分享一波朋友圈可好!
相关文章推荐
- Hystrix 源码解析 —— 断路器 HystrixCircuitBreaker
- Hystrix熔断器技术解析-HystrixCircuitBreaker
- 熔断器 Hystrix 源码解析 —— 执行结果缓存
- 熔断器 Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑
- 熔断器 Hystrix 源码解析 —— 命令合并执行
- 熔断器 Hystrix 源码解析 —— 调试环境搭建
- 熔断器 Hystrix 源码解析 —— 命令执行(三)之执行超时
- 熔断器 Hystrix 源码解析 —— 调试环境搭建
- 熔断器 Hystrix 源码解析 —— 执行命令方式
- 熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略
- 熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略
- 熔断器 Hystrix 源码解析 —— 执行命令方式
- 熔断器 Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑
- 熔断器 Hystrix 源码解析 —— 请求执行(四)之失败回退逻辑
- 熔断器 Hystrix 源码解析 —— 请求执行(四)之失败回退逻辑
- 熔断器 Hystrix 源码解析 —— 命令执行(三)之执行超时
- 熔断器 Hystrix 源码解析 —— 执行结果缓存
- 服务容错保护断路器Hystrix之二:Hystrix工作流程解析
- spring cloud: Hystrix断路器(熔断器)
- spring-cloud源码解析-hystrix的基本介绍和配置属性说明