Resilience4j
的CircuitBreaker
主要由6个部分组成:
CircuitBreakerRegistry
)CircuitBreakerConfig
)CircuitBreakerState
)CircuitBreakerMetrics
)CircuitBreakerEvent
)CircuitBreaker
)他们之间的调用关系如下图:
接下来将会详细介绍下各部分内容及他们之间的关系
源码如下:
public interface CircuitBrakerRegistry extends Registry<CircuitBreaker, CircuitBreakerConfig> {
// 根据自定义配置创建CircuitBreakerRegistry实例
static CircuitBreakerRegistry of(CircuitBreakerConfig circuitBreakerConfig) {
return new InMemoryCircuitBreakerRegistry(circuitBreakerConfig);
}
// ...
// 使用默认的配置 创建CircuitBreakerRegistry实例
static CircuitBreakerRegistry ofDefaults() {
return new InMemoryCircuitBreakerRegistry();
}
// ...
// 根据name获取CircuitBreaker
CircuitBreaker circuitBreaker(String name, CircuitBreakerConfig config);
// ...
}
public final class InMemoryCircuitBreakerRegistry extends
AbstractRegistry<CircuitBreaker, CircuitBreakerConfig> implements CircuitBreakerRegistry {
public InMemoryCircuitBreakerRegistry(CircuitBreakerConfig defaultConfig) {
super(defaultConfig);
}
// ...
}
public class AbstractRegistry<E, C> implements Registry<E, C> {
// CircuitBreaker 实例
protected final RegistryStore<E> entryMap;
// CircuitBreakerConfig
protected final ConcurrentMap<String, C> configurations;
// tags
protected final Map<String, String> registryTags;
private final RegistryEventProcessor eventProcessor;
}
// RegistryStore 默认实现为 InMemoryRegistryStore
// 主要作用就是利用ConcurrentHashMap来存储CircuitBreaker实例
public class InMemoryRegistryStore<E> implements RegistryStore<E> {
private final ConcurrentMap<String, E> entryMap;
public InMemoryRegistryStore() {
this.entryMap = new ConcurrentHashMap<>();
}
}
Predicate recordExceptionPredicate=throwable -> true;
判断是否是需要记录的异常,默认所有异常都是可记录的(会拿来计算失败率)Predicate ignoreExceptionPredicate = throwable -> false;
判断是否需要忽略异常,默认都不忽略Predicate
判断结果集是否是需要计算失败率的,默认都不算Class extends Throwable>[] recordExceptions = new Class[0]
需要记录的异常Class extends Throwable>[] ignoreExceptions = new Class[0]
需要忽略的异常float failureRateThreshold = 50
失败率阈值 默认50%int permittedNumberOfCallsInHalfOpenState = 10
断路器在半开状态下允许的成功执行的请求数,默认10int slidingWindowSize = 100
滑动窗口大小 默认100,如果窗口类型是基于计数的滑动窗口 那么该值得含义就是固定数组的大小存放每次请求的结果。如果窗口类型是基于时间的滑动窗口 那么该值得含义就是桶大小 例如10 就会有10个存储桶每个存储桶窗口大小是1s SlidingWindowType slidingWindowType =SlidingWindowType.COUNT_BASED
滑动窗口类型 默认是基于计数的COUNT_BASED 还有一种基于时间轮的 TIME_BASEDint minimumNumberOfCalls = 100
最小的请求或者调用数, 默认100, 如果调用数小于该值, 即使全部失败, 断路器也不会生效boolean writableStackTraceEnabled = true
启用可写的堆栈跟踪。设置为false时,Exception#getStackTrace返回长度为零的数组。当断路器打开时,这可以用来减少垃圾日志,因为已知异常的原boolean automaticTransitionFromOpenToHalfOpenEnabled = false
是否允许断路器自动从开启状态超时后转换为半开状态,如果开启,将会启动一个定时任务来转换状态IntervalFunction waitIntervalFunctionInOpenState
断路器打开状态下的持续时间,默认60sDuration slowCallDurationThreshold = Duration.ofSeconds(60s)
慢调用的时间阈值, 如果执行时间大于该值则认为是慢调用, 默认60s, 慢调用到一定比例将会触发熔断float slowCallRateThreshold = 100
慢调用的阈值,百分比,默认100% 也就是所有的请求都比slowCallDurationThreshold
大,就触发熔断Duration maxWaitDurationInHalfOpenState = Duration.ofSeconds(0)
半开状态下的最大持续时间,达到该时间就自动转换为打开状态,默认为0 则是一直保持半开状态,直到 minimumNumberOfCalls
成功或者失败,如果大于等于1ms则会启动个定时任务来处理状态转换自动转换为开启状态CircuitBreaker
目前共用六种状态,三种常用状态:关闭 (CLOSED
)、打开(OPEN
)、半开(HALF_OPEN
),三种特殊状态:禁用(DISABLE
)、强制打开(FORCED_OPEN
)、仅监控(METRICS_ONLY
)
状态转换图如下:
CLOSED
状态下,当失败率、慢调用率(参见CircuitBreakerConfig#failureRateThreshold#slowCallRateThreshold
)超过阈值时,断路器状态由CLOSED
转换为OPEN
,默认失败率50%,慢调用率100%OPEN
状态下,当打开状态的持续时间(参见CircuitBreakerConfig#waitIntervalFunctionInOpenState
)结束,断路器由OPEN
转换为HALF_OPEN
HALF_OPEN
状态下,此时有一部分请求可以执行(参见CircuitBreakerConfig#permittedNumberOfCallsInHalfOpenState
),如果这些请求的失败率超过阈值时,断路器的状态将会由HALF_OPEN
转换为OPEN
,如果这些请求的失败率小于阈值,断路器的状态将会由HALF_OPEN
转换为CLOSED
CircuitBreakerStateMachine
CircuitBreaker
的各种状态之间转换是通过一个有限状态机CircuitBreakerStateMachine
来实现的。
CircuitBreakerStateMachine
类实现了CircuitBreaker
接口,除了实现状态转换,还实现了熔断机制和事件发布机制。
CircuitBreaker.State
定义了状态的枚举值,也定义了每种状态是否允许发布事件CricuitBreakerStateMachine
利用AtomicReference
保证CircuitBreakerState
引用的原子性, 初始化状态为关闭状态stateTransition
主要作用就是将A状态转换为B状态public final class CircuitBreakerStateMachine implements CircuitBreaker {
// 保证状态的原子性,初始状态为关闭状态
private final AtomicReference<CircuitBreakerState> stateReference;
private CircuitBreakerStateMachine(String name, CircuitBreakerConfig circuitBreakerConfig,
Clock clock, SchedulerFactory schedulerFactory,
io.vavr.collection.Map<String, String> tags) {
// ...省略
// 初始值为关闭状态
this.stateReference = new AtomicReference<>(new ClosedState());
// ...
}
// 状态转换核心方法
private void stateTransition(State newState,
UnaryOperator<CircuitBreakerState> newStateGenerator) {
// 先获取当前State然后执行状态更新方法,AtomicReference保证原子性
CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {
// StateTransition 定义了可以从 A状态到B状态的枚举, 如果不存在该case,transitionBetween方法将会抛出异常
StateTransition.transitionBetween(getName(), currentState.getState(), newState);
currentState.preTransitionHook();
return newStateGenerator.apply(currentState);
});
publishStateTransitionEvent(
StateTransition.transitionBetween(getName(), previousState.getState(), newState));
}
@Override // 转为DISABLED
public void transitionToDisabledState() {
stateTransition(DISABLED, currentState -> new DisabledState());
}
@Override // 转为METRICS_ONLY
public void transitionToMetricsOnlyState() {
stateTransition(METRICS_ONLY, currentState -> new MetricsOnlyState());
}
@Override // 转为FORCE_OPEN
public void transitionToForcedOpenState() {
stateTransition(FORCED_OPEN,
currentState -> new ForcedOpenState(currentState.attempts() + 1));
}
@Override // 转为CLOSED
public void transitionToClosedState() {
stateTransition(CLOSED, currentState -> new ClosedState());
}
@Override // 转为OPEN
public void transitionToOpenState() {
stateTransition(OPEN,
currentState -> new OpenState(currentState.attempts() + 1, currentState.getMetrics()));
}
@Override // 转为HALF_OPEN
public void transitionToHalfOpenState() {
stateTransition(HALF_OPEN, currentState -> new HalfOpenState(currentState.attempts()));
}
}
在这,我列举几个StateTransition
enum StateTransition {
// CLOSED->CLOSED
CLOSED_TO_CLOSED(State.CLOSED, State.CLOSED),
// CLOSED->OPEN
CLOSED_TO_OPEN(State.CLOSED, State.OPEN),
CLOSED_TO_DISABLED(State.CLOSED, State.DISABLED),
CLOSED_TO_METRICS_ONLY(State.CLOSED, State.METRICS_ONLY),
CLOSED_TO_FORCED_OPEN(State.CLOSED, State.FORCED_OPEN),
HALF_OPEN_TO_HALF_OPEN(State.HALF_OPEN, State.HALF_OPEN),
HALF_OPEN_TO_CLOSED(State.HALF_OPEN, State.CLOSED),
HALF_OPEN_TO_OPEN(State.HALF_OPEN, State.OPEN),
HALF_OPEN_TO_DISABLED(State.HALF_OPEN, State.DISABLED),
HALF_OPEN_TO_METRICS_ONLY(State.HALF_OPEN, State.METRICS_ONLY),
HALF_OPEN_TO_FORCED_OPEN(State.HALF_OPEN, State.FORCED_OPEN),
OPEN_TO_OPEN(State.OPEN, State.OPEN),
OPEN_TO_CLOSED(State.OPEN, State.CLOSED),
OPEN_TO_HALF_OPEN(State.OPEN, State.HALF_OPEN),
OPEN_TO_DISABLED(State.OPEN, State.DISABLED),
OPEN_TO_METRICS_ONLY(State.OPEN, State.METRICS_ONLY),
OPEN_TO_FORCED_OPEN(State.OPEN, State.FORCED_OPEN),
// 省略.....
private final State fromState;
private final State toState;
StateTransition(State fromState, State toState) {
this.fromState = fromState;
this.toState = toState;
}
}
首先,我们需要提前了解下我们调用结果也就是请求结果,总共有哪几种类型
enum Result {
BELOW_THRESHOLDS, // 都低于阈值
FAILURE_RATE_ABOVE_THRESHOLDS, // 失败率 高于阈值
SLOW_CALL_RATE_ABOVE_THRESHOLDS, // 慢调用率 高于阈值
ABOVE_THRESHOLDS, // 失败率、慢调用率 都高于阈值
BELOW_MINIMUM_CALLS_THRESHOLD; // 总请求数低于最小请求数阈值
public static boolean hasExceededThresholds(Result result) {
return hasFailureRateExceededThreshold(result) ||
hasSlowCallRateExceededThreshold(result);
}
public static boolean hasFailureRateExceededThreshold(Result result) {
return result == ABOVE_THRESHOLDS || result == FAILURE_RATE_ABOVE_THRESHOLDS;
}
public static boolean hasSlowCallRateExceededThreshold(Result result) {
return result == ABOVE_THRESHOLDS || result == SLOW_CALL_RATE_ABOVE_THRESHOLDS;
}
}
CircuitBreakerMetrics
实现了 CircuitBreaker.Metrics
接口
// CircuitBreaker 指标
interface Metrics {
float getFailureRate();// 返回失败率,如果总共请求数小于最小设定请求数 返回-1
float getSlowCallRate();// 返回慢调用率
int getNumberOfSlowCalls();// 返回慢调用数
int getNumberOfSlowSuccessfulCalls();// 慢调用成功数
int getNumberOfSlowFailedCalls();// 慢调用失败数
int getNumberOfBufferedCalls();// 返回当前总请求数
int getNumberOfFailedCalls();// 失败调用数
long getNumberOfNotPermittedCalls();// 在打开状态下,返回当前不被允许请求调用的数量, 在关闭和半开下 始终为0
int getNumberOfSuccessfulCalls();// 返回当前成功请求调用的数量
}
class CircuitBreakerMetrics implements CircuitBreaker.Metrics {
// 指标类 后续会讲
private final Metrics metrics;
// 失败率阈值
private final float failureRateThreshold;
// 慢调用阈值
private final float slowCallRateThreshold;
// 慢调用时间阈值 (纳秒级)
private final long slowCallDurationThresholdInNanos;
// 没执行数
private final LongAdder numberOfNotPermittedCalls;
private int minimumNumberOfCalls;
/**
* @param slidingWindowSize 滑动窗口大小,CircuitBreakerConfig 中配置
* @param slidingWindowType 滑动窗口类型 有基于计数 和基于时间两种 默认 基于计数
* @param circuitBreakerConfig config
* @param clock 滑动窗口 为基于时间是可用 取值为 CircuitBreakerState中的clock
*/
private CircuitBreakerMetrics(int slidingWindowSize,
CircuitBreakerConfig.SlidingWindowType slidingWindowType,
CircuitBreakerConfig circuitBreakerConfig,
Clock clock) {
if (slidingWindowType == CircuitBreakerConfig.SlidingWindowType.COUNT_BASED) {
this.metrics = new FixedSizeSlidingWindowMetrics(slidingWindowSize);
// 最小请求数 不能超过滑动窗口大小
this.minimumNumberOfCalls = Math
.min(circuitBreakerConfig.getMinimumNumberOfCalls(), slidingWindowSize);
} else {
this.metrics = new SlidingTimeWindowMetrics(slidingWindowSize, clock);
this.minimumNumberOfCalls = circuitBreakerConfig.getMinimumNumberOfCalls();
}
this.failureRateThreshold = circuitBreakerConfig.getFailureRateThreshold();
this.slowCallRateThreshold = circuitBreakerConfig.getSlowCallRateThreshold();
this.slowCallDurationThresholdInNanos = circuitBreakerConfig.getSlowCallDurationThreshold()
.toNanos();
this.numberOfNotPermittedCalls = new LongAdder();
}
// 调用成功
public Result onSuccess(long duration, TimeUnit durationUnit) {
Snapshot snapshot;
// 超过慢调用时间阈值 及算做 慢成功 否则成功
if (durationUnit.toNanos(duration) > slowCallDurationThresholdInNanos) {
snapshot = metrics.record(duration, durationUnit, Outcome.SLOW_SUCCESS);
} else {
snapshot = metrics.record(duration, durationUnit, Outcome.SUCCESS);
}
// 校验失败率或者慢调用率是否超过了阈值
return checkIfThresholdsExceeded(snapshot);
}
// 调用失败
public Result onError(long duration, TimeUnit durationUnit) {
Snapshot snapshot;
// 超过慢调用时间阈值 及算做 慢失败 否则失败
if (durationUnit.toNanos(duration) > slowCallDurationThresholdInNanos) {
snapshot = metrics.record(duration, durationUnit, Outcome.SLOW_ERROR);
} else {
snapshot = metrics.record(duration, durationUnit, Outcome.ERROR);
}
// 校验失败率或者慢调用率是否超过了阈值
return checkIfThresholdsExceeded(snapshot);
}
// 校验失败率或者慢调用率是否超过了阈值
private Result checkIfThresholdsExceeded(Snapshot snapshot) {
float failureRateInPercentage = getFailureRate(snapshot);
float slowCallsInPercentage = getSlowCallRate(snapshot);
// -1 表示 总请求数低于最小请求阈值 默认始终不熔断
if (failureRateInPercentage == -1 || slowCallsInPercentage == -1) {
return Result.BELOW_MINIMUM_CALLS_THRESHOLD;
}
// 失败和慢调用 阈值都满足
if (failureRateInPercentage >= failureRateThreshold
&& slowCallsInPercentage >= slowCallRateThreshold) {
return Result.ABOVE_THRESHOLDS;
}
if (failureRateInPercentage >= failureRateThreshold) {
return Result.FAILURE_RATE_ABOVE_THRESHOLDS;
}
if (slowCallsInPercentage >= slowCallRateThreshold) {
return Result.SLOW_CALL_RATE_ABOVE_THRESHOLDS;
}
return Result.BELOW_THRESHOLDS;
}
private float getSlowCallRate(Snapshot snapshot) {
int bufferedCalls = snapshot.getTotalNumberOfCalls();
// 未达到最小请求数 返回-1
if (bufferedCalls == 0 || bufferedCalls < minimumNumberOfCalls) {
return -1.0f;
}
return snapshot.getSlowCallRate();
}
private float getFailureRate(Snapshot snapshot) {
int bufferedCalls = snapshot.getTotalNumberOfCalls();
// 未达到最小请求数 返回-1
if (bufferedCalls == 0 || bufferedCalls < minimumNumberOfCalls) {
return -1.0f;
}
return snapshot.getFailureRate();
}
}
可以看到指标的 统计都是基于io.github.resilience4j.core.metrics.Metrics
类,他有两种实现,一种是基于计数的滑动窗口,一种是基于时间的滑动窗口
public interface Metrics {
/**
* 负责记录一次请求的信息
* @param duration the duration of the call 执行时间
* @param durationUnit the time unit of the duration 时间单位
* @param outcome the outcome of the call 执行结果
*/
Snapshot record(long duration, TimeUnit durationUnit, Outcome outcome);
/**
* 获取当前度量指标快照信息
*/
Snapshot getSnapshot();
enum Outcome {
SUCCESS, ERROR, SLOW_SUCCESS, SLOW_ERROR
}
}
上面提到过,CircuitBreaker 总共有两种滑动窗口算法,一种是基于计数的滑动窗口,一种是基于时间的滑动窗口,接下来将会详细介绍下这两种算法,不过在此之前,容我先介绍几个类
epochSecond
负责记录当前的秒级别数,reset 方法依旧是重置所有数据io.github.resilience4j.core.metrics.FixedSizeSlidingWindowMetrics
基于计数的滑动窗口由N个测量值的圆形数组实现。
如果计数窗口大小为10,则圆形阵列始终具有10个测量值。
滑动窗口以增量方式更新总聚合。当记录新的呼叫结果时,将更新总汇总。收回最旧的度量后,将从总聚合中减去该度量,然后重置存储桶。(逐项扣除)
快照的获取时间为常数O(1),因为快照是预先聚合的,并且与窗口大小无关。
此实现的空间需求(内存消耗)应为O(n)
public class FixedSizeSlidingWindowMetrics implements Metrics {
// 滑动窗口大小
private final int windowSize;
// 指标聚合信息 包含 总执行时间,慢调用数,慢调用失败数,失败数,调用数
private final TotalAggregation totalAggregation;
// 指标数组
private final Measurement[] measurements;
// 当前滑动窗口位置
int headIndex;
public FixedSizeSlidingWindowMetrics(int windowSize) {
this.windowSize = windowSize;
this.measurements = new Measurement[this.windowSize];
this.headIndex = 0;
// 初始化指标数组
for (int i = 0; i < this.windowSize; i++) {
measurements[i] = new Measurement();
}
this.totalAggregation = new TotalAggregation();
}
// 加锁 保证线程安全 record 就是调用结束后 回调的
@Override
public synchronized Snapshot record(long duration, TimeUnit durationUnit, Outcome outcome) {
totalAggregation.record(duration, durationUnit, outcome);
moveWindowByOne().record(duration, durationUnit, outcome);
return new SnapshotImpl(totalAggregation);
}
// 加锁 保证线程安全
public synchronized Snapshot getSnapshot() {
return new SnapshotImpl(totalAggregation);
}
private Measurement moveWindowByOne() {
// 后移当前滑动窗口位置
moveHeadIndexByOne();
// 获取旧指标信息
Measurement latestMeasurement = getLatestMeasurement();
// 从总聚合中减去旧指标
totalAggregation.removeBucket(latestMeasurement);
// 重置当前指标
latestMeasurement.reset();
return latestMeasurement;
}
private Measurement getLatestMeasurement() {
return measurements[headIndex];
}
// 后移当前滑动窗口位置
void moveHeadIndexByOne() {
this.headIndex = (headIndex + 1) % windowSize;
}
}
io.github.resilience4j.core.metrics.SlidingTimeWindowMetrics
基于时间的滑动窗口是由N个部分集合(存储桶)的圆形数组实现的。
如果时间窗口大小为10秒,则圆形数组始终具有10个部分聚合(存储桶)。每个存储桶都会汇总在某个时期内发生的所有调用的结果。(部分聚集)。圆形数组的头存储区存储当前纪元的呼叫结果。其他部分聚合存储前几秒的呼叫结果。
滑动窗口不会单独存储呼叫结果(元组),而是以增量方式更新部分聚合(存储桶)和总聚合。
当记录新的呼叫结果时,总聚合将增量更新。当最旧的存储桶被收回时,该存储桶的部分总聚合将从总聚合中减去,然后重置该存储桶。(逐项扣除)
快照的获取时间为常数O(1),因为快照是预先聚合的,并且与时间窗口的大小无关。
此实现的空间需求(内存消耗)应接近常数O(n),因为调用结果(元组)不是单独存储的。仅创建N个部分聚合和1个总聚合。
部分聚合由3个整数组成,以计算失败的呼叫数,慢速呼叫数和总呼叫数。一个长存储所有呼叫的总持续时间。
Resilience4j的事件机制采用的是观察者设计模式,核心类在io.github.resilience4j.core
包下
EventConsumer
事件接口消费者(观察者)EventPublisher
事件接口发布者(被观察者)EventProcessor
实现了EventPublisher
接口,封装了 消费者EventConsumer
注册接口和负责处理对应事件通知CircuitBreakerEvent 事件类型
目前共有八种:
CircuitBreakerOnSuccessEvent
请求调用成功时发布的事件CircuitBreakerOnErrorEvent
请求调用失败时发布的事件CircuitBreakerOnResetEvent
断路器重置时发布的事件CircuitBreakerOnFailureRateExceededEvent
当达到失败率时发布的事件CircuitBreakerOnIgnoredErrorEvent
当忽略的调用异常发生时发布的事件CircuitBreakerOnSlowCallRateExceededEvent
当达到慢调用率时发布的事件CircuitBreakerOnCallNotPermittedEvent
当请求不被允许调用时发布的事件CircuitBreakerOnStateTransitionEvent
断路器状态转换时发布的事件对应枚举可以参考CircuitBreakerEvent.Type
enum Type {
ERROR(false),
IGNORED_ERROR(false),
SUCCESS(false),
NOT_PERMITTED(false),
STATE_TRANSITION(true),
RESET(true),
FORCED_OPEN(false),
DISABLED(false),
FAILURE_RATE_EXCEEDED(false),
SLOW_CALL_RATE_EXCEEDED(false);
public final boolean forcePublish;
Type(boolean forcePublish) {
this.forcePublish = forcePublish;
}
}
EventPublisher 中定义了八种注册事件处理方法
interface EventPublisher extends
io.github.resilience4j.core.EventPublisher<CircuitBreakerEvent> {
EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> eventConsumer);
EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> eventConsumer);
EventPublisher onStateTransition(
EventConsumer<CircuitBreakerOnStateTransitionEvent> eventConsumer);
EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> eventConsumer);
EventPublisher onIgnoredError(
EventConsumer<CircuitBreakerOnIgnoredErrorEvent> eventConsumer);
EventPublisher onCallNotPermitted(
EventConsumer<CircuitBreakerOnCallNotPermittedEvent> eventConsumer);
EventPublisher onFailureRateExceeded(
EventConsumer<CircuitBreakerOnFailureRateExceededEvent> eventConsumer);
EventPublisher onSlowCallRateExceeded(
EventConsumer<CircuitBreakerOnSlowCallRateExceededEvent> eventConsumer);
}
最后 我们再回到我们的主角 CircuitBreaker
来瞅瞅。
注:该图省略了很多方法,具体使用什么请具体分析,这里只列举了一些常用的,用于分析;State、Metrics、StateTransition、EventPublisher 在前面都有提到。具体往前面翻翻吧
public interface CircuitBreaker {
static <T> CheckedFunction0<T> decorateCheckedSupplier(CircuitBreaker circuitBreaker,
CheckedFunction0<T> supplier) {
return () -> {
// 尝试获取执行权限 如果没权限会抛出 CallNotPermittedException 异常 (此时处于熔断状态)
circuitBreaker.acquirePermission();
final long start = circuitBreaker.getCurrentTimestamp();
try {
// 执行
T result = supplier.apply();
long duration = circuitBreaker.getCurrentTimestamp() - start;
// 执行成功后 则记录状态。内部会根据config配置的 recordResultPredicate 检测 result 是否是需要统计失败率
circuitBreaker.onResult(duration, circuitBreaker.getTimestampUnit(), result);
return result;
} catch (Exception exception) {
// Do not handle java.lang.Error
long duration = circuitBreaker.getCurrentTimestamp() - start;
// 异常状态下,记录失败状态,计算失败率
circuitBreaker.onError(duration, circuitBreaker.getTimestampUnit(), exception);
throw exception;
}
};
}
default <T> T executeCheckedSupplier(CheckedFunction0<T> checkedSupplier) throws Throwable {
return decorateCheckedSupplier(this, checkedSupplier).apply();
}
// 这里负责创建一个CircuitBreaker 实例
static CircuitBreaker of(String name, CircuitBreakerConfig circuitBreakerConfig) {
// 熟悉吗
return new CircuitBreakerStateMachine(name, circuitBreakerConfig);
}
// 接下来是几个权限接口
boolean tryAcquirePermission();
// 和tryAcquirePermission区别不同在于 没权限 该方法会抛出 CallNotPermittedException 异常
void acquirePermission();
// 释放权限
void releasePermission();
// decorateCheckedSupplier 方法中会调这三个方法哦
void onSuccess(long duration, TimeUnit durationUnit);
void onError(long duration, TimeUnit durationUnit, Throwable throwable);
void onResult(long duration, TimeUnit durationUnit, Object result);
// 该方法负责将状态重置为CLOSED状态
void reset();
}
接下来分析下 CircuitBreakerStateMachine
中的几个方法
public final class CircuitBreakerStateMachine implements CircuitBreaker {
// 保证状态的原子性,初始状态为关闭状态
private final AtomicReference<CircuitBreakerState> stateReference;
// xxxPermission方法就没必要分析了 都是调用 CircuitBreakerState的方法
public boolean tryAcquirePermission() {
boolean callPermitted = stateReference.get().tryAcquirePermission();
if (!callPermitted) {
publishCallNotPermittedEvent();
}
return callPermitted;
}
public void releasePermission() {
stateReference.get().releasePermission();
}
public void acquirePermission() {
try {
stateReference.get().acquirePermission();
} catch (Exception e) {
publishCallNotPermittedEvent();
throw e;
}
}
public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
Throwable cause = throwable.getCause();
handleThrowable(duration, durationUnit, cause);
} else {
handleThrowable(duration, durationUnit, throwable);
}
}
private void handleThrowable(long duration, TimeUnit durationUnit, Throwable throwable) {
if (circuitBreakerConfig.getIgnoreExceptionPredicate().test(throwable)) {
releasePermission();
publishCircuitIgnoredErrorEvent(name, duration, durationUnit, throwable);
} else if (circuitBreakerConfig.getRecordExceptionPredicate().test(throwable)) {
publishCircuitErrorEvent(name, duration, durationUnit, throwable);
stateReference.get().onError(duration, durationUnit, throwable);
} else {
publishSuccessEvent(duration, durationUnit);
stateReference.get().onSuccess(duration, durationUnit);
}
}
public void onSuccess(long duration, TimeUnit durationUnit) {
publishSuccessEvent(duration, durationUnit);
stateReference.get().onSuccess(duration, durationUnit);
}
public void onResult(long duration, TimeUnit durationUnit, @Nullable Object result) {
if (result != null && circuitBreakerConfig.getRecordResultPredicate().test(result)) {
ResultRecordedAsFailureException failure = new ResultRecordedAsFailureException(name, result);
publishCircuitErrorEvent(name, duration, durationUnit, failure);
stateReference.get().onError(duration, durationUnit, failure);
} else {
onSuccess(duration, durationUnit);
}
}
}