• 手撕漏桶&令牌桶限流算法(Java版)


    漏桶算法

    漏桶算法解决了时间窗口类算法的痛点,可以使流量更加的平滑;

    漏桶(Leaky Bucket)算法可以理解为注水漏水的过程,往漏桶中以任意速率流入水,以固定的速率流出水。当水超过桶的容量时,会被溢出,也就是被丢弃。因为桶容量是不变的,保证了整体的速率。

    • 流入的水滴,可以看作是访问系统的请求,这个流入速率是不确定的;
    • 桶的容量一般用来表示系统所能处理的请求数;
      • 如果桶的容量满了,也就达到限流的阀值,会丢弃水滴(即:拒绝请求);
    • 流出的水滴,是恒定速率的,用来表示服务按照固定的速率处理请求。

    消息中间件MQ采用的正是漏桶的思想,水滴的流入和留出可以看做是生产者消费者模式;

    • 请求是一个生产者,每一个请求都如一滴水,请求到来后放到一个队列(漏桶)中;
    • 桶底有一个孔,不断的漏出水滴,就像消费者不断的消费队列中的内容,并且消费的速率(漏出的速度)等于限流阈值。
    package com.saint.algorithm.limiting;
    
    import java.time.LocalDateTime;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 漏桶算法
     * 

    * 可以理解为注水漏水的过程,往漏桶中以任意速率流入水,以固定的速率流出水。 * 当水超过桶的容量时,会被溢出,也就是被丢弃。因为桶容量是不变的,保证了整体的速率。 * 流入的水滴,可以看作是访问系统的请求,这个流入速率是不确定的; * 桶的容量一般用来表示系统所能处理的请求数;如果桶的容量满了,也就达到限流的阀值,会丢弃水滴(即:拒绝请求); * 流出的水滴,是恒定速率的,用来表示服务按照固定的速率处理请求。 * * @author Saint */ public class LeakyBucketRateLimiter { private int qps = 2; // 漏桶 private LinkedBlockingQueue<Character> waterBucket; // 表示桶是否被初始化 private volatile boolean initialized = false; // 关联一个漏桶(一般情况为自身) private static volatile LeakyBucketRateLimiter leakyBucketRateLimiter; public static LeakyBucketRateLimiter getLeakyBucket() { return leakyBucketRateLimiter; } /** * 创建一个漏桶 * * @param capacity 漏桶容量 * @param qps 漏桶支持的QPS * @return */ public static LeakyBucketRateLimiter create(int capacity, int qps) { leakyBucketRateLimiter = new LeakyBucketRateLimiter(capacity, qps); return leakyBucketRateLimiter; } private LeakyBucketRateLimiter(int capacity, int qps) { // 漏桶只能被初始化一次 if (!initialized) { this.qps = qps; waterBucket = new LinkedBlockingQueue<>(capacity); // 初始化消费者 initConsumer(); } } /** * 漏桶中的水以固定速率流出 */ private void initConsumer() { new Thread(() -> { while (true) { try { TimeUnit.MILLISECONDS.sleep(1000 / qps); } catch (InterruptedException e) { // log for exception System.out.println("Exception occur! " + e); } // 以固定速率消费 waterBucket.poll(); } }).start(); } /** * 是否将请求加入到漏桶,能存入则代表漏桶没满,允许请求通过,返回true;否则返回false * * @return */ public boolean tryAcquire() { return waterBucket.offer('S'); } /** * 漏桶容量2,每秒流出2滴水; * 最初漏桶中没有水,所以第一秒立刻打进去两滴水;又由于漏桶每秒流出2滴水,所以在程序开始跑时,第一秒必回流出一滴水(极限情况2滴) * 所以第一秒会进入3滴水,第一秒之后每秒稳定流入2滴水,即QPS为2; * * @param args */ public static void main(String[] args) { LeakyBucketRateLimiter leakyBucket = LeakyBucketRateLimiter.create(2, 2); for (int i = 0; i < 20; i++) { LocalDateTime now = LocalDateTime.now(); if (leakyBucket.tryAcquire()) { System.out.println(now + " pass the rate limiting"); } else { System.out.println(now + " was limited"); } try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { // 日志记录 } } System.out.println("------------"); // 再次获取漏桶 LeakyBucketRateLimiter leakyBucket2 = LeakyBucketRateLimiter.getLeakyBucket(); // 验证漏桶只会有一个 System.out.println("leakyBucket only one ? " + (leakyBucket == leakyBucket2)); for (int i = 0; i < 10; i++) { LocalDateTime now = LocalDateTime.now(); if (leakyBucket2.tryAcquire()) { System.out.println(now + " pass the rate limiting"); } else { System.out.println(now + " was limited"); } try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { // 日志记录 } } } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133

    代码解释:

    • 漏桶容量2,每秒流出2滴水;
    • 最初漏桶中没有水,所以第一秒立刻打进去两滴水;又由于漏桶每秒流出2滴水,所以在程序开始跑时,第一秒必回流出一滴水(极限情况2滴)
    • 所以第一秒会进入3滴水,第一秒之后每秒稳定流入2滴水,即QPS为2;

    令牌桶算法

    令牌桶(Token Bucket)算法是对漏桶算法的一种改进,不仅能够平滑限流,还允许一定程度的流量突发;它是网络流量整形(Traffic Shaping)速率限制(Rate Limiting)中最常使用的一种算法。

    令牌桶的实现思路也类似于生产者和消费之间的关系:

    • 系统服务作为生产者,按照指定频率向桶(容器)中添加令牌,如 QPS 为 500,每 2ms 向桶中添加一个令牌,如果桶中令牌数量达到阈值,则不再添加。

    • 请求的执行作为消费者,每个请求都需要去桶中拿取一个令牌,取到令牌则继续执行;

    • 如果桶中无令牌可取,就触发拒绝策略,可以是超时等待,也可以是直接拒绝请求,以此达到限流目的。

    令牌桶的实现特点:

    1. 1s / 限流阈值(QPS) = 令牌添加时间间隔;
    2. 桶的容量可以大于限流的阈值(做一定的冗余),令牌数量达到桶容量时,不再添加;
    3. 可以适应流量突发,N 个请求到来只需要从桶中获取 N 个令牌就可以继续处理;
    4. 令牌桶启动时桶中无令牌,启动后按照令牌添加时间间隔添加令牌,若启动时就有阈值数量的请求过来,会因为桶中没有足够的令牌而触发拒绝策略,不过如 RateLimiter 限流工具已经优化了这个问题。

    Google 的 Java 开发工具包 Guava 中的限流工具类 RateLimiter 就是令牌桶的一个实现

    package com.saint.algorithm.limiting;
    
    import java.time.LocalDateTime;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 令牌桶算法
     * 系统服务作为生产者,按照指定频率向桶(容器)中添加令牌,如 QPS 为 500,每 2ms 向桶中添加一个令牌,如果桶中令牌数量达到阈值,则不再添加。
     * 请求的执行作为消费者,每个请求都需要去桶中拿取一个令牌,取到令牌则继续执行;
     * 如果桶中无令牌可取,就触发拒绝策略,可以是超时等待,也可以是直接拒绝请求,以此达到限流目的。
     * 

    * 1s / 限流阈值(QPS) = 令牌添加时间间隔; * 桶的容量可以大于限流的阈值(做一定的冗余),令牌数量达到桶容量时,不再添加; * * @author Saint */ public class TokenBucketRateLimiter { private int qps = 2; // 漏桶 private volatile LinkedBlockingQueue<Character> tokenBucket = null; // 表示桶是否被初始化 private volatile boolean initialized = false; // 关联一个漏桶(一般情况为自身) private static volatile TokenBucketRateLimiter tokenBucketRateLimiter; public static TokenBucketRateLimiter getTokenBucket() { return tokenBucketRateLimiter; } /** * 创建一个漏桶 * * @param capacity 漏桶容量 * @param qps 漏桶支持的QPS * @return */ public static TokenBucketRateLimiter create(int capacity, int qps) { tokenBucketRateLimiter = new TokenBucketRateLimiter(capacity, qps); return tokenBucketRateLimiter; } private TokenBucketRateLimiter(int capacity, int qps) { // 漏桶只能被初始化一次 if (!initialized) { this.qps = qps; tokenBucket = new LinkedBlockingQueue<>(capacity); // 初始化生产者 initProducer(); } } /** * 令牌桶中的令牌以固定速率加入 */ private void initProducer() { // 令牌桶初始容量为qps for (int i = 0; i < qps; i++) { tokenBucket.offer('S'); } new Thread(() -> { while (true) { try { TimeUnit.MILLISECONDS.sleep(1000 / qps); } catch (InterruptedException e) { // log for exception System.out.println("Exception occur! " + e); } // 以固定速率生产令牌 tokenBucket.offer('S'); } }).start(); } /** * 获取令牌,能获取到,允许请求通过,返回true;否则返回false * * @return */ public boolean tryAcquire() { return tokenBucket.poll() == null ? false : true; } /** * 令牌桶容量为2,每秒加入2个令牌; * 最初漏桶中有两个令牌,所以第一秒立刻获取到两个令牌;又由于令牌桶每秒加入2个令牌,所以在程序开始跑时,第一秒必回加入一个令牌(极限情况2个) * 所以第一秒会获取到3个令牌,第一秒之后每秒稳定获取到两个令牌,即QPS为2; * * @param args */ public static void main(String[] args) { TokenBucketRateLimiter tokenBucket = TokenBucketRateLimiter.create(2, 2); for (int i = 0; i < 20; i++) { LocalDateTime now = LocalDateTime.now(); if (tokenBucket.tryAcquire()) { System.out.println(now + " pass the rate limiting"); } else { System.out.println(now + " was limited"); } try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { // 日志记录 } } System.out.println("------------"); // 再次获取漏桶 TokenBucketRateLimiter tokenBucket2 = TokenBucketRateLimiter.getTokenBucket(); // 验证令牌桶只会有一个 System.out.println("tokenBucket only one ? " + (tokenBucket == tokenBucket2)); for (int i = 0; i < 10; i++) { LocalDateTime now = LocalDateTime.now(); if (tokenBucket2.tryAcquire()) { System.out.println(now + " pass the rate limiting"); } else { System.out.println(now + " was limited"); } try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { // 日志记录 } } } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137

    代码解释:

    • 令牌桶容量为2,每秒加入2个令牌;
    • 最初漏桶中有两个令牌,所以第一秒立刻获取到两个令牌;又由于令牌桶每秒加入2个令牌,所以在程序开始跑时,第一秒必会加入一个令牌(极限情况2个)
    • 所以第一秒会获取到3个令牌,第一秒之后每秒稳定获取到两个令牌,即QPS为2;
  • 相关阅读:
    使用python利用merge+sort函数对excel进行连接并排序
    Windows中安装tree命令
    MFC子类控件化
    哈希切割+布隆过滤器
    MySQL中find_in_set函数的使用
    Hudi extraMetadata 研究总结
    python办公之使用xlrd读取excel文件
    FPGA学习笔记(四)通过数码管学习顶层模块和例化的编写
    win10系统访问我的电脑&win10打开命令行
    七 R语言|箱须图、饼图的绘制
  • 原文地址:https://blog.csdn.net/Saintmm/article/details/126373065