• Resilience4j.Circuitbreaker源码分析


    Resilience4j.Circuitbreaker

    源码分析

    Resilience4jCircuitBreaker主要由6个部分组成:

    • 管理断路器实例的注册容器(CircuitBreakerRegistry)
    • 断路器的相关配置(CircuitBreakerConfig)
    • 断路器的各种状态(CircuitBreakerState)
    • 触发断路器状态变化的指标(CircuitBreakerMetrics)
    • 断路器行为变化产生的事件(CircuitBreakerEvent)
    • 断路器本身(CircuitBreaker)

    CircuitBreaker六部分

    他们之间的调用关系如下图:

    image-20220914181931953

    接下来将会详细介绍下各部分内容及他们之间的关系

    CircuitBreakerRegistry 断路器容器

    image-20220914202425795

    源码如下:

    public interface CircuitBrakerRegistry extends Registry<CircuitBreaker, CircuitBreakerConfig> {
        // 根据自定义配置创建CircuitBreakerRegistry实例
        static CircuitBreakerRegistry of(CircuitBreakerConfig circuitBreakerConfig) {
            return new InMemoryCircuitBreakerRegistry(circuitBreakerConfig);
        }
        // ...
        // 使用默认的配置 创建CircuitBreakerRegistry实例
        static CircuitBreakerRegistry ofDefaults() {
            return new InMemoryCircuitBreakerRegistry();
        }
        // ...
        // 根据name获取CircuitBreaker
        CircuitBreaker circuitBreaker(String name, CircuitBreakerConfig config);
        // ...
    }
    public final class InMemoryCircuitBreakerRegistry extends
        AbstractRegistry<CircuitBreaker, CircuitBreakerConfig> implements CircuitBreakerRegistry {
            public InMemoryCircuitBreakerRegistry(CircuitBreakerConfig defaultConfig) {
            super(defaultConfig);
        }
        // ... 
    }
    
    public class AbstractRegistry<E, C> implements Registry<E, C> {
        // CircuitBreaker 实例
        protected final RegistryStore<E> entryMap;
        // CircuitBreakerConfig
        protected final ConcurrentMap<String, C> configurations;
        // tags
        protected final Map<String, String> registryTags;
        private final RegistryEventProcessor eventProcessor;
    }
    
    // RegistryStore 默认实现为 InMemoryRegistryStore
    // 主要作用就是利用ConcurrentHashMap来存储CircuitBreaker实例
    public class InMemoryRegistryStore<E> implements RegistryStore<E> {
        private final ConcurrentMap<String, E> entryMap;
        public InMemoryRegistryStore() {
            this.entryMap = new ConcurrentHashMap<>();
        }
    }
    

    CircuitBreakerConfig 断路器的相关配置

    • Predicate recordExceptionPredicate=throwable -> true; 判断是否是需要记录的异常,默认所有异常都是可记录的(会拿来计算失败率)
    • Predicate ignoreExceptionPredicate = throwable -> false; 判断是否需要忽略异常,默认都不忽略
    • Predicate recordResultPredicate = object -> false 判断结果集是否是需要计算失败率的,默认都不算
    • Class[] recordExceptions = new Class[0]需要记录的异常
    • Class[] ignoreExceptions = new Class[0] 需要忽略的异常
    • float failureRateThreshold = 50 失败率阈值 默认50%
    • int permittedNumberOfCallsInHalfOpenState = 10 断路器在半开状态下允许的成功执行的请求数,默认10
    • int slidingWindowSize = 100 滑动窗口大小 默认100,如果窗口类型是基于计数的滑动窗口 那么该值得含义就是固定数组的大小存放每次请求的结果。如果窗口类型是基于时间的滑动窗口 那么该值得含义就是桶大小 例如10 就会有10个存储桶每个存储桶窗口大小是1s
    • SlidingWindowType slidingWindowType =SlidingWindowType.COUNT_BASED滑动窗口类型 默认是基于计数的COUNT_BASED 还有一种基于时间轮的 TIME_BASED
    • int minimumNumberOfCalls = 100最小的请求或者调用数, 默认100, 如果调用数小于该值, 即使全部失败, 断路器也不会生效
    • boolean writableStackTraceEnabled = true启用可写的堆栈跟踪。设置为false时,Exception#getStackTrace返回长度为零的数组。当断路器打开时,这可以用来减少垃圾日志,因为已知异常的原
    • boolean automaticTransitionFromOpenToHalfOpenEnabled = false是否允许断路器自动从开启状态超时后转换为半开状态,如果开启,将会启动一个定时任务来转换状态
    • IntervalFunction waitIntervalFunctionInOpenState 断路器打开状态下的持续时间,默认60s
    • Duration slowCallDurationThreshold = Duration.ofSeconds(60s) 慢调用的时间阈值, 如果执行时间大于该值则认为是慢调用, 默认60s, 慢调用到一定比例将会触发熔断
    • float slowCallRateThreshold = 100慢调用的阈值,百分比,默认100% 也就是所有的请求都比slowCallDurationThreshold大,就触发熔断
    • Duration maxWaitDurationInHalfOpenState = Duration.ofSeconds(0) 半开状态下的最大持续时间,达到该时间就自动转换为打开状态,默认为0 则是一直保持半开状态,直到 minimumNumberOfCalls成功或者失败,如果大于等于1ms则会启动个定时任务来处理状态转换自动转换为开启状态
    • CircuitBreakerState(断路器状态)

      image-20220915165043210

      六种状态

      CircuitBreaker目前共用六种状态,三种常用状态:关闭 (CLOSED)、打开(OPEN)、半开(HALF_OPEN),三种特殊状态:禁用(DISABLE)、强制打开(FORCED_OPEN)、仅监控(METRICS_ONLY)

      image-20220914213828370

      状态转换图如下:

      image-20220914213851609

      • CLOSED状态下,当失败率、慢调用率(参见CircuitBreakerConfig#failureRateThreshold#slowCallRateThreshold)超过阈值时,断路器状态由CLOSED转换为OPEN,默认失败率50%,慢调用率100%
      • OPEN状态下,当打开状态的持续时间(参见CircuitBreakerConfig#waitIntervalFunctionInOpenState )结束,断路器由OPEN转换为HALF_OPEN
      • HALF_OPEN状态下,此时有一部分请求可以执行(参见CircuitBreakerConfig#permittedNumberOfCallsInHalfOpenState),如果这些请求的失败率超过阈值时,断路器的状态将会由HALF_OPEN转换为OPEN,如果这些请求的失败率小于阈值,断路器的状态将会由HALF_OPEN转换为CLOSED

      有限状态机实现 CircuitBreakerStateMachine

      CircuitBreaker的各种状态之间转换是通过一个有限状态机CircuitBreakerStateMachine来实现的。

      CircuitBreakerStateMachine类实现了CircuitBreaker接口,除了实现状态转换,还实现了熔断机制和事件发布机制。

      1. CircuitBreaker.State定义了状态的枚举值,也定义了每种状态是否允许发布事件
      2. CricuitBreakerStateMachine利用AtomicReference保证CircuitBreakerState引用的原子性, 初始化状态为关闭状态
      3. 状态转移核心方法stateTransition主要作用就是将A状态转换为B状态
      public final class CircuitBreakerStateMachine implements CircuitBreaker {
          // 保证状态的原子性,初始状态为关闭状态
          private final AtomicReference<CircuitBreakerState> stateReference;
          private CircuitBreakerStateMachine(String name, CircuitBreakerConfig circuitBreakerConfig, 
                                             Clock clock, SchedulerFactory schedulerFactory, 
                                             io.vavr.collection.Map<String, String> tags) {
              // ...省略
              // 初始值为关闭状态
          	this.stateReference = new AtomicReference<>(new ClosedState());
              // ...
          }
          // 状态转换核心方法
          private void stateTransition(State newState,
              UnaryOperator<CircuitBreakerState> newStateGenerator) {
              // 先获取当前State然后执行状态更新方法,AtomicReference保证原子性
              CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {
                  // StateTransition 定义了可以从 A状态到B状态的枚举, 如果不存在该case,transitionBetween方法将会抛出异常
                  StateTransition.transitionBetween(getName(), currentState.getState(), newState);
                  currentState.preTransitionHook();
                  return newStateGenerator.apply(currentState);
              });
              publishStateTransitionEvent(
                  StateTransition.transitionBetween(getName(), previousState.getState(), newState));
          }
          @Override // 转为DISABLED
          public void transitionToDisabledState() {
              stateTransition(DISABLED, currentState -> new DisabledState());
          }
          @Override // 转为METRICS_ONLY
          public void transitionToMetricsOnlyState() {
              stateTransition(METRICS_ONLY, currentState -> new MetricsOnlyState());
          }
          @Override // 转为FORCE_OPEN
          public void transitionToForcedOpenState() {
              stateTransition(FORCED_OPEN,
                  currentState -> new ForcedOpenState(currentState.attempts() + 1));
          }
          @Override // 转为CLOSED
          public void transitionToClosedState() {
              stateTransition(CLOSED, currentState -> new ClosedState());
          }
          @Override // 转为OPEN
          public void transitionToOpenState() {
              stateTransition(OPEN,
                  currentState -> new OpenState(currentState.attempts() + 1, currentState.getMetrics()));
          }
      
          @Override // 转为HALF_OPEN
          public void transitionToHalfOpenState() {
              stateTransition(HALF_OPEN, currentState -> new HalfOpenState(currentState.attempts()));
          }
      }
      

      在这,我列举几个StateTransition

      enum StateTransition {
          // CLOSED->CLOSED
          CLOSED_TO_CLOSED(State.CLOSED, State.CLOSED),
          // CLOSED->OPEN
          CLOSED_TO_OPEN(State.CLOSED, State.OPEN),
          CLOSED_TO_DISABLED(State.CLOSED, State.DISABLED),
          CLOSED_TO_METRICS_ONLY(State.CLOSED, State.METRICS_ONLY),
          CLOSED_TO_FORCED_OPEN(State.CLOSED, State.FORCED_OPEN),
          HALF_OPEN_TO_HALF_OPEN(State.HALF_OPEN, State.HALF_OPEN),
          HALF_OPEN_TO_CLOSED(State.HALF_OPEN, State.CLOSED),
          HALF_OPEN_TO_OPEN(State.HALF_OPEN, State.OPEN),
          HALF_OPEN_TO_DISABLED(State.HALF_OPEN, State.DISABLED),
          HALF_OPEN_TO_METRICS_ONLY(State.HALF_OPEN, State.METRICS_ONLY),
          HALF_OPEN_TO_FORCED_OPEN(State.HALF_OPEN, State.FORCED_OPEN),
          OPEN_TO_OPEN(State.OPEN, State.OPEN),
          OPEN_TO_CLOSED(State.OPEN, State.CLOSED),
          OPEN_TO_HALF_OPEN(State.OPEN, State.HALF_OPEN),
          OPEN_TO_DISABLED(State.OPEN, State.DISABLED),
          OPEN_TO_METRICS_ONLY(State.OPEN, State.METRICS_ONLY),
          OPEN_TO_FORCED_OPEN(State.OPEN, State.FORCED_OPEN),
      	// 省略.....
          private final State fromState;
          private final State toState;
          StateTransition(State fromState, State toState) {
              this.fromState = fromState;
              this.toState = toState;
          }
      }    
      

      CircuitBreakerMetrics

      首先,我们需要提前了解下我们调用结果也就是请求结果,总共有哪几种类型

      enum Result {
          BELOW_THRESHOLDS, // 都低于阈值
          FAILURE_RATE_ABOVE_THRESHOLDS, // 失败率 高于阈值
          SLOW_CALL_RATE_ABOVE_THRESHOLDS, // 慢调用率 高于阈值
          ABOVE_THRESHOLDS, // 失败率、慢调用率 都高于阈值
          BELOW_MINIMUM_CALLS_THRESHOLD; // 总请求数低于最小请求数阈值
      
          public static boolean hasExceededThresholds(Result result) {
              return hasFailureRateExceededThreshold(result) ||
                  hasSlowCallRateExceededThreshold(result);
          }
      
          public static boolean hasFailureRateExceededThreshold(Result result) {
              return result == ABOVE_THRESHOLDS || result == FAILURE_RATE_ABOVE_THRESHOLDS;
          }
      
          public static boolean hasSlowCallRateExceededThreshold(Result result) {
              return result == ABOVE_THRESHOLDS || result == SLOW_CALL_RATE_ABOVE_THRESHOLDS;
          }
      }
      

      CircuitBreakerMetrics 实现了 CircuitBreaker.Metrics 接口

      // CircuitBreaker 指标
      interface Metrics {
          float getFailureRate();// 返回失败率,如果总共请求数小于最小设定请求数 返回-1
          float getSlowCallRate();// 返回慢调用率
          int getNumberOfSlowCalls();// 返回慢调用数
          int getNumberOfSlowSuccessfulCalls();// 慢调用成功数
          int getNumberOfSlowFailedCalls();// 慢调用失败数
          int getNumberOfBufferedCalls();// 返回当前总请求数
          int getNumberOfFailedCalls();// 失败调用数
          long getNumberOfNotPermittedCalls();// 在打开状态下,返回当前不被允许请求调用的数量, 在关闭和半开下 始终为0
          int getNumberOfSuccessfulCalls();// 返回当前成功请求调用的数量
      }
      
      class CircuitBreakerMetrics implements CircuitBreaker.Metrics {
          // 指标类 后续会讲
          private final Metrics metrics;
          // 失败率阈值
          private final float failureRateThreshold;
          // 慢调用阈值
          private final float slowCallRateThreshold;
          // 慢调用时间阈值 (纳秒级)
          private final long slowCallDurationThresholdInNanos;
          // 没执行数
          private final LongAdder numberOfNotPermittedCalls;
          private int minimumNumberOfCalls;
      
          /**
           * @param slidingWindowSize 滑动窗口大小,CircuitBreakerConfig 中配置
           * @param slidingWindowType 滑动窗口类型 有基于计数 和基于时间两种 默认 基于计数
           * @param circuitBreakerConfig config
           * @param clock 滑动窗口 为基于时间是可用 取值为 CircuitBreakerState中的clock
           */
          private CircuitBreakerMetrics(int slidingWindowSize,
              CircuitBreakerConfig.SlidingWindowType slidingWindowType,
              CircuitBreakerConfig circuitBreakerConfig,
              Clock clock) {
              if (slidingWindowType == CircuitBreakerConfig.SlidingWindowType.COUNT_BASED) {
                  this.metrics = new FixedSizeSlidingWindowMetrics(slidingWindowSize);
                  // 最小请求数 不能超过滑动窗口大小
                  this.minimumNumberOfCalls = Math
                      .min(circuitBreakerConfig.getMinimumNumberOfCalls(), slidingWindowSize);
              } else {
                  this.metrics = new SlidingTimeWindowMetrics(slidingWindowSize, clock);
                  this.minimumNumberOfCalls = circuitBreakerConfig.getMinimumNumberOfCalls();
              }
              this.failureRateThreshold = circuitBreakerConfig.getFailureRateThreshold();
              this.slowCallRateThreshold = circuitBreakerConfig.getSlowCallRateThreshold();
              this.slowCallDurationThresholdInNanos = circuitBreakerConfig.getSlowCallDurationThreshold()
                  .toNanos();
              this.numberOfNotPermittedCalls = new LongAdder();
          }
          // 调用成功
          public Result onSuccess(long duration, TimeUnit durationUnit) {
              Snapshot snapshot;
              // 超过慢调用时间阈值 及算做 慢成功 否则成功
              if (durationUnit.toNanos(duration) > slowCallDurationThresholdInNanos) {
                  snapshot = metrics.record(duration, durationUnit, Outcome.SLOW_SUCCESS);
              } else {
                  snapshot = metrics.record(duration, durationUnit, Outcome.SUCCESS);
              }
              // 校验失败率或者慢调用率是否超过了阈值
              return checkIfThresholdsExceeded(snapshot);
          }
      
          // 调用失败
          public Result onError(long duration, TimeUnit durationUnit) {
              Snapshot snapshot;
              // 超过慢调用时间阈值 及算做 慢失败 否则失败
              if (durationUnit.toNanos(duration) > slowCallDurationThresholdInNanos) {
                  snapshot = metrics.record(duration, durationUnit, Outcome.SLOW_ERROR);
              } else {
                  snapshot = metrics.record(duration, durationUnit, Outcome.ERROR);
              }
              // 校验失败率或者慢调用率是否超过了阈值
              return checkIfThresholdsExceeded(snapshot);
          }
      
          // 校验失败率或者慢调用率是否超过了阈值
          private Result checkIfThresholdsExceeded(Snapshot snapshot) {
              float failureRateInPercentage = getFailureRate(snapshot);
              float slowCallsInPercentage = getSlowCallRate(snapshot);
              // -1 表示 总请求数低于最小请求阈值 默认始终不熔断
              if (failureRateInPercentage == -1 || slowCallsInPercentage == -1) {
                  return Result.BELOW_MINIMUM_CALLS_THRESHOLD;
              }
              // 失败和慢调用 阈值都满足
              if (failureRateInPercentage >= failureRateThreshold
                  && slowCallsInPercentage >= slowCallRateThreshold) {
                  return Result.ABOVE_THRESHOLDS;
              }
              if (failureRateInPercentage >= failureRateThreshold) {
                  return Result.FAILURE_RATE_ABOVE_THRESHOLDS;
              }
      
              if (slowCallsInPercentage >= slowCallRateThreshold) {
                  return Result.SLOW_CALL_RATE_ABOVE_THRESHOLDS;
              }
              return Result.BELOW_THRESHOLDS;
          }
      
          private float getSlowCallRate(Snapshot snapshot) {
              int bufferedCalls = snapshot.getTotalNumberOfCalls();
              // 未达到最小请求数 返回-1
              if (bufferedCalls == 0 || bufferedCalls < minimumNumberOfCalls) {
                  return -1.0f;
              }
              return snapshot.getSlowCallRate();
          }
      
          private float getFailureRate(Snapshot snapshot) {
              int bufferedCalls = snapshot.getTotalNumberOfCalls();
              // 未达到最小请求数 返回-1
              if (bufferedCalls == 0 || bufferedCalls < minimumNumberOfCalls) {
                  return -1.0f;
              }
              return snapshot.getFailureRate();
          }
      }
      

      可以看到指标的 统计都是基于io.github.resilience4j.core.metrics.Metrics类,他有两种实现,一种是基于计数的滑动窗口,一种是基于时间的滑动窗口

      public interface Metrics {
          /**
           * 负责记录一次请求的信息
           * @param duration     the duration of the call 执行时间
           * @param durationUnit the time unit of the duration 时间单位
           * @param outcome      the outcome of the call 执行结果
           */
          Snapshot record(long duration, TimeUnit durationUnit, Outcome outcome);
      
          /**
           * 获取当前度量指标快照信息
           */
          Snapshot getSnapshot();
          enum Outcome {
              SUCCESS, ERROR, SLOW_SUCCESS, SLOW_ERROR
          }
      }
      
      滑动窗口算法

      上面提到过,CircuitBreaker 总共有两种滑动窗口算法,一种是基于计数的滑动窗口,一种是基于时间的滑动窗口,接下来将会详细介绍下这两种算法,不过在此之前,容我先介绍几个类

      • AbstractAggregation 一个抽象聚合类,定义了失败数、总执行时间、慢调用数、慢失败数、总调用数
      • Total Aggregation 在AbstractAggregation 的基础上 新定义了一个方法,可以removeBucket
      • Measurement 在AbstractAggregation基础上,新定义了一个reset方法 负责重置数据
      • PartialAggregation 在AbstractAggregation基础上,新定义了一个变量 epochSecond 负责记录当前的秒级别数,reset 方法依旧是重置所有数据
      基于计数的滑动窗口实现:io.github.resilience4j.core.metrics.FixedSizeSlidingWindowMetrics

      基于计数的滑动窗口由N个测量值的圆形数组实现。
      如果计数窗口大小为10,则圆形阵列始终具有10个测量值。
      滑动窗口以增量方式更新总聚合。当记录新的呼叫结果时,将更新总汇总。收回最旧的度量后,将从总聚合中减去该度量,然后重置存储桶。(逐项扣除)
      快照的获取时间为常数O(1),因为快照是预先聚合的,并且与窗口大小无关。
      此实现的空间需求(内存消耗)应为O(n)

      public class FixedSizeSlidingWindowMetrics implements Metrics {
          // 滑动窗口大小
          private final int windowSize;
          // 指标聚合信息 包含 总执行时间,慢调用数,慢调用失败数,失败数,调用数
          private final TotalAggregation totalAggregation;
          // 指标数组
          private final Measurement[] measurements;
          // 当前滑动窗口位置
          int headIndex;
      
          public FixedSizeSlidingWindowMetrics(int windowSize) {
              this.windowSize = windowSize;
              this.measurements = new Measurement[this.windowSize];
              this.headIndex = 0;
              // 初始化指标数组
              for (int i = 0; i < this.windowSize; i++) {
                  measurements[i] = new Measurement();
              }
              this.totalAggregation = new TotalAggregation();
          }
          // 加锁 保证线程安全 record 就是调用结束后 回调的
          @Override
          public synchronized Snapshot record(long duration, TimeUnit durationUnit, Outcome outcome) {
              totalAggregation.record(duration, durationUnit, outcome);
              moveWindowByOne().record(duration, durationUnit, outcome);
              return new SnapshotImpl(totalAggregation);
          }
          // 加锁 保证线程安全
          public synchronized Snapshot getSnapshot() {
              return new SnapshotImpl(totalAggregation);
          }
          private Measurement moveWindowByOne() {
              // 后移当前滑动窗口位置
              moveHeadIndexByOne();
              // 获取旧指标信息
              Measurement latestMeasurement = getLatestMeasurement();
              // 从总聚合中减去旧指标
              totalAggregation.removeBucket(latestMeasurement);
              // 重置当前指标
              latestMeasurement.reset();
              return latestMeasurement;
          }
          private Measurement getLatestMeasurement() {
              return measurements[headIndex];
          }
          // 后移当前滑动窗口位置
          void moveHeadIndexByOne() {
              this.headIndex = (headIndex + 1) % windowSize;
          }
      }
      
      基于时间的滑动窗口实现: io.github.resilience4j.core.metrics.SlidingTimeWindowMetrics

      基于时间的滑动窗口是由N个部分集合(存储桶)的圆形数组实现的。
      如果时间窗口大小为10秒,则圆形数组始终具有10个部分聚合(存储桶)。每个存储桶都会汇总在某个时期内发生的所有调用的结果。(部分聚集)。圆形数组的头存储区存储当前纪元的呼叫结果。其他部分聚合存储前几秒的呼叫结果。
      滑动窗口不会单独存储呼叫结果(元组),而是以增量方式更新部分聚合(存储桶)和总聚合。
      当记录新的呼叫结果时,总聚合将增量更新。当最旧的存储桶被收回时,该存储桶的部分总聚合将从总聚合中减去,然后重置该存储桶。(逐项扣除)
      快照的获取时间为常数O(1),因为快照是预先聚合的,并且与时间窗口的大小无关。
      此实现的空间需求(内存消耗)应接近常数O(n),因为调用结果(元组)不是单独存储的。仅创建N个部分聚合和1个总聚合。
      部分聚合由3个整数组成,以计算失败的呼叫数,慢速呼叫数和总呼叫数。一个长存储所有呼叫的总持续时间。

      CircuitBreakerEvent

      Resilience4j的事件机制采用的是观察者设计模式,核心类在io.github.resilience4j.core包下

      • EventConsumer 事件接口消费者(观察者)
      • EventPublisher事件接口发布者(被观察者)
      • EventProcessor 实现了EventPublisher接口,封装了 消费者EventConsumer注册接口和负责处理对应事件通知

      CircuitBreakerEvent 事件类型

      目前共有八种:

      • CircuitBreakerOnSuccessEvent 请求调用成功时发布的事件
      • CircuitBreakerOnErrorEvent 请求调用失败时发布的事件
      • CircuitBreakerOnResetEvent 断路器重置时发布的事件
      • CircuitBreakerOnFailureRateExceededEvent 当达到失败率时发布的事件
      • CircuitBreakerOnIgnoredErrorEvent 当忽略的调用异常发生时发布的事件
      • CircuitBreakerOnSlowCallRateExceededEvent 当达到慢调用率时发布的事件
      • CircuitBreakerOnCallNotPermittedEvent 当请求不被允许调用时发布的事件
      • CircuitBreakerOnStateTransitionEvent 断路器状态转换时发布的事件

      对应枚举可以参考CircuitBreakerEvent.Type

      enum Type {
          ERROR(false),
          IGNORED_ERROR(false),
          SUCCESS(false),
          NOT_PERMITTED(false),
          STATE_TRANSITION(true),
          RESET(true),
          FORCED_OPEN(false),
          DISABLED(false),
          FAILURE_RATE_EXCEEDED(false),
          SLOW_CALL_RATE_EXCEEDED(false);
          public final boolean forcePublish;
          Type(boolean forcePublish) {
              this.forcePublish = forcePublish;
          }
      }
      

      image-20220914173149270

      EventPublisher 中定义了八种注册事件处理方法

      interface EventPublisher extends
          io.github.resilience4j.core.EventPublisher<CircuitBreakerEvent> {
          EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> eventConsumer);
          EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> eventConsumer);
          EventPublisher onStateTransition(
              EventConsumer<CircuitBreakerOnStateTransitionEvent> eventConsumer);
          EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> eventConsumer);
          EventPublisher onIgnoredError(
              EventConsumer<CircuitBreakerOnIgnoredErrorEvent> eventConsumer);
          EventPublisher onCallNotPermitted(
              EventConsumer<CircuitBreakerOnCallNotPermittedEvent> eventConsumer);
          EventPublisher onFailureRateExceeded(
              EventConsumer<CircuitBreakerOnFailureRateExceededEvent> eventConsumer);
          EventPublisher onSlowCallRateExceeded(
              EventConsumer<CircuitBreakerOnSlowCallRateExceededEvent> eventConsumer);
      }
      

      CircuitBreaker 断路器

      最后 我们再回到我们的主角 CircuitBreaker来瞅瞅。

      image-20220915161436952

      注:该图省略了很多方法,具体使用什么请具体分析,这里只列举了一些常用的,用于分析;State、Metrics、StateTransition、EventPublisher 在前面都有提到。具体往前面翻翻吧

      public interface CircuitBreaker {
          static <T> CheckedFunction0<T> decorateCheckedSupplier(CircuitBreaker circuitBreaker,
              CheckedFunction0<T> supplier) {
              return () -> {
                  // 尝试获取执行权限 如果没权限会抛出 CallNotPermittedException 异常 (此时处于熔断状态)
                  circuitBreaker.acquirePermission();
                  final long start = circuitBreaker.getCurrentTimestamp();
                  try {
                      // 执行
                      T result = supplier.apply();
                      long duration = circuitBreaker.getCurrentTimestamp() - start;
                      // 执行成功后 则记录状态。内部会根据config配置的 recordResultPredicate 检测 result 是否是需要统计失败率
                      circuitBreaker.onResult(duration, circuitBreaker.getTimestampUnit(), result);
                      return result;
                  } catch (Exception exception) {
                      // Do not handle java.lang.Error
                      long duration = circuitBreaker.getCurrentTimestamp() - start;
                      // 异常状态下,记录失败状态,计算失败率
                      circuitBreaker.onError(duration, circuitBreaker.getTimestampUnit(), exception);
                      throw exception;
                  }
              };
          }
          default <T> T executeCheckedSupplier(CheckedFunction0<T> checkedSupplier) throws Throwable {
              return decorateCheckedSupplier(this, checkedSupplier).apply();
          }
          // 这里负责创建一个CircuitBreaker 实例
      	static CircuitBreaker of(String name, CircuitBreakerConfig circuitBreakerConfig) {
              // 熟悉吗 
              return new CircuitBreakerStateMachine(name, circuitBreakerConfig);
          }
          // 接下来是几个权限接口
          boolean tryAcquirePermission();
          // 和tryAcquirePermission区别不同在于 没权限 该方法会抛出 CallNotPermittedException 异常
          void acquirePermission();
          // 释放权限
          void releasePermission();
          // decorateCheckedSupplier 方法中会调这三个方法哦
          void onSuccess(long duration, TimeUnit durationUnit);
          void onError(long duration, TimeUnit durationUnit, Throwable throwable);
          void onResult(long duration, TimeUnit durationUnit, Object result);
          // 该方法负责将状态重置为CLOSED状态
          void reset();
      }
      

      接下来分析下 CircuitBreakerStateMachine 中的几个方法

      public final class CircuitBreakerStateMachine implements CircuitBreaker {
      	// 保证状态的原子性,初始状态为关闭状态
          private final AtomicReference<CircuitBreakerState> stateReference;
          // xxxPermission方法就没必要分析了 都是调用 CircuitBreakerState的方法
          public boolean tryAcquirePermission() {
              boolean callPermitted = stateReference.get().tryAcquirePermission();
              if (!callPermitted) {
                  publishCallNotPermittedEvent();
              }
              return callPermitted;
          }
          public void releasePermission() {
              stateReference.get().releasePermission();
          }
          public void acquirePermission() {
              try {
                  stateReference.get().acquirePermission();
              } catch (Exception e) {
                  publishCallNotPermittedEvent();
                  throw e;
              }
          }
          
          public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
              if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
                  Throwable cause = throwable.getCause();
                  handleThrowable(duration, durationUnit, cause);
              } else {
                  handleThrowable(duration, durationUnit, throwable);
              }
          }
      
          private void handleThrowable(long duration, TimeUnit durationUnit, Throwable throwable) {
              if (circuitBreakerConfig.getIgnoreExceptionPredicate().test(throwable)) {
                  releasePermission();
                  publishCircuitIgnoredErrorEvent(name, duration, durationUnit, throwable);
              } else if (circuitBreakerConfig.getRecordExceptionPredicate().test(throwable)) {
                  publishCircuitErrorEvent(name, duration, durationUnit, throwable);
                  stateReference.get().onError(duration, durationUnit, throwable);
              } else {
                  publishSuccessEvent(duration, durationUnit);
                  stateReference.get().onSuccess(duration, durationUnit);
              }
          }
      
          public void onSuccess(long duration, TimeUnit durationUnit) {
              publishSuccessEvent(duration, durationUnit);
              stateReference.get().onSuccess(duration, durationUnit);
          }
      
          public void onResult(long duration, TimeUnit durationUnit, @Nullable Object result) {
              if (result != null && circuitBreakerConfig.getRecordResultPredicate().test(result)) {
                  ResultRecordedAsFailureException failure = new ResultRecordedAsFailureException(name, result);
                  publishCircuitErrorEvent(name, duration, durationUnit, failure);
                  stateReference.get().onError(duration, durationUnit, failure);
              } else {
                  onSuccess(duration, durationUnit);
              }
          }
      }
      
    • 相关阅读:
      【Maxent物种分布模型】气候变化对响尾蛇地理分布的影响
      java 基于springboot+vue的居民社区健康管理平台
      面试碰壁15次!作为一个已经27岁的测试工程师,未来在何方....
      2024有哪些免费的mac苹果电脑内存清理工具?
      小程序-走迷宫
      flutter生态一统甜夏 @Android @ios @windowse @macos @linux @Web
      Windows 10驱动开发入门(四):USB下的过滤器驱动
      Qt 在linux上检测内存泄漏,用valgrind的问题
      电商数据采集,用电商API帮你!(淘宝拼多多京东1688Lazada)
      synchronized原理
    • 原文地址:https://blog.csdn.net/jun8148/article/details/127105762