• 第四篇:Sentinel限流核心逻辑过程分析


    本篇主要是讲解Sentinel限流逻辑的核心流转是怎么样的,不会过多涉及到具体的算法,更多的是在讨论主线,具体每个规则的后面会说到

    简单使用的例子

    先看下官网提供的简单例子,使用的是流控规则,代码如下,可以看到入口其实就是Sphu.entry这个地方,也是最核心的逻辑

    1. public class SentinelHelloTest {
    2. public static void main(String[] args) throws Exception{
    3. // 配置规则.
    4. initFlowRules();
    5. while (true) {
    6. // 1.5.0 版本开始可以直接利用 try-with-resources 特性
    7. //SphU.entry进入资源,成功则执行内部逻辑
    8. try (Entry entry = SphU.entry("HelloWorld")) {
    9. Thread.sleep(10000 * 6);
    10. // 被保护的逻辑
    11. System.out.println("hello world");
    12. } catch (BlockException ex) {
    13. // 处理被流控的逻辑, 已经超出限制则抛出异常
    14. System.out.println("blocked!" + ex.getMessage());
    15. }
    16. }
    17. }
    18. /**
    19. * 初始化规则
    20. */
    21. private static void initFlowRules(){
    22. List rules = new ArrayList<>();
    23. //限流规则
    24. FlowRule rule = new FlowRule();
    25. //资源名字
    26. rule.setResource("HelloWorld");
    27. //限制类型,有qps和线程数,这里是线程数
    28. rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
    29. // Set limit QPS to 20.
    30. //限制1秒最多只能进20个请求
    31. rule.setCount(20);
    32. rules.add(rule);
    33. //加载规则,在内存中进行维护的
    34. FlowRuleManager.loadRules(rules);
    35. }
    36. }

    Sphu.entry

    1. //入口
    2. //com.alibaba.csp.sentinel.SphU#entry(java.lang.String)
    3. //接着到
    4. //com.alibaba.csp.sentinel.CtSph#entry()
    5. @Override
    6. public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
    7. //对资源进行一个bao包装
    8. StringResourceWrapper resource = new StringResourceWrapper(name, type);
    9. //调用重载的方法
    10. return entry(resource, count, args);
    11. }
    12. //最终会到这里来
    13. //com.alibaba.csp.sentinel.CtSph#entryWithPriority(com.alibaba.csp.sentinel.slotchain.ResourceWrapper, int, boolean, java.lang.Object...)
    14. private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
    15. throws BlockException {
    16. //分支逻辑,判断Context是否为空
    17. Context context = ContextUtil.getContext();
    18. if (context instanceof NullContext) {
    19. // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
    20. // so here init the entry only. No rule checking will be done.
    21. return new CtEntry(resourceWrapper, null, context);
    22. }
    23. //如果为空使用默认的上下文
    24. if (context == null) {
    25. // Using default context.
    26. context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    27. }
    28. //如果关闭则不会执行校验规则,默认是开启的
    29. // Global switch is close, no rule checking will do.
    30. if (!Constants.ON) {
    31. return new CtEntry(resourceWrapper, null, context);
    32. }
    33. //核心方法,构建处理链表,封装了一条链,然后使用双向链表的方式连接起来
    34. ProcessorSlot chain = lookProcessChain(resourceWrapper);
    35. /*
    36. * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
    37. * so no rule checking will be done.
    38. * 为空也是不需要校验
    39. */
    40. if (chain == null) {
    41. return new CtEntry(resourceWrapper, null, context);
    42. }
    43. Entry e = new CtEntry(resourceWrapper, chain, context);
    44. try {
    45. //责任链设计模式进入一个个slot调用
    46. chain.entry(context, resourceWrapper, null, count, prioritized, args);
    47. } catch (BlockException e1) {
    48. //责任链设计模式退出一个个slot调用
    49. e.exit(count, args);
    50. throw e1;
    51. } catch (Throwable e1) {
    52. // This should not happen, unless there are errors existing in Sentinel internal.
    53. RecordLog.info("Sentinel unexpected exception", e1);
    54. }
    55. return e;
    56. }
    57. //看到上面核心的方法其实就是一个solt链的构建与solt链条的调用,下面再分成几个部分来说说这个代码走向
    58. lookProcessChain

      构建链条的方法的方法,里面涉及到链条的构建与其对应的扩展

      1. //com.alibaba.csp.sentinel.CtSph#lookProcessChain
      2. ProcessorSlot lookProcessChain(ResourceWrapper resourceWrapper) {
      3. //从chainMap中获取是否已经有了,缓存功能
      4. ProcessorSlotChain chain = chainMap.get(resourceWrapper);
      5. //如果不存在则使用双重检锁机制进行初始化
      6. if (chain == null) {
      7. synchronized (LOCK) {
      8. chain = chainMap.get(resourceWrapper);
      9. if (chain == null) {
      10. // Entry size limit.
      11. //链表的长度不能超过最大值,否则不进行处理,MAX_SLOT_CHAIN_SIZE为6000
      12. if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
      13. return null;
      14. }
      15. //核心方法,进行构建链条
      16. chain = SlotChainProvider.newSlotChain();
      17. //把返回的chain放到缓存中去
      18. Map newMap = new HashMap(
      19. chainMap.size() + 1);
      20. newMap.putAll(chainMap);
      21. newMap.put(resourceWrapper, chain);
      22. //维护下最新的map,,这里暂时不太懂这层中转,,,
      23. chainMap = newMap;
      24. }
      25. }
      26. }
      27. //存在直接返回
      28. return chain;
      29. }
      30. //接着再看下具体的构建方法
      31. //com.alibaba.csp.sentinel.slotchain.SlotChainProvider#newSlotChain
      32. public static ProcessorSlotChain newSlotChain() {
      33. //如果不为空直接返回,默认是为空的
      34. if (slotChainBuilder != null) {
      35. return slotChainBuilder.build();
      36. }
      37. // Resolve the slot chain builder SPI.
      38. // 通过spi机制获取第一个或者获取默认的,这里是一个扩展,可以根据spi机制进行自定义,spi机制前面说了,这里就不再进去看了
      39. slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
      40. if (slotChainBuilder == null) {
      41. // Should not go through here.
      42. //如果还是为空,则使用默认的构建器
      43. RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
      44. slotChainBuilder = new DefaultSlotChainBuilder();
      45. } else {
      46. RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
      47. slotChainBuilder.getClass().getCanonicalName());
      48. }
      49. //执行构建逻辑
      50. return slotChainBuilder.build();
      51. }
      52. //逻辑还是比较简单的,一目了然,接着再看下最终的构建逻辑
      53. //com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder#build
      54. @Override
      55. public ProcessorSlotChain build() {
      56. //创建一个ProcessorSlotChain对象,就是一个类似于双向链表的数据结构
      57. ProcessorSlotChain chain = new DefaultProcessorSlotChain();
      58. //再次使用spi机制提供扩展,然后获取排序好的链条,比如以下两个,哪个order小哪个就在前面,这里ORDER_AUTHORITY_SLOT比ORDER_DEGRADE_SLOT要小
      59. //@Spi(order = Constants.ORDER_AUTHORITY_SLOT) public class AuthoritySlot
      60. //@Spi(order = Constants.ORDER_DEGRADE_SLOT) public class DegradeSlot
      61. List sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
      62. //循环组装为一个调用链,这里采用的是责任链设计模式
      63. for (ProcessorSlot slot : sortedSlotList) {
      64. //不是AbstractLinkedProcessorSlot类型的不会被加到链条中
      65. if (!(slot instanceof AbstractLinkedProcessorSlot)) {
      66. RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
      67. continue;
      68. }
      69. //转换并添加到末尾中
      70. chain.addLast((AbstractLinkedProcessorSlot) slot);
      71. }
      72. return chain;
      73. }
      74. chain.entry

        责任链设计模式调用入口,看这个只要了解这个设计模式基本就可以知道了,所以下面要去看一下ProcessorSlot的结构和其他几个slot基本的一个情况

        ProcessorSlot

        1. public interface ProcessorSlot {
        2. /**
        3. * Entrance of this slot.
        4. * 进入该slot
        5. */
        6. void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
        7. Object... args) throws Throwable;
        8. /**
        9. * Means finish of {@link #entry(Context, ResourceWrapper, Object, int, boolean, Object...)}.
        10. *
        11. * 循环进入slot
        12. */
        13. void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
        14. Object... args) throws Throwable;
        15. /**
        16. * Exit of this slot.
        17. * 退出该slot
        18. */
        19. void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
        20. /**
        21. * Means finish of {@link #exit(Context, ResourceWrapper, int, Object...)}.
        22. * 循环退出slot
        23. */
        24. void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
        25. }

        AbstractLinkedProcessorSlot

        抽象的实现,后面其他的校验规则都会继承这个类,所以这个抽象类还是很重要的,来看一下他的大概结构有哪些内容

        1. public abstract class AbstractLinkedProcessorSlot implements ProcessorSlot {
        2. //下个slot
        3. private AbstractLinkedProcessorSlot next = null;
        4. @Override
        5. public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        6. throws Throwable {
        7. //下一个不为空,则进行调用
        8. if (next != null) {
        9. next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        10. }
        11. }
        12. @SuppressWarnings("unchecked")
        13. void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
        14. throws Throwable {
        15. T t = (T)o;
        16. entry(context, resourceWrapper, t, count, prioritized, args);
        17. }
        18. @Override
        19. public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        20. //下一个不为空,则进行调用
        21. if (next != null) {
        22. next.exit(context, resourceWrapper, count, args);
        23. }
        24. }
        25. //对下一个的基本操作
        26. public AbstractLinkedProcessorSlot getNext() {
        27. return next;
        28. }
        29. public void setNext(AbstractLinkedProcessorSlot next) {
        30. this.next = next;
        31. }
        32. }

        DefaultProcessorSlotChain 

        默认的实现类,这是调用的源头,也是比较重要的,用类似于双向链表的思路构建了一条链,可以看看下面的逻辑

        1. public class DefaultProcessorSlotChain extends ProcessorSlotChain {
        2. //第一个slot
        3. AbstractLinkedProcessorSlot first = new AbstractLinkedProcessorSlot() {
        4. @Override
        5. public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
        6. throws Throwable {
        7. super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        8. }
        9. @Override
        10. public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        11. super.fireExit(context, resourceWrapper, count, args);
        12. }
        13. };
        14. //最后一个slot,默认等于第一个
        15. AbstractLinkedProcessorSlot end = first;
        16. //在头节点添加
        17. @Override
        18. public void addFirst(AbstractLinkedProcessorSlot protocolProcessor) {
        19. protocolProcessor.setNext(first.getNext());
        20. first.setNext(protocolProcessor);
        21. if (end == first) {
        22. end = protocolProcessor;
        23. }
        24. }
        25. //在尾节点添加
        26. @Override
        27. public void addLast(AbstractLinkedProcessorSlot protocolProcessor) {
        28. end.setNext(protocolProcessor);
        29. end = protocolProcessor;
        30. }
        31. /**
        32. * Same as {@link #addLast(AbstractLinkedProcessorSlot)}.
        33. *
        34. * @param next processor to be added.
        35. */
        36. @Override
        37. public void setNext(AbstractLinkedProcessorSlot next) {
        38. addLast(next);
        39. }
        40. @Override
        41. public AbstractLinkedProcessorSlot getNext() {
        42. return first.getNext();
        43. }
        44. /**
        45. *
        46. * 调用第一个节点的进入
        47. */
        48. @Override
        49. public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
        50. throws Throwable {
        51. first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
        52. }
        53. /**
        54. *
        55. * 调用第一个节点的退出
        56. */
        57. @Override
        58. public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        59. first.exit(context, resourceWrapper, count, args);
        60. }
        61. }
        62. 其他的就是一些细项规则了,比如

          FlowSlot:限流slot

          LogSlot:日志slot

          等等,具体的slot在com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件中

          # Sentinel default ProcessorSlots
          com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
          com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
          com.alibaba.csp.sentinel.slots.logger.LogSlot
          com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
          com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
          com.alibaba.csp.sentinel.slots.system.SystemSlot
          com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
          com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
          

          至于前后顺序在每个slot上面会有@Spi(order = Constants.ORDER_STATISTIC_SLOT)进行指定,值越小就排的越前

          接下来就是后面的调用处理了,如下

          1. Entry e = new CtEntry(resourceWrapper, chain, context);
          2. try {
          3. //责任链设计模式进入一个个slot调用
          4. chain.entry(context, resourceWrapper, null, count, prioritized, args);
          5. } catch (BlockException e1) {
          6. //责任链设计模式退出一个个slot调用
          7. e.exit(count, args);
          8. throw e1;
          9. } catch (Throwable e1) {
          10. // This should not happen, unless there are errors existing in Sentinel internal.
          11. RecordLog.info("Sentinel unexpected exception", e1);
          12. }
          13. return e;

          返回的chain就是DefaultProcessorSlotChain,一调用它就会一个个的slot进行调用,从而实现拦截的功能,同理,调用e.exit就会一个个solt去调用对应的方法,直到结束

          本篇文章只是记录整体的逻辑,这里使用的是责任链的设计模式,这个模式在很多地方都会用到,大家可以认真学习下这种思路,这篇文章就只讲到这里,后面的文章会再说下具体规则的实现与其校验

        63. 相关阅读:
          Flink Forward Asia 2022 主论坛概览
          Linux IIC 驱动实验
          FAQ是什么?该如何编辑FAQ?
          设计模式——访问者模式(Visitor Pattern)+ Spring相关源码
          MybatisPlus框架教程:入门、条件构造器、接口操作、代码生成器
          vue3 - 开发和生产环境通过Mock模拟真实接口请求
          分析一下:运营商三网大数据能获取精准客户的原因
          HTML基础
          《数据结构与算法》-双链表的增删查改,链表与顺序表的区别
          CORS(跨域资源共享)
        64. 原文地址:https://blog.csdn.net/zxc_user/article/details/126055426