• Dubbo中的负载均衡算法之平滑加权轮询算法源码解析


    平滑加权轮询

    轮询算法

    轮询算法很简单,就是每台服务器轮流提供服务,代码如下:

    private static final List<String> SERVERS;
    
    private static final AtomicInteger OFFSET = new AtomicInteger(0);
    
    static {
        SERVERS = Lists.newArrayList("A", "B", "C");
    }
    
    private static String doSelect() {
        if (OFFSET.get() > SERVERS.size() - 1) {
            OFFSET.set(0);
        }
        return SERVERS.get(OFFSET.getAndIncrement());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    加权轮询

    简单轮询算法和简单随机算法一样,面临的一个问题就是,有的机器性能好,有的机器性能差,如何能保证能者多劳呢?就需要给每个服务器加一个权重,让服务的调度机会能按照其权重的比例来,最简单的实现是复制法。假设有三台服务器servers = ["A", "B", "C"],其中每台服务器的权重为:weights = [6, 3, 1],则我们可以按照权重复制一个数组["A","A","A","A","A","A","B","B","B","C"],让其按照上述的代码被调度即可,但是这种算法有一个缺点是对内存的消耗较大。

    我们看一种更好的实现方法,我们这里还是建一个一维的坐标轴,标记0-9十个节点,某一次的请求,如果落在[0,5)的区间之内,则选择A,如果落在[6-8)的区间内则选择B,如果落在[9,10)的区间内,则选择C。那么如何让某一次的请求能成为对应区间的一个索引数字呢,之前随机算法我们用的是随机数生成的,这里轮询算法则不能再用随机数了,我们需要为每次请求设置一个编号,这个编号应该是递增的,是全局的,也应该是线程安全的,所以这里我们用AtomicInteger来记录。随着请求的次数越来越多,这个编号必然会超过10,最终到达100,1000,如何映射到0-9的区间呢,可以通过取余的方式来对这个值进行缩小,保证他在0-9之间。取余是一个常用的技巧,在hashmap等hash表的设计中经常用到。

    我们以上面的三台服务器权重为例,模拟建一个坐标轴:0-----6-----9-----10,模拟一下我们的算法

    1. 第一次调用,1 % 10 = 1,1在(0-6]的区间,则选择服务器A
    2. 第一次调用,2 % 10 = 2,2在(0-6]的区间,则选择服务器A
    3. 第六次调用,6 % 10 = 6,6在(0-6]的区间,则选择服务器A
    4. 第七次调用,7 % 10 = 7,7不在(0-6]的区间,在(6-9]的区间,则选择服务器B
    5. 第十次调用,10 % 10 = 0,0作为一个特殊的位置,选择服务器C
    private static final AtomicInteger NUM = new AtomicInteger(1);
    
    private static final List<String> SERVERS;
    
    private static final Map<String, Integer> SERVER_WEIGHT_MAP;
    
    static {
        SERVERS = Lists.newArrayList(A", "B", "C");
        SERVER_WEIGHT_MAP = new LinkedHashMap<>();
        SERVER_WEIGHT_MAP.put("A", 6);
        SERVER_WEIGHT_MAP.put("B", 3);
        SERVER_WEIGHT_MAP.put("C", 1);
    }
    
    private static String doSelectByWeight() {
        int length = SERVER_WEIGHT_MAP.keySet().size();
    
        boolean sameWeight = true;
        int totalWeight = 0;
    
        for (int i = 0; i < length; i++) {
            int weight = (int) SERVER_WEIGHT_MAP.values().toArray()[i];
            totalWeight += weight;
            if (sameWeight && totalWeight != weight * (i + 1)) {
                sameWeight = false;
            }
        }
    
        if (!sameWeight) {
            int offset = NUM.getAndIncrement() % totalWeight;
            offset = offset == 0 ? totalWeight : offset;
            Set<Map.Entry<String, Integer>> entries = SERVER_WEIGHT_MAP.entrySet();
    
            for (Map.Entry<String, Integer> entry : entries) {
                Integer weight = entry.getValue();
                // 第七次调用,7 % 10  = 7,7不在(0-6]的区间,在(6-9]的区间,则选择服务器B,这种直观理解好理解,当时用代码实现有些复杂,可以用另一种思路来实现
                // ex: 计算的offset = 6,然后遍历服务器列表,首先得到服务器A,6是否小于A的权重,是则选择A,结束程序
                // 否的话,应该是比6大了,那么到底是选择B,还是选择C呢,假设offset = 7,那么我们让他减去A的权重,看得到的结果是否小于B的权重,是则选中
                // 否的话,在减去B的权重,看得到的结果是否小于C的权重,以此类推
                if (offset <= weight) {
                    return entry.getKey();
                }
                offset -= weight;
            }
        }
        return SERVERS.get(NUM.getAndIncrement() % length);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    平滑加权轮询

    上述算法虽然实现了加权轮询的效果,但是依然有一个缺点就是,如果某一个服务器权重很大,那么他就需要连续的处理请求,比如上面例子中,如果连续调用10次,则依次被选中的服务器是:AAAAAABBBC。这就导致前期服务器A的压力较大,而B和C又处于闲置状态,无法分担压力,我们理想的可能是保证好10次调用,A需要被调用6次,B需要被调用3次,C需要被调用1次就行,顺序其实没必要,而且调用的顺序乱一点,可能才是我们期望的结果,比如:ABAABACABA这样。这就需要用到另一种算法,平滑加权轮询算法

    平滑加权轮询算法的思路如下:

    1. 每一个server对应两个权重,weight和currentWeight,weight是固定的,currentWeight初始值为0,后续是动态调整的
    2. 新的请求访问到,调整每个server的currentWeight = currentWeight + weight,
    3. currentWeight最大的server被选中
    4. 调整currentWeight = currentWeight - 总权重(只调整最大权重)
    编号currentWeight = currentWeight + weightmax(currentWeight)max(currentWeight) - 总权重
    [0, 0, 0]
    1[6, 3, 1]6 -> A[-4, 3, 1]
    2[2, 6, 2]6 -> B[2, -4, 2]
    3[8, -1, 3]8 -> A[-2, -1, 3]
    4[4, 2, 4]4 -> A[-6, 2, 4]
    5[0, 5, 5]5 -> B[0, -5, 5]
    6[6, -2, 6]6 -> A[-4, 2, 6]
    7[2, 1, 7]7 -> C[2, 1, -3]
    8[8, 4, -2]8 -> A[-2, 4, 2]
    9[4, 7, -1]7 -> B[4, -3, -1]
    10[10, 0, 0]10-> A[0, 0, 0]

    代码实现如下:

    private static final Map<String, WeightedRoundRobin> WEIGHT_MAP;
    
        static {
            WEIGHT_MAP = new LinkedHashMap<>();
            WEIGHT_MAP.put("192.168.0.1", new WeightedRoundRobin("192.168.0.1", 6, 0));
            WEIGHT_MAP.put("192.168.0.2", new WeightedRoundRobin("192.168.0.2", 3, 0));
            WEIGHT_MAP.put("192.168.0.3", new WeightedRoundRobin("192.168.0.3", 1, 0));
        }
    
        @Data
        static class WeightedRoundRobin {
            private String ip;
            private int weight;
            private int current;
    
            public WeightedRoundRobin (String ip, int weight, int current) {
                this.ip = ip;
                this.weight = weight;
                this.current = current;
            }
        }
    
    
        private static String doSelectByWeightV2() {
            Integer totalWeight = WEIGHT_MAP.values().stream().map(WeightedRoundRobin::getWeight).reduce(0, Integer::sum);
    
            // 1. current_weight += weight
            WEIGHT_MAP.values().forEach(weight -> weight.setCurrent(weight.getCurrent() + weight.getWeight()));
    
            // 2. select max
            WeightedRoundRobin maxCurrentWeight = WEIGHT_MAP.values().stream().
                    max(Comparator.comparing(WeightedRoundRobin::getCurrent)).get();
    
            // 3. max(currentWeight) -= sum(weight)
            maxCurrentWeight.setCurrent(maxCurrentWeight.getCurrent() - totalWeight);
    
            // 返回maxCurrentWeight所对应的ip
            return maxCurrentWeight.getIp();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    dubbo中的代码实现如下:

    public class RoundRobinLoadBalance extends AbstractLoadBalance {
        public static final String NAME = "roundrobin";
    
        private static final int RECYCLE_PERIOD = 60000;
    
        protected static class WeightedRoundRobin {
            private int weight;
            private AtomicLong current = new AtomicLong(0);
            private long lastUpdate;
    
            public int getWeight() {
                return weight;
            }
    
            public void setWeight(int weight) {
                this.weight = weight;
                current.set(0);
            }
    
            public long increaseCurrent() {
                return current.addAndGet(weight);
            }
    
            public void sel(int total) {
                current.addAndGet(-1 * total);
            }
    
            public long getLastUpdate() {
                return lastUpdate;
            }
    
            public void setLastUpdate(long lastUpdate) {
                this.lastUpdate = lastUpdate;
            }
        }
    
        private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
    
        /**
         * get invoker addr list cached for specified invocation
         * 

    * for unit test only * * @param invokers * @param invocation * @return */ protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); Map<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (map != null) { return map.keySet(); } return null; } @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>()); int totalWeight = 0; long maxCurrent = Long.MIN_VALUE; long now = System.currentTimeMillis(); Invoker<T> selectedInvoker = null; WeightedRoundRobin selectedWRR = null; for (Invoker<T> invoker : invokers) { String identifyString = invoker.getUrl().toIdentityString(); int weight = getWeight(invoker, invocation); // 1. 遍历所有invoker,初始化权重 WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> { WeightedRoundRobin wrr = new WeightedRoundRobin(); wrr.setWeight(weight); return wrr; }); if (weight != weightedRoundRobin.getWeight()) { //weight changed weightedRoundRobin.setWeight(weight); } // 2. 设置current_weight += weight long cur = weightedRoundRobin.increaseCurrent(); weightedRoundRobin.setLastUpdate(now); // 3. 完成一次遍历之后,找到max(currentWeight) if (cur > maxCurrent) { maxCurrent = cur; selectedInvoker = invoker; selectedWRR = weightedRoundRobin; } totalWeight += weight; } if (invokers.size() != map.size()) { map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD); } if (selectedInvoker != null) { // 4. max(currentWeight) -= sum(weight) selectedWRR.sel(totalWeight); return selectedInvoker; } // should not happen here return invokers.get(0); } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103

    ※注:平滑加权轮询算法不是dubbo首次创建并提出的,应该是nginx最初提出的,nginx实现参考:ngx_http_upstream_init_round_robin.c,关于平滑加权轮询的数学证明参考:nginx平滑的基于权重轮询算法分析 | tenfy’ blog

  • 相关阅读:
    【spring】初识spring基础
    【超详细demo】Spring Boot 多数据源配置
    【开山篇】Go(Golang)概述
    【将图片链接中的图片合并成PDF】
    【表面缺陷检测】钢轨表面缺陷检测数据集介绍(2类,含xml标签文件)
    【MySQL】 Java的JDBC编程
    恒运资本:沪指震荡涨0.28%,医药板块强势拉升,金融等板块上扬
    第1关:会话创建与关闭
    深度学习YOLOv5车辆颜色识别检测 - python opencv 计算机竞赛
    后端接口性能差,该从哪些方面进行优化?
  • 原文地址:https://blog.csdn.net/weixin_40149557/article/details/127838237