您的位置:首页 > 产品设计 > UI/UE

Hystrix 源码解析 —— 断路器 HystrixCircuitBreaker

2017-12-12 11:36 323 查看
摘要: 原创出处 http://www.iocoder.cn/Hystrix/circuit-breaker/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本

1. 概述

2. HystrixCircuitBreaker

3. HystrixCircuitBreaker.Factory

4. HystrixCircuitBreakerImpl
4.1 构造方法

4.2 #subscribeToStream()

4.3 #attemptExecution()

4.4 #markSuccess()

4.5 #markNonSuccess()

4.6 #allowRequest()

4.7 #isOpen()

1. 概述

本文主要分享 断路器 HystrixCircuitBreaker

HystrixCircuitBreaker 有三种状态 :

CLOSED
:关闭

OPEN
:打开

HALF_OPEN
:半开

其中,断路器处于
OPEN
状态时,链路处于非健康状态,命令执行时,直接调用回退逻辑,跳过正常逻辑。

HystrixCircuitBreaker 状态变迁如下图 :



红线 :初始时,断路器处于
CLOSED
状态,链路处于健康状态。当满足如下条件,断路器从
CLOSED
变成
OPEN
状态:

周期( 可配,
HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms
)内,总请求数超过一定( 可配,
HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20
) 。

错误请求占总请求数超过一定比例( 可配,
HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50%
) 。

绿线 :断路器处于
OPEN
状态,命令执行时,若当前时间超过断路器开启时间一定时间(
HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds = 5000 ms
),断路器变成
HALF_OPEN
状态,尝试调用正常逻辑,根据执行是否成功,打开或关闭熔断器【蓝线】。

推荐 Spring Cloud 书籍

请支持正版。下载盗版,等于主动编写低级 BUG

程序猿DD —— 《Spring Cloud微服务实战》

周立 —— 《Spring Cloud与Docker微服务架构实战》

两书齐买,京东包邮。

2. HystrixCircuitBreaker

com.netflix.hystrix.HystrixCircuitBreaker
,Hystrix 断路器接口。定义接口如下代码 :

public interface HystrixCircuitBreaker {    /**     * Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not.  It is idempotent and does     * not modify any internal state, and takes into account the half-open logic which allows some requests through     * after the circuit has been opened     *      * @return boolean whether a request should be permitted     */    boolean allowRequest();    /**     * Whether the circuit is currently open (tripped).     *      * @return boolean state of circuit breaker     */    boolean isOpen();    /**     * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.     */    void markSuccess();    /**     * Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.     */    void markNonSuccess();    /**     * Invoked at start of command execution to attempt an execution.  This is non-idempotent - it may modify internal     * state.     */    boolean attemptExecution();}
#allowRequest()
#attemptExecution()
方法,方法目的基本类似,差别在于当断路器满足尝试关闭条件时,前者不会将断路器不会修改状态(
CLOSE => HALF-OPEN
),而后者会。

HystrixCircuitBreaker 有两个子类实现 :

NoOpCircuitBreaker :的断路器实现,用于不开启断路器功能的情况。

HystrixCircuitBreakerImpl :完整的断路器实现。

在 AbstractCommand 创建时,初始化 HystrixCircuitBreaker ,代码如下 :

/* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {        /**     * 断路器     */    protected final HystrixCircuitBreaker circuitBreaker;    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {        // ... 省略无关代码                // 初始化 断路器        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);        // ... 省略无关代码    }    private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,                                                            HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,                                                            HystrixCommandProperties properties, HystrixCommandMetrics metrics) {        if (enabled) {            if (fromConstructor == null) {                // get the default implementation of HystrixCircuitBreaker                return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);            } else {                return fromConstructor;            }        } else {            return new NoOpCircuitBreaker();        }    }}
HystrixCommandProperties.circuitBreakerEnabled = true
时,即断路器功能开启,使用 Factory 获得 HystrixCircuitBreakerImpl 对象。在 「3. HystrixCircuitBreaker.Factory」 详细解析。

HystrixCommandProperties.circuitBreakerEnabled = false
时,即断路器功能关闭,创建 NoOpCircuitBreaker 对象。另外,NoOpCircuitBreaker 代码简单到脑残,点击 链接 查看实现。

3. HystrixCircuitBreaker.Factory

com.netflix.hystrix.HystrixCircuitBreaker.Factory
,HystrixCircuitBreaker 工厂,主要用于:

创建 HystrixCircuitBreaker 对象,目前只创建 HystrixCircuitBreakerImpl 。

HystrixCircuitBreaker 容器,基于 HystrixCommandKey 维护了 HystrixCircuitBreaker 单例对象 的映射。代码如下 :

private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
整体代码灰常清晰,点击 链接 查看代码。

4. HystrixCircuitBreakerImpl

com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl
完整的断路器实现。

我们来逐个方法看看 HystrixCircuitBreakerImpl 的具体实现。

4.1 构造方法

构造方法,代码如下 :

/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {    private final HystrixCommandProperties properties;    private final HystrixCommandMetrics metrics;    enum Status {        CLOSED, OPEN, HALF_OPEN    }    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);    private final AtomicLong circuitOpened = new AtomicLong(-1);    private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {        this.properties = properties;        this.metrics = metrics;        //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur        Subscription s = subscribeToStream();        activeSubscription.set(s);    }}
Status 枚举类,断路器的三种状态。

status
属性,断路器的状态。

circuitOpened
属性,断路器打开,即状态变成
OPEN
的时间。

activeSubscription
属性,基于 Hystrix Metrics 对请求量统计 Observable 的订阅,在 「4.2 #subscribeToStream()」 详细解析。

4.2 #subscribeToStream()

#subscribeToStream()
方法,向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。代码如下 :

private Subscription subscribeToStream() {  1: private Subscription subscribeToStream() {  2:     /*  3:      * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream  4:      */  5:     return metrics.getHealthCountsStream()  6:             .observe()  7:             .subscribe(new Subscriber<HealthCounts>() {  8:                 @Override  9:                 public void onCompleted() { 10:  11:                 } 12:  13:                 @Override 14:                 public void onError(Throwable e) { 15:  16:                 } 17:  18:                 @Override 19:                 public void onNext(HealthCounts hc) { 20:                     System.out.println("totalRequests" + hc.getTotalRequests()); // 芋艿,用于调试 21:                     // check if we are past the statisticalWindowVolumeThreshold 22:                     if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { 23:                         // we are not past the minimum volume threshold for the stat window, 24:                         // so no change to circuit status. 25:                         // if it was CLOSED, it stays CLOSED 26:                         // if it was half-open, we need to wait for a successful command execution 27:                         // if it was open, we need to wait for sleep window to elapse 28:                     } else { 29:                         if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { 30:                             //we are not past the minimum error threshold for the stat window, 31:                             // so no change to circuit status. 32:                             // if it was CLOSED, it stays CLOSED 33:                             // if it was half-open, we need to wait for a successful command execution 34:                             // if it was open, we need to wait for sleep window to elapse 35:                         } else { 36:                             // our failure rate is too high, we need to set the state to OPEN 37:                             if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { 38:                                 circuitOpened.set(System.currentTimeMillis()); 39:                             } 40:                         } 41:                     } 42:                 } 43:             }); 44: }
第 5 至 7 行 :向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。这里的 Observable 基于 RxJava Window 操作符。

FROM 《ReactiveX文档中文翻译》「Window」
定期将来自原始 Observable 的数据分解为一个 Observable 窗口,发射这些窗口,而不是每次发射一项数据

简单来说,固定间隔,
#onNext()
方法将不断被调用,每次计算断路器的状态。

第 22 行 :判断周期( 可配,
HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms
)内,总请求数超过一定( 可配,
HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20
) 。

这里要注意下,请求次数统计的是周期内,超过周期的不计算在内。例如说,
00:00
内发起了 N 个请求,
00:11
不计算这 N 个请求。

第 29 行 :错误请求占总请求数超过一定比例( 可配,
HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50%
) 。

第 37 至 39 行 :满足断路器打开条件,CAS 修改状态(
CLOSED => OPEN
),并设置打开时间(
circuitOpened
) 。

补充】第 5 至 7 行 :�� 怕写在上面,大家有压力。Hystrix Metrics 对请求量统计 Observable 使用了两种 RxJava Window 操作符 :

Observable#window(timespan, unit)
方法,固定周期( 可配,
HystrixCommandProperties.metricsHealthSnapshotIntervalInMilliseconds = 500 ms
),发射 Observable 窗口。点击 BucketedCounterStream 构造方法 查看调用处的代码。

Observable#window(count, skip)
方法,每发射一次(
skip
) Observable 忽略
count
( 可配,
HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20
) 个数据项。为什么?答案在第 22 行的代码,周期内达到一定请求量是断路器打开的一个条件。点击 BucketedRollingCounterStream 构造方法 查看调用处的代码。

目前该方法有两处调用 :

「4.1 构造方法」,在创建 HystrixCircuitBreakerImpl 时,向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。固定间隔,计算断路器是否要关闭(
CLOSE
)。

「4.4 #markSuccess()」,清空 Hystrix Metrics 对请求量统计 Observable 的统计信息,取消原有订阅,并发起新的订阅。

4.3 #attemptExecution()

如下是
AbstractCommand#applyHystrixSemantics(_cmd)
方法,对
HystrixCircuitBreakerImpl#attemptExecution
方法的调用的代码 :

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {    // ...  省略无关代码   /* determine if we're allowed to execute */   if (circuitBreaker.attemptExecution()) {        // 执行【正常逻辑】   } else {        // 执行【回退逻辑】   }}
使用
HystrixCircuitBreakerImpl#attemptExecution
方法,判断是否可以执行正常逻辑

#attemptExecution
方法,代码如下 :

1: @Override 2: public boolean attemptExecution() { 3:     // 强制 打开 4:     if (properties.circuitBreakerForceOpen().get()) { 5:         return false; 6:     } 7:     // 强制 关闭 8:     if (properties.circuitBreakerForceClosed().get()) { 9:         return true;10:     }11:     // 打开时间为空12:     if (circuitOpened.get() == -1) {13:         return true;14:     } else {15:         // 满足间隔尝试断路器时间16:         if (isAfterSleepWindow()) {17:             //only the first request after sleep window should execute18:             //if the executing command succeeds, the status will transition to CLOSED19:             //if the executing command fails, the status will transition to OPEN20:             //if the executing command gets unsubscribed, the status will transition to OPEN21:             if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {22:                 return true;23:             } else {24:                 return false;25:             }26:         } else {27:             return false;28:         }29:     }30: }
第 4 至 6 行 :当
HystrixCommandProperties.circuitBreakerForceOpen = true
( 默认值 :
false
) 时,即断路器强制打开,返回
false
。当该配置接入配置中心后,可以动态实现打开熔断。为什么会有该配置?当 HystrixCircuitBreaker 创建完成后,无法动态切换 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,通过该配置以实现类似效果。

第 8 至 10 行 :当
HystrixCommandProperties.circuitBreakerForceClose = true
( 默认值 :
false
) 时,即断路器强制关闭,返回
true
。当该配置接入配置中心后,可以动态实现关闭熔断。为什么会有该配置?当 HystrixCircuitBreaker 创建完成后,无法动态切换 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,通过该配置以实现类似效果。

第 12 至 13 行 :断路器打开时间(
circuitOpened
) 为”空”,返回
true


第 16 至 28 行 :调用
#isAfterSleepWindow()
方法,判断是否满足尝试调用正常逻辑的间隔时间。当满足,使用 CAS 方式修改断路器状态(
OPEN => HALF_OPEN
),从而保证有且仅有一个线程能够尝试调用正常逻辑。

#isAfterSleepWindow()
方法,代码如下 :

private boolean isAfterSleepWindow() {    final long circuitOpenTime = circuitOpened.get();    final long currentTime = System.currentTimeMillis();    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();    return currentTime > circuitOpenTime + sleepWindowTime;}
当前时间超过断路器打开时间
HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds
( 默认值,
5000 ms
),返回
true


4.4 #markSuccess()

当尝试调用正常逻辑成功时,调用
#markSuccess()
方法,关闭断路器。代码如下 :

1: @Override 2: public void markSuccess() { 3:     if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) { 4:         // 清空 Hystrix Metrics 对请求量统计 Observable 的**统计信息** 5:         //This thread wins the race to close the circuit - it resets the stream to start it over from 0 6:         metrics.resetStream(); 7:         // 取消原有订阅 8:         Subscription previousSubscription = activeSubscription.get(); 9:         if (previousSubscription != null) {10:             previousSubscription.unsubscribe();11:         }12:         // 发起新的订阅13:         Subscription newSubscription = subscribeToStream();14:         activeSubscription.set(newSubscription);15:         // 设置断路器打开时间为空16:         circuitOpened.set(-1L);17:     }18: }
第 3 行 :使用 CAS 方式,修改断路器状态(
HALF_OPEN => CLOSED
)。

第 6 行 :清空 Hystrix Metrics 对请求量统计 Observable 的统计信息

第 8 至 14 行 :取消原有订阅,发起新的订阅。

第 16 行 :设置断路器打开时间为”空” 。

如下两处调用了
#markNonSuccess()
方法 :

markEmits


markOnCompleted


4.5 #markNonSuccess()

当尝试调用正常逻辑失败时,调用
#markNonSuccess()
方法,重新打开断路器。代码如下 :

1: @Override2: public void markNonSuccess() {3:     if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {4:         //This thread wins the race to re-open the circuit - it resets the start time for the sleep window5:         circuitOpened.set(System.currentTimeMillis());6:     }7: }
第 3 行 :使用 CAS 方式,修改断路器状态(
HALF_OPEN => OPEN
)。

第 5 行 :设置设置断路器打开时间为当前时间。这样,
#attemptExecution()
过一段时间,可以再次尝试执行正常逻辑。

如下两处调用了
#markNonSuccess()
方法 :

handleFallback


unsubscribeCommandCleanup


4.6 #allowRequest()

#allowRequest()
#attemptExecution()
方法,方法目的基本类似,差别在于当断路器满足尝试关闭条件时,前者不会将断路器不会修改状态(
CLOSE => HALF-OPEN
),而后者会。点击 链接 查看代码实现。

4.7 #isOpen()

#isOpen()
方法,比较简单,点击 链接 查看代码实现。



内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: