您的位置:首页 > 其它

Alibaba Sentinel LeapArray 源码阅读

2020-08-13 04:28 1271 查看

「深度学习福利」大神带你进阶工程师,立即查看>>>

最近在使用Alibaba Sentinel来做服务的限流、熔断和降级。一直有一个比较好奇的点,Sentinel是如果做到高效的数据统计的。通过官方文档介绍

  • StatisticSlot: 则用于记录、统计不同纬度的 runtime 指标监控信息;(做实时统计)
  • Sentinel 底层采用高性能的滑动窗口数据结构 LeapArray来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。

由此可以发现Sentinel使用了滑动窗口算法来做数据统计,并且具体实现是在LeapArray类中。

Sentinel 总体的框架如下:

通过架构图我们可以看到StatisticSlot中的LeapArray采用了一个环性数组的数据结构,这个和一致性hash算法的图类似,如图:

在这个结构中,每一个下标位就代表一个滑动窗口,至于这个窗口是怎么滑动的我们可以结合原来看。

LeapArray 源码

源码路径

StatisticSlot作为统计的入口,在其entry()方法中我们可以看到StatisticSlot会使用StatisticNode,然后StatisticNode回去引用ArrayMetric,最终使用LeapArray

根据当前时间获取滑动窗口

public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 根据当前时间计算出当前时间属于那个滑动窗口的数组下标
int idx = calculateTimeIdx(timeMillis);
// 根据当前时间计算出当前滑动窗口的开始时间
long windowStart = calculateWindowStart(timeMillis);
/*
* 根据下脚标在环形数组中获取滑动窗口(桶)
*
* (1) 如果桶不存在则创建新的桶,并通过CAS将新桶赋值到数组下标位。
* (2) 如果获取到的桶不为空,并且桶的开始时间等于刚刚算出来的时间,那么返回当前获取到的桶。
* (3) 如果获取到的桶不为空,并且桶的开始时间小于刚刚算出来的开始时间,那么说明这个桶是上一圈用过的桶,重置当前桶
* (4) 如果获取到的桶不为空,并且桶的开始时间大于刚刚算出来的开始时间,理论上不应该出现这种情况。
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
*     B0       B1      B2    NULL      B4
* ||_______|_______|_______|_______|_______||___
* 200     400     600     800     1000    1200  timestamp
*                             ^
*                          time=888
*            bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
*     B0       B1      B2     B3      B4
* ||_______|_______|_______|_______|_______||___
* 200     400     600     800     1000    1200  timestamp
*                             ^
*                          time=888
*            startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
*   (old)
*             B0       B1      B2    NULL      B4
* |_______||_______|_______|_______|_______|_______||___
* ...    1200     1400    1600    1800    2000    2200  timestamp
*                              ^
*                           time=1676
*          startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}

根据下脚标在环形数组中获取滑动窗口(桶)的规则:

  • (1) 如果桶不存在则创建新的桶,并通过CAS将新桶赋值到数组下标位。
  • (2) 如果获取到的桶不为空,并且桶的开始时间等于刚刚算出来的时间,那么返回当前获取到的桶。
  • (3) 如果获取到的桶不为空,并且桶的开始时间小于刚刚算出来的开始时间,那么说明这个桶是上一圈用过的桶,重置当前桶,并返回。
  • (4) 如果获取到的桶不为空,并且桶的开始时间大于刚刚算出来的开始时间,理论上不应该出现这种情况。

这里有一个比较值得学习的地方是:

  1. 对并发的控制:当一个新桶的创建直接是使用的CAS的原子操作来保证并发;但是重置一个桶的时候因为很难保证其原子操作(1. 需要重置多个值;2. 重置方法是一个抽象方法,需要子类去做实现),所以直接使用一个ReentrantLock锁来做并发控制。
  2. Thread.yield();方法的使用,这个方法主要的作用是交出CPU的执行权,并重新竞争CPU执行权。这个方法再我们业务代码中其实很少用到。

如何实现的滑动的

通过上面这个方法我们可以看到我们是如果根据当前时间获取到一个桶的(滑动窗口)。但是如何实现滑动效果的呢?实现滑动效果主要看上面那个方法的如何找到桶的下标和如何更加当前时间找到当前桶的开始时间,如下:

// 根据当前时间计算出当前时间属于那个滑动窗口的数组下标
int idx = calculateTimeIdx(timeMillis);
// 根据当前时间计算出当前滑动窗口的开始时间
long windowStart = calculateWindowStart(timeMillis);
// 根据当前时间计算出当前时间属于那个滑动窗口的数组下标
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
// 利用除法取整原则,保证了一秒内的所有时间搓得到的timeId是相等的
long timeId = timeMillis / windowLengthInMs;
// 利用求余运算原则,保证一秒内获取到的桶的下标位是一致的
return (int) (timeId % array.length());
}

// 根据当前时间计算出当前滑动窗口的开始时间
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
// 利用求余运算原则,保证一秒内获取到的桶的开始时间是一致的
// 100 - 100 % 10 = 100 - 0 = 100
// 101 - 101 % 10 = 101 - 1 = 100
// 102 - 102 % 10 = 102 - 2 = 100
return timeMillis - timeMillis % windowLengthInMs;
}
  • timeMillis:表示当前时间的时间戳
  • windowLengthInMs:表示一个滑动窗口的时间长度,根据源码来看是1000ms即一个滑动窗口统计1秒内的数据。

这两个方法巧妙的利用了除法取整和求余原则实现了窗口的滑动。通过最上面的结构图我们可以发现滑动窗口会根据时间戳顺时针旋转。

桶的数量就决定了滑动窗口的统计时长,根据源码来看是60个桶,即一个统计1分钟内的数据。

内部是利用并发工具类LongAdder的特性来实现的高效的数据的统计。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  cas sentinel