• Ribbon的随机算法,为什么能难倒这么多的微服务专家?


    昨天手写了一个注册中心,并想着顺便写一个负载均衡器,集成一下。

    手写的注册中心对标Spring Cloud Alibaba Nacos,分为客户端服务端

    手写的负载均衡器,分为两个版本:Web版本和WebFlux版本

    • Web:Spring Cloud Netflix Ribbon
    • WebFlux:Spring Cloud LoadBalancer

    想要源码的小伙伴,文末有链接。【仅供学习和参考

    看到Ribbon的随机算法,让我产生了一些问题,不问不知道,一问吓一跳。问了很多人,上到各大厂相关领域的专家,下到一些所谓的高端人员,再到国内外网站,没有人注意到这个问题。

    今天晚上得以有时间,我通宵把Ribbon这个随机算法讲的透彻一些,此处涉及到的知识点颇多,写了大概有几万字。前人栽树,后人乘凉。

    我在上几篇文章:

    1. 《Ribbon源码深入解析》
    2. 《RestTemplate源码深入解析》
    3. 《Nacos服务注册源码深入解析》
    4. 《Nacos心跳机制源码深入解析》
    5. 《Nacos服务续约源码深入解析》
    6. 《Nacos超时剔除服务机制源码深入解析》

    我们这里的环境就用RestTemplate+Ribbon+Nacos,所以说上面这几篇文章对下文的理解很有必要。

    先来看看Ribbon的随机算法吧。

    在这里插入图片描述
    对于整体的设计思想,我们不难理解,根据服务名获取所有的服务列表,根据负载均衡器规则再选出一个服务。

    我们先来看这个随机数的实现,在很久以前,我写过几篇Random随机数的源码深入解析,这里采用的ThreadLocalRandom无非是每个线程拥有不同的种子seed,这样就实现了线程安全的随机数,其底层无非也是CAS,具体可看笔者这几篇文章:

    1. 《CAS自旋锁,深入C++源码解析》
    2. 《ThreadLocalRandom的作用及其使用》
    3. 《随机数生成的四种方法》
    4. 《JVMRandom源码追溯》
    5. 《SecureRandom源码解析》
    6. 《浅谈ThreadLocalInsecureRandom》
    7. 《Random类的那些事》

    在此之前,我们一定要注意一下这类的官方介绍:

    A loadbalacing strategy that randomly distributes traffic amongst existing servers.
    翻译:一种随机分布在现有服务器之间的负载均衡策略。

    记住这句话,这是深入RandomRule的重要思想。

    接下里,我们看一下Random的核心实现方法RandomRule#choose,当前方法的官方介绍是一定要看的:

    Randomly choose from all living servers
    翻译:从所有活着的服务器中随机选择

    看完类和方法的介绍,其实就能知道整体的设计了,为了提高我们的认知,我们来看看设计的细节。【这细节是真的细啊,贯穿了整个Ribbon生命周期和Nacos服务发现的生命周期】

    public Server choose(ILoadBalancer lb, Object key) {
    		// 如果没有负载器,直接返回空
            if (lb == null) {
                return null;
            }
            // 初始化Server
            Server server = null;
    		// 如果server为空就死循环,直到不为空
            while (server == null) {
            	// 如果线程中断,则直接返回null
                if (Thread.interrupted()) {
                    return null;
                }
                // 1:获取可达的服务列表
                List<Server> upList = lb.getReachableServers();
                // 2:获取所有的服务列表
                List<Server> allList = lb.getAllServers();
    			// 获取所有服务列表的个数
                int serverCount = allList.size();
                // 如果服务数为零,就直接返回空
                if (serverCount == 0) {
                    /*
                     * No servers. End regardless of pass, because subsequent passes
                     * only get more restrictive.
                     */
                    // 上面注解的意思就是,连服务都没有,还玩什么呀!
                    return null;
                }
    			// 所有服务列表的个数随机选一个数
                int index = chooseRandomInt(serverCount);
                // 用上面这个随机数作为可达服务列表的索引
                server = upList.get(index);
    			// 如果服务为空就把线程让出来
                if (server == null) {
                    /*
                     * The only time this should happen is if the server list were
                     * somehow trimmed. This is a transient condition. Retry after
                     * yielding.
                     */
                    Thread.yield();
                    continue;
                }
    			// 如果服务活着,就返回该服务,否则线程继续挂起
    			// 这里服务默认是false,后续讲解在哪里设置的true
                if (server.isAlive()) {
                    return (server);
                }
    
                // Shouldn't actually happen.. but must be transient or a bug.
                // 上面的意思就是:这种情况基本不能出现,就算出现了也是短暂的或是个bug
                server = null;
                Thread.yield();
            }
    		// 返回服务
            return server;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    我在这里详细的把每一行代码都做了注释,其实昨天我就三个问题:

    1. 这个可达的服务列表和所有服务列表的关系是什么?如何获取的?
    2. 为什么要用所有服务列表数量作为随机数的取值范围?而且还用该随机数作为可达服务列表的索引?不会越界?
    3. 服务的存活状态在哪里设置的?

    接下来就这三个问题展开论述

    我先给出结论:
    在BaseLoadBalancer#setServersList中会设置服务的存活状态,同时也会同步可达服务列表和所有服务列表,并且此任务是30s执行一次,同步Nacos的服务缓存数据。

    在这里插入图片描述
    那我们开始论证吧!!!

    首先Spring容器初始化的时候,会加载自动装配类,我们看一下Ribbon的自动装配类
    在这里插入图片描述
    我们重点看一下这个Bean
    在这里插入图片描述
    我们在之前解析Ribbon的核心源码时就说过,这离初始化SpringClientFactory时,会加载RibbonClientConfiguration配置类
    在这里插入图片描述
    这个配置类会初始化ZoneAwareLoadBalancer
    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述
    好了,这个start方法就是核心方法了,我们来看看它的实现

    在这里插入图片描述
    在这里插入图片描述

    来,给你看看它的这几个时间:

    1. 延迟时间:1s
    2. 刷新时间间隔:30s
      在这里插入图片描述
      我们来一起看一下它的这个任务:
    final Runnable wrapperRunnable = new Runnable() {
        @Override
        public void run() {
        	// isActive默认是false,经过CAS会变成true,这里不会进去该if
            if (!isActive.get()) {
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(true);
                }
                return;
            }
            try {
            	// 核心流程
                updateAction.doUpdate();
                lastUpdated = System.currentTimeMillis();
            } catch (Exception e) {
                logger.warn("Failed one update cycle", e);
            }
        }
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    跟进去这个核心方法
    在这里插入图片描述
    在这里插入图片描述

    关键点来了,这里分为两大点:

    1. 获取服务列表
    2. 更新Ribbon内部维护的服务列表
      在这里插入图片描述
      获取服务列表,我们放在后文说,这里先来讲解一下是如何更新服务列表的。

    首先更新每个服务的存活状态【变为true】
    在这里插入图片描述
    在这里插入图片描述
    接下来,我们就来到了最核心的方法

    /**
     * Set the list of servers used as the server pool. This overrides existing
     * server list.
     * 设置用作服务器池的服务器列表,这将重写现有的服务器列表。
     */
    public void setServersList(List lsrv) {
    	// 获取锁
        Lock writeLock = allServerLock.writeLock();
        logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);
        // 初始化新的服务列表
        ArrayList<Server> newServers = new ArrayList<Server>();
        // 加锁
        writeLock.lock();
        try {
        	// 初始化所有服务列表
            ArrayList<Server> allServers = new ArrayList<Server>();
            // 循环所有服务列表
            for (Object server : lsrv) {
            	// 如果为null则跳出循环
                if (server == null) {
                    continue;
                }
    			// 如果服务属于String,则初始化成Server
                if (server instanceof String) {
                    server = new Server((String) server);
                }
    			// 如果服务为Server,则增加进所有服务列表中
                if (server instanceof Server) {
                    logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());
                    allServers.add((Server) server);
                } else {
                	// 否则抛出异常
                    throw new IllegalArgumentException(
                            "Type String or Server expected, instead found:"
                                    + server.getClass());
                }
    
            }
            boolean listChanged = false;
            // 如果从注册中心查出来的服务列表不同
            if (!allServerList.equals(allServers)) {
                listChanged = true;
                if (changeListeners != null && changeListeners.size() > 0) {
                   List<Server> oldList = ImmutableList.copyOf(allServerList);
                   List<Server> newList = ImmutableList.copyOf(allServers);                   
                   for (ServerListChangeListener l: changeListeners) {
                       try {
                           l.serverListChanged(oldList, newList);
                       } catch (Exception e) {
                           logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
                       }
                   }
                }
            }
            if (isEnablePrimingConnections()) {
                for (Server server : allServers) {
                    if (!allServerList.contains(server)) {
                        server.setReadyToServe(false);
                        newServers.add((Server) server);
                    }
                }
                if (primeConnections != null) {
                    primeConnections.primeConnectionsAsync(newServers, this);
                }
            }
            // This will reset readyToServe flag to true on all servers
            // regardless whether
            // previous priming connections are success or not
            allServerList = allServers;
            if (canSkipPing()) {
                for (Server s : allServerList) {
                    s.setAlive(true);
                }
                upServerList = allServerList;
            } else if (listChanged) {
                forceQuickPing();
            }
        } finally {
        	// 释放锁
            writeLock.unlock();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82

    所有服务列表

    @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections
            .synchronizedList(new ArrayList<Server>());
    
    • 1
    • 2
    • 3

    更新服务列表

    @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> upServerList = Collections
            .synchronizedList(new ArrayList<Server>());
    
    • 1
    • 2
    • 3

    最后看一下概览
    在这里插入图片描述
    还记得我们刚才说到的那个问题么?就是这个,讲了更新服务列表,还有如何获取服务列表呢?

    在这里插入图片描述
    我们来到顶级接口
    在这里插入图片描述
    因为用的Nacos,所以这里Nacos实现了该方法。
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述
    整体的流程大概就是这个样子,其实并不复杂,更多的设计的哲学理念。

    其实,如果不触发Ping机制,可达服务列表和所有服务列表基本是同样的数据。

    触发Ping机制,就会抛出异常,返回null,交给上层去处理喽,这里还涉及很多定时器、Ping机制、锁优化、缓存优化等各种机制,设计得非常优雅。

    一些小可爱没深入研究过源码就说是bug和缺陷,我真是透透了~~~

    当然还有太多了,有时间在写吧。。。睡觉了~

  • 相关阅读:
    什么是设计模式?
    FL Studio官方20.9中文版无需汉化补丁,正确安装并设置切换
    Codeforces Round #804 (Div. 2)
    对于产业互联网的认识,直接关系着我们究竟会以怎样的心态来看待它
    小林coding笔记
    过滤器和监听器
    考研数据结构判断题整合
    测试大语言模型在嵌入式设备部署的可能性——模型TinyLlama-1.1B-Chat-v1.0
    Vue3的watch使用介绍及场景
    顺序查找和折半查找
  • 原文地址:https://blog.csdn.net/CSDN_SAVIOR/article/details/126404772