• Redis 实现限流策略


    除了控制流量,限流还有一个应用目的是用于控制用户行为,避免垃圾请求。

    比如在 UGC 社区,用户的发帖、回复、点赞等行为都要严格受控,一般要严格限定某行为在规定 时间内允许的次数,超过了次数那就是非法行为。对非法行为,业务必须规定适当的惩处策略。

    如何使用 Redis 来实现简单限流策略?

    接口的定义

    1. # 指定用户 user_id 的某个行为 action_key 在特定的时间内 period 只允许发生一定的次数
    2. max_count
    3. def is_action_allowed(user_id, action_key, period, max_count):
    4. return True
    5. # 调用这个接口 , 一分钟内只允许最多回复 5 个帖子
    6. can_reply = is_action_allowed("laoqian", "reply", 60, 5)
    7. if can_reply:
    8. do_reply()
    9. else:
    10. raise ActionThresholdOverflow()

    解决方案

    这个限流需求中存在一个滑动时间窗口,想想 zset 数据结构的 score 值,是不是可以通过 score 来圈出这个时间窗口来。

    Redis sorted sets | Redis

    而且我们只需要保留这个时间窗口,窗口之外的数据都可以砍掉。那这个 zset 的 value 填什么比较合适呢?它只需要保证唯一性即可,用 uuid 会比较浪费空间,那就改用毫秒时间戳吧。

     如图所示,用一个 zset 结构记录用户的行为历史,每一个行为都会作为 zset 中的一个 key 保存下来。同一个用户同一种行为用一个 zset 记录。

    为节省内存,我们只需要保留时间窗口内的行为记录,同时如果用户是冷用户,滑动时间窗口内的行为是空记录,那么这个 zset 就可以从内存中移除,不再占用空间。

    通过统计滑动窗口内的行为数量与阈值 max_count 进行比较就可以得出当前的行为是否 允许。用代码表示如下:

    1. import org.apache.shiro.util.CollectionUtils;
    2. import org.springframework.beans.factory.annotation.Autowired;
    3. import org.springframework.data.redis.core.RedisCallback;
    4. import org.springframework.data.redis.core.RedisTemplate;
    5. import org.springframework.stereotype.Component;
    6. import java.nio.charset.StandardCharsets;
    7. import java.util.List;
    8. @Component
    9. public class SimpleRateLimiter {
    10. @Resource
    11. private RedisTemplate redisTemplate;
    12. public boolean isActionAllowed(String userId, String actionKey, int period, long maxCount) {
    13. String key = String.format("hist:%s:%s", userId, actionKey);
    14. long nowMills = System.currentTimeMillis();
    15. List objects = redisTemplate.executePipelined((RedisCallback) connection -> {
    16. // 打开管道
    17. connection.openPipeline();
    18. byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    19. // 添加命令
    20. connection.zAdd(keyBytes, nowMills
    21. , String.valueOf(nowMills).getBytes(StandardCharsets.UTF_8));
    22. // 清楚无用数据
    23. connection.zRemRangeByScore(keyBytes, 0, nowMills - period * 1000L);
    24. // 判断规定时间内请求数量
    25. connection.zCard(keyBytes);
    26. // 重新设置过期时间
    27. connection.expire(keyBytes, period + 1);
    28. // 关闭管道 不需要close 否则拿不到返回值
    29. // connection.closePipeline();
    30. // 这里一定要返回null,最终pipeline的执行结果,才会返回给最外层
    31. return null;
    32. });
    33. if (!CollectionUtils.isEmpty(objects)) {
    34. return (Long) objects.get(2) <= maxCount;
    35. }
    36. return true;
    37. }
    38. }
    39. 测试:

      1. import com.hcx.common.redisdemo.SimpleRateLimiter;
      2. import org.junit.jupiter.api.Test;
      3. import org.springframework.beans.factory.annotation.Autowired;
      4. import org.springframework.boot.test.context.SpringBootTest;
      5. @SpringBootTest
      6. class MallCommonApplicationTests {
      7. @Autowired
      8. private SimpleRateLimiter limiter;
      9. @Test
      10. void contextLoads() {
      11. for (int i = 0; i < 20; i++) {
      12. System.out.println("结果:" + limiter.isActionAllowed("laoqian", "reply", 60, 5));
      13. }
      14. }
      15. }
      16. /*
      17. 结果:true
      18. 结果:true
      19. 结果:true
      20. 结果:true
      21. 结果:true
      22. 结果:false
      23. 结果:false
      24. 结果:false
      25. 结果:false
      26. 结果:false
      27. 结果:false
      28. 结果:false
      29. 结果:false
      30. 结果:false
      31. 结果:false
      32. 结果:false
      33. 结果:false
      34. 结果:false
      35. 结果:false
      36. 结果:false
      37. */

      整体思路就是:每一个行为到来时,都维护一次时间窗口。将时间窗口外的记录全部清理掉,只保留窗口内的记录。

       zset 集合中只有 score 值非常重要,value 值没有特别的意义,只需要保证它是唯一的就可以了。 因为这几个连续的 Redis 操作都是针对同一个 key 的,使用 pipeline 可以显著提升 Redis 存取效率。

      但这种方案也有缺点,因为它要记录时间窗口内所有的行为记录,如果这个量很大,比如限定 60s 内操作不得超过 100w 次这样的参数,它是不适合做这样的限流的,因为会消耗大量的存储空间。

      高级限流算法——漏斗限流

      漏斗限流是最常用的限流方法之一,顾名思义,这个算法的灵感源于漏斗(funnel)的结构。 

      漏洞的容量是有限的,如果将漏嘴堵住,然后一直往里面灌水,它就会变满,直至再也 装不进去。如果将漏嘴放开,水就会往下流,流走一部分之后,就又可以继续往里面灌水。 如果漏嘴流水的速率大于灌水的速率,那么漏斗永远都装不满。

      如果漏嘴流水速率小于灌水的速率,那么一旦漏斗满了,灌水就需要暂停并等待漏斗腾空。

      所以,漏斗的剩余空间就代表着当前行为可以持续进行的数量,漏嘴的流水速率代表着 系统允许该行为的最大频率。

      单机版漏斗算法实现_JAVA

      1. import java.util.HashMap;
      2. import java.util.Map;
      3. public class FunnelRateLimiter {
      4. private final Map funnels = new HashMap<>();
      5. public static void main(String[] args) {
      6. FunnelRateLimiter limiter = new FunnelRateLimiter();
      7. for (int i = 0; i < 20; i++) {
      8. System.out.println("次数" + (i + 1) + ": " + limiter.isActionAllowed("user", "get", 10, 1));
      9. }
      10. }
      11. /**
      12. * @param userId 用户ID
      13. * @param actionKey 行为key
      14. * @param capacity 漏斗容量
      15. * @param leakingRate 漏嘴流水速率 单位mill
      16. * @return 行为能否执行
      17. */
      18. public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
      19. String key = String.format("%s:%s", userId, actionKey);
      20. Funnel funnel = funnels.get(key);
      21. if (funnel == null) {
      22. funnel = new Funnel(capacity, leakingRate);
      23. funnels.put(key, funnel);
      24. }
      25. return funnel.watering(1); // 需要 1 个 quota
      26. }
      27. static class Funnel {
      28. /**
      29. * 漏斗容量
      30. */
      31. int capacity;
      32. /**
      33. * 漏嘴流水速率
      34. */
      35. float leakingRate;
      36. /**
      37. * 漏斗剩余空间
      38. */
      39. int leftQuota;
      40. /**
      41. * 上一次漏水时间
      42. */
      43. long leakingTs;
      44. public Funnel(int capacity, float leakingRate) {
      45. this.capacity = capacity;
      46. this.leakingRate = leakingRate;
      47. this.leftQuota = capacity;
      48. this.leakingTs = System.currentTimeMillis();
      49. }
      50. /**
      51. * 计算剩余空间
      52. */
      53. void makeSpace() {
      54. long nowTs = System.currentTimeMillis();
      55. long deltaTs = nowTs - leakingTs; // 距离上一次漏水过去了多久
      56. int deltaQuota = (int) (deltaTs * leakingRate); // 腾出多少空间
      57. if (deltaQuota < 0) { // 间隔时间太长,整数数字过大溢出
      58. this.leftQuota = capacity;
      59. this.leakingTs = nowTs;
      60. return;
      61. }
      62. if (deltaQuota < 1) { // 腾出空间太小,最小单位是 1
      63. return;
      64. }
      65. this.leftQuota += deltaQuota;
      66. this.leakingTs = nowTs;
      67. if (this.leftQuota > this.capacity) {
      68. this.leftQuota = this.capacity;
      69. }
      70. }
      71. /**
      72. * @param quota 待申请空间
      73. * @return 申请是否成功
      74. */
      75. boolean watering(int quota) {
      76. makeSpace();
      77. if (this.leftQuota >= quota) {
      78. this.leftQuota -= quota;
      79. return true;
      80. }
      81. return false;
      82. }
      83. }
      84. }
      85. /*
      86. 次数1: true
      87. 次数2: true
      88. 次数3: true
      89. 次数4: true
      90. 次数5: true
      91. 次数6: true
      92. 次数7: true
      93. 次数8: true
      94. 次数9: true
      95. 次数10: true
      96. 次数11: false
      97. 次数12: false
      98. 次数13: false
      99. 次数14: false
      100. 次数15: false
      101. 次数16: false
      102. 次数17: false
      103. 次数18: false
      104. 次数19: false
      105. 次数20: false
      106. */

      Funnel 对象的 make_space 方法是漏斗算法的核心,其在每次灌水前都会被调用以触发漏水,给漏斗腾出空间来。

      能腾出多少空间取决于过去了多久以及流水的速率。Funnel 对象占据的空间大小不再和行为的频率成正比,它的空间占用是一个常量

      分布式的漏斗算法该如何实现?能不能使用 Redis 的基础数据结构来搞定?

      观察 Funnel 对象的几个字段,发现可以将 Funnel 对象的内容按字段存储到一 个 hash 结构中,灌水的时候将 hash 结构的字段取出来进行逻辑运算后,再将新值回填到 hash 结构中就完成了一次行为频度的检测。

      但是有个问题,无法保证整个过程的原子性。从 hash 结构中取值,然后在内存里运算,再回填到 hash 结构,这三个过程无法原子化,意味着需要进行适当的加锁控制。

      而一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。

      • 如果重试的话,就会导致性能下降。
      • 如果放弃的话,就会影响用户体验。

      同时,代码的复杂度也跟着升高很多。

      Redis-Cell——限流 Redis 模块

      令牌桶算法_百度百科 (baidu.com)

      Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell。该模块也使用了漏斗算法,并 提供了原子的限流指令。

      该模块只有 1 条指令 cl.throttle,它的参数和返回值都略显复杂,接下来让我们来看看这 个指令具体该如何使用。

      图中capacity设置的是总容量-1

      上面这个指令的意思是允许「用户老钱回复行为」的频率为每 60s 最多 30 次(漏水速 率),漏斗的初始容量为 16,也就是说一开始可以连续回复 16 个帖子,然后才开始受漏水速率的影响。我们看到这个指令中漏水速率变成了 2 个参数,替代了之前的单个浮点数。用 两个参数相除的结果来表达漏水速率相对单个浮点数要更加直观一些。 

      • > cl.throttle laoqian:reply 15 30 60
      • 1) (integer) 0 # 0 表示允许,1 表示拒绝
      • 2) (integer) 15 # 漏斗容量 capacity
      • 3) (integer) 14 # 漏斗剩余空间 left_quota
      • 4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
      • 5) (integer) 2 # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)

      在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的非常周 到,连重试时间都帮你算好了,直接取返回结果数组的第四个值进行 sleep 即可,如果不想 阻塞线程,也可以异步定时任务来重试。

      1. package com.hcx.common.redisdemo;
      2. import org.apache.shiro.util.CollectionUtils;
      3. import org.springframework.data.redis.core.RedisTemplate;
      4. import org.springframework.data.redis.core.script.DefaultRedisScript;
      5. import org.springframework.stereotype.Component;
      6. import javax.annotation.Resource;
      7. import java.util.Collections;
      8. import java.util.List;
      9. @Component
      10. public class FunnelRateLimiter {
      11. /**
      12. * lua 脚本
      13. */
      14. public static final String LUA_SCRIPT = "return redis.call('cl.throttle',KEYS[1], ARGV[1], ARGV[2], ARGV[3], ARGV[4])";
      15. @Resource
      16. private RedisTemplate redisTemplate;
      17. /**
      18. * @param key 键值
      19. * @param capacity 总漏斗容量-1
      20. * @param operations 漏水速率
      21. * @param seconds 漏水周期
      22. * @param quota 待申请空间
      23. * @return 申请结果
      24. */
      25. public boolean isActionAllowed(String key, int capacity, int operations, int seconds, int quota) {
      26. try {
      27. DefaultRedisScript script = new DefaultRedisScript<>(LUA_SCRIPT, List.class);
      28. List rs = redisTemplate.execute(script, Collections.singletonList(key), capacity, operations, seconds, quota);
      29. if (CollectionUtils.isEmpty(rs)) return false;
      30. System.out.println("漏斗容量:" + rs.get(1));
      31. System.out.println("剩余空间:" + rs.get(2));
      32. System.out.println("最少多长时间后再试:" + rs.get(3));
      33. System.out.println("多长时间后漏斗为空:" + rs.get(4));
      34. return rs.get(0) == 0;
      35. } catch (Exception e) {
      36. e.printStackTrace();
      37. return false;
      38. }
      39. }
      40. }
      41. /*
      42. 漏斗容量:16
      43. 剩余空间:15
      44. 最少多长时间后再试:-1
      45. 多长时间后漏斗为空:2
      46. 结果:true
      47. ……
      48. 最少多长时间后再试:-1
      49. 多长时间后漏斗为空:27
      50. 结果:true
      51. 漏斗容量:16
      52. 剩余空间:1
      53. 最少多长时间后再试:-1
      54. 多长时间后漏斗为空:29
      55. 结果:true
      56. 漏斗容量:16
      57. 剩余空间:0
      58. 最少多长时间后再试:-1
      59. 多长时间后漏斗为空:31
      60. 结果:true
      61. 漏斗容量:16
      62. 剩余空间:0
      63. 最少多长时间后再试:1
      64. 多长时间后漏斗为空:31
      65. 结果:false
      66. 漏斗容量:16
      67. 剩余空间:0
      68. 最少多长时间后再试:1
      69. 多长时间后漏斗为空:31
      70. 结果:false
      71. 漏斗容量:16
      72. 剩余空间:0
      73. 最少多长时间后再试:1
      74. 多长时间后漏斗为空:31
      75. 结果:false
      76. 漏斗容量:16
      77. 剩余空间:0
      78. 最少多长时间后再试:1
      79. 多长时间后漏斗为空:31
      80. 结果:false
      81. */

      在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的非常周 到,连重试时间都帮你算好了,直接取返回结果数组的第四个值进行 sleep 即可,如果不想 阻塞线程,也可以异步定时任务来重试。

    40. 相关阅读:
      Selenium定向爬取海量精美图片及搜索引擎杂谈
      机器学习1:k 近邻算法
      Linux实用操作-----快捷键的使用(收藏系列)
      一篇文章,让你看懂 Spring Cloud 之 Eureka
      Blazor和Vue对比学习(基础1.4):事件和子传父
      图的存储之邻接矩阵
      正确使用update-alternatives管理jdk环境的方法
      外贸网站被谷歌收录的方法
      当鼠标移入一个元素时、其它隐藏部分的元素展示出来【如何将隐藏的操作展示、将只读信息变为可输入的内容】
      基于径向基函数RBF网络的手写数字分类(Matlab代码实现)
    41. 原文地址:https://blog.csdn.net/weixin_52383177/article/details/127870580