Sentinel 的工作流程是围绕着一个个插槽所组成的插槽链来展开的,每个Slot 通过 Node 完成各个维度的数据统计。
Sentinel 中保存统计数据的对象有4种:
各个节点类关系如下:
StatisticNode作为 Node 接口的直接实现,为基础的统计节点,包含秒级、分钟级两个滑动窗口结构 ,代码如下:
public class StatisticNode implements Node {
// 分钟级滑动窗口,SampleCountProperty.SAMPLE_COUNT = 2,IntervalProperty.INTERVAL = 1000
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
// 秒级滑动窗口:保存最近 60 秒的统计信息
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
}
可以看出,不管是分钟级,还是秒级的滑动窗口,都是创建了一个 ArrayMetric
对象,该对象为 Sentinel 滑动窗口核心实现类,结果如下:
其中包括了几个 Sentinel 滑动窗口的几个核心类:
Metric
:定义指标收集APIArrayMetric
:滑动窗口核心实现类LeapArray
:滑动窗口顶层数据结构MetricBucket
:指标桶下面一一介绍各个核心类的功能及作用。
定义指标收集API,主要定义一个滑动窗口中成功的数量、异常数量、阻塞数量,TPS、响应时间等数据,代码如下:。
public interface Metric extends DebugSupport {
// 获取 成功个数
long success();
// 获取 最大成功个数
long maxSuccess();
// 获取 异常个数
long exception();
// 获取 阻塞个数
long block();
// 获取 通过个数,不包括 occupiedPass
long pass();
// 获取 总响应时间
long rt();
// 获取 最小响应时间
long minRt();
// 获取所有资源的 MetricNode List
List<MetricNode> details();
// 获取满足timePredicate条件的资源 MetricNode List
List<MetricNode> detailsOnCondition(Predicate<Long> timePredicate);
// 获取 窗口数组
MetricBucket[] windows();
// add
void addException(int n);
void addBlock(int n);
void addSuccess(int n);
void addPass(int n);
void addRT(long rt);
// 以秒为单位获取滑动窗口长度
double getWindowIntervalInSec();
// 获取滑动窗口样本个数
int getSampleCount();
// 获取 timeMillis 时间内,有效窗口的通过个数
long getWindowPass(long timeMillis);
// 添加占用通行证,表示借用后一个窗口令牌的通行证请求。
void addOccupiedPass(int acquireCount);
// 添加funtureTime 时间窗口的占用通行证,表示借用funtureTime 窗口令牌的通行证请求。
void addWaiting(long futureTime, int acquireCount);
// 获取 总占用通行证个数
long waiting();
// 获取占用的通行证计数
long occupiedPass();
// 获取上一个窗口阻塞个数
long previousWindowBlock();
// 获取上一个窗口通过个数
long previousWindowPass();
}
实现 Metic
接口,为 Sentinel 滑动窗口核心实现类,通过该类,可以获取到通过数量、阻塞数量、异常数量、成功数量、响应时间等。
ArrayMetric
定义了三个构造方法,实现了 Metic
中所有接口,结构如下:
public class ArrayMetric implements Metric {
// LeapArray:滑动窗口顶层数据结构,包含一个一个的窗口数据。
// MetricBucket:指标桶
private final LeapArray<MetricBucket> data;
/**
* @param sampleCount 在一个采集间隔中抽样的个数
* @param intervalInMs 采集的时间间隔(毫秒)
*/
public ArrayMetric(int sampleCount, int intervalInMs) {
// 创建一个可占用的LeapArray
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
/**
* @param sampleCount 在一个采集间隔中抽样的个数
* @param intervalInMs 采集的时间间隔(毫秒)
* @param enableOccupy 是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量
*/
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
// 创建一个可占用的LeapArray
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
// 创建一个普通的LeapArray
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
/**
* For unit test.
*/
public ArrayMetric(LeapArray<MetricBucket> array) {
this.data = array;
}
// 省略其他核心方法
}
其中构造函数的参数列表含义如下:
Sentinel 通过 ArrayMetric
进行通过数量获取/添加、阻塞数量获取/添加、异常数量获取/添加、成功数量获取/添加、响应时间获取/添加等。其核心逻辑如下(以success() 、addSuccess(int count)
) 为例:
public class ArrayMetric implements Metric {
// LeapArray:滑动窗口顶层数据结构,包含一个一个的窗口数据。
// MetricBucket:指标桶
// data = OccupiableBucketLeapArray / BucketLeapArray
private final LeapArray<MetricBucket> data;
@Override
public long success() {
// 创建当前时间的窗口
data.currentWindow();
long success = 0;
// 获取当前时间的所有 MetricBucket
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
// 获取当前 window 的 success 数,求和
success += window.success();
}
return success;
}
@Override
public void addSuccess(int count) {
// 创建当前时间的窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
// 添加 success 统计
wrap.value().addSuccess(count);
}
}
其中获取统计指标步骤如下:
进行指标统计步骤如下:
主要是是通过 LeapArray 提供的API来进行各个维度的统计、值的获取。
在讲解 LeapArray
之前,我们先讲一下 WindowWrap
对象。 WindowWrap
为 MetricBucket
的包装类,主要是对 MetricBucket
进行包装增强。结构如下:
public class WindowWrap<T> {
// 窗口时间(毫秒)
private final long windowLengthInMs;
// 窗口开始时间(毫秒)
private long windowStart;
// 统计数据 -> MetricBucket
private T value;
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
this.windowLengthInMs = windowLengthInMs;
this.windowStart = windowStart;
this.value = value;
}
public long windowLength() {
return windowLengthInMs;
}
public long windowStart() {
return windowStart;
}
public T value() {
return value;
}
public void setValue(T value) {
this.value = value;
}
// 重置当前 WindowWrap 的窗口开始时间(毫秒)
public WindowWrap<T> resetTo(long startTime) {
this.windowStart = startTime;
return this;
}
// 判断给定timeMillis 是否存在当前窗口
public boolean isTimeInWindow(long timeMillis) {
return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
}
@Override
public String toString() {
return "WindowWrap{" +
"windowLengthInMs=" + windowLengthInMs +
", windowStart=" + windowStart +
", value=" + value +
'}';
}
}
指标桶,负责统计各个维度(正常通过、阻塞、异常、成功、响应时间等)的请求数量。
Sentinel 统计的维度有:
public enum MetricEvent {
PASS, // 正常通过
BLOCK, // 阻塞
EXCEPTION, // 异常
SUCCESS, // 成功
RT, // 响应时间
OCCUPIED_PASS // 在未来的窗口容量中正常通过(从 1.5.0 开始被占用)
}
MetricBucket 源码如下:
public class MetricBucket {
// 统计数组
private final LongAdder[] counters;
// 最小响应时间
private volatile long minRt;
public MetricBucket() {
// 获取所有统计维度
MetricEvent[] events = MetricEvent.values();
// 初始化统计数组,以 MetricEvent 值序号为下标
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
// 初始化最小响应时间
initMinRt();
}
// 重置当前 MetricBucket
public MetricBucket reset(MetricBucket bucket) {
for (MetricEvent event : MetricEvent.values()) {
// 值0
counters[event.ordinal()].reset();
counters[event.ordinal()].add(bucket.get(event));
}
// 初始化最小响应时间
initMinRt();
return this;
}
private void initMinRt() {
// 赋值为最大响应时间
this.minRt = SentinelConfig.statisticMaxRt();
}
// 重置当前 MetricBucket
public MetricBucket reset() {
for (MetricEvent event : MetricEvent.values()) {
// 值0
counters[event.ordinal()].reset();
}
// 初始化最小响应时间
initMinRt();
return this;
}
// 获取统计值
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
// 添加统计
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
public long pass() {
return get(MetricEvent.PASS);
}
public long occupiedPass() {
return get(MetricEvent.OCCUPIED_PASS);
}
public long block() {
return get(MetricEvent.BLOCK);
}
public long exception() {
return get(MetricEvent.EXCEPTION);
}
public long rt() {
return get(MetricEvent.RT);
}
public long minRt() {
return minRt;
}
public long success() {
return get(MetricEvent.SUCCESS);
}
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
public void addOccupiedPass(int n) {
add(MetricEvent.OCCUPIED_PASS, n);
}
public void addException(int n) {
add(MetricEvent.EXCEPTION, n);
}
public void addBlock(int n) {
add(MetricEvent.BLOCK, n);
}
public void addSuccess(int n) {
add(MetricEvent.SUCCESS, n);
}
public void addRT(long rt) {
add(MetricEvent.RT, rt);
// Not thread-safe, but it's okay.
if (rt < minRt) {
minRt = rt;
}
}
@Override
public String toString() {
return "p: " + pass() + ", b: " + block() + ", w: " + occupiedPass();
}
}
其中 LongAdder
为一个多线程并发统计的对象,核心原理就是:在多线程并发情况下,将线程竞争资源由一个,调整为多个(数组),以达到提高线程运行效率问题,最后统计整个数组值的总和。
滑动窗口顶层数据结构,包含一个一个的窗口数据。
下面我们先看看 LeapArray的类结构图:
可以看出,LeapArray
为一个抽象类,下面我们对 LeapArray
的属性、抽象想法以及核心方法一一分析。
int windowLengthInMs
:每个窗口的时间间隔(毫秒)。int sampleCount
:抽样个数,就一个统计时间间隔中包含的滑动窗口个数。在 intervalInMs 相同的情况下,sampleCount 越多,抽样的统计数据就越精确,相应的需要的内存也越多。int intervalInMs
:一个统计的时间间隔(毫秒)。double intervalInSecond
:一个统计的时间间隔(分钟)。AtomicReferenceArray> array
:滑动窗口的数组。ReentrantLock updateLock
:可重入锁,在进行窗口 reset 时使用。public abstract class LeapArray<T> {
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
}
public abstract class LeapArray<T> {
// 创建一个新的MetricBucket
public abstract T newEmptyBucket(long timeMillis);
// 重置Window
protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);
}
其中,LeapArray
有三个主要实现,分别如下:
关于子类的 newEmptyBucket
、resetWindowTo
方法,小伙伴有兴趣的可以自己看一下,整体实现,同名字关联。
计算并返回窗口在数组 array = new AtomicReferenceArray<>(sampleCount)
中的所属位置(下标)。
public abstract class LeapArray<T> {
private int calculateTimeIdx(long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
}
}
计算并返回当前窗口的开始时间。
public abstract class LeapArray<T> {
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
}
获取 / 创建 timeMillis 时间下的 WindowWrap 对象。
public abstract class LeapArray<T> {
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 计算并返回窗口在数组 `array = new AtomicReferenceArray<>(sampleCount)` 中的所属位置(下标)。
int idx = calculateTimeIdx(timeMillis);
// 计算并返回当前窗口的开始时间。
long windowStart = calculateWindowStart(timeMillis);
/*
* 获取 WindowWrap
* (1) Bucket 不存在,则只需创建一个新的 Bucket 并通过 CAS 更新为循环数组值。
* (2) Bucket 是最新的,直接返回。
* (3) Bucket 被废弃,重置当前 Bucket。
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// Bucket 不存在,创建一个新的 Bucket
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
// 通过 CAS 更新为循环数组值
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
// CAS 失败,释放CPU资源
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
// Bucket 是最新的,直接返回。
return old;
} else if (windowStart > old.windowStart()) {
// Bucket 被废弃,重置当前 Bucket
if (updateLock.tryLock()) { // 更新加锁
try {
// 重置
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// 获取锁失败,释放CPU资源
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
}
获取 timeMillis 的上一个窗口。
public abstract class LeapArray<T> {
public WindowWrap<T> getPreviousWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis - windowLengthInMs);
timeMillis = timeMillis - windowLengthInMs;
WindowWrap<T> wrap = array.get(idx);
// isWindowDeprecated:判断是否已经过期
if (wrap == null || isWindowDeprecated(wrap)) {
return null;
}
if (wrap.windowStart() + windowLengthInMs < (timeMillis)) {
return null;
}
return wrap;
}
}
获取 timeMillis 下窗口的统计值。
public abstract class LeapArray<T> {
public T getWindowValue(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
WindowWrap<T> bucket = array.get(idx);
// 是否为当前窗口
if (bucket == null || !bucket.isTimeInWindow(timeMillis)) {
return null;
}
return bucket.value();
}
}
time 下的窗口是否已过期。
public abstract class LeapArray<T> {
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
return time - windowWrap.windowStart() > intervalInMs;
}
}
public abstract class LeapArray<T> {
// 获取当前 time 的所有WindowWrap。
public List<WindowWrap<T>> list(long validTime) {
int size = array.length();
List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
// isWindowDeprecated:是否过期
if (windowWrap == null || isWindowDeprecated(validTime, windowWrap)) {
continue;
}
result.add(windowWrap);
}
return result;
}
// 获取所有 WindowWrap
public List<WindowWrap<T>> listAll() {
int size = array.length();
List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null) {
continue;
}
result.add(windowWrap);
}
return result;
}
}
获取当前滑动窗口下所有没有过期的窗口的value值。
public abstract class LeapArray<T> {
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
}
获取 timeMillis 时间的下一个窗口。
public abstract class LeapArray<T> {
WindowWrap<T> getValidHead(long timeMillis) {
// Calculate index for expected head time.
int idx = calculateTimeIdx(timeMillis + windowLengthInMs);
WindowWrap<T> wrap = array.get(idx);
if (wrap == null || isWindowDeprecated(wrap)) {
return null;
}
return wrap;
}
}