• Nacos配置中心交互模型


    对于Nacos大家应该都不太陌生,出身阿里名声在外,能做动态服务发现、配置管理,非常好用的一个工具。然而这样的技术用的人越多面试被问的概率也就越大,如果只停留在使用层面,那面试可能要吃大亏。

    比如我们今天要讨论的话题,Nacos在做配置中心的时候,配置数据的交互模式是服务端推过来还是客户端主动拉的?
    在这里插入图片描述

    1. 配置中心

    Nacos之前简单回顾下配置中心的由来。
    简单理解配置中心的作用就是对配置统一管理,修改配置后应用可以动态感知,而无需重启。
    因为在传统项目中,大多都采用静态配置的方式,也就是把配置信息都写在应用内的ymlproperties这类文件中,如果要想修改某个配置,通常要重启应用才可以生效。
    但有些场景下,比如我们想要在应用运行时,通过修改某个配置项,实时的控制某一个功能的开闭,频繁的重启应用肯定是不能接受的。
    尤其是在微服务架构下,我们的应用服务拆分的粒度很细,少则几十多则上百个服务,每个服务都会有一些自己特有或通用的配置。假如此时要改变通用配置,难道要我挨个改几百个服务配置?很显然这不可能。所以为了解决此类问题配置中心应运而生。
    在这里插入图片描述

    2. 推与拉模型

    客户端与配置中心的数据交互方式其实无非就两种,要么推push,要么拉pull

    2.1 推模型

    客户端与服务端建立TCP长连接,当服务端配置数据有变动,立刻通过建立的长连接将数据推送给客户端。

    2.1.1 优势

    长链接的优点是实时性,一旦数据变动,立即推送变更数据给客户端,而且对于客户端而言,这种方式更为简单,只建立连接接收数据,并不需要关心是否有数据变更这类逻辑的处理。

    2.1.2 弊端

    长连接可能会因为网络问题,导致不可用,也就是俗称的假死。连接状态正常,但实际上已无法通信,所以要有的心跳机制KeepAlive来保证连接的可用性,才可以保证配置数据的成功推送。

    2.2 拉模型

    客户端主动的向服务端发请求拉配置数据,常见的方式就是轮询,比如每3s向服务端请求一次配置数据。
    轮询的优点是实现比较简单。但弊端也显而易见,轮询无法保证数据的实时性,什么时候请求?间隔多长时间请求一次?都是不得不考虑的问题,而且轮询方式对服务端还会产生不小的压力。

    3. 轮询

    nacos采用的是客户端主动拉pull模型,应用长轮询(Long Polling)的方式来获取配置数据。
    额?以前只听过轮询,长轮询又是什么鬼?它和传统意义上的轮询(暂且叫短轮询吧,方便比较)有什么不同呢?

    3.1 短轮询

    不管服务端配置数据是否有变化,不停的发起请求获取配置,比如支付场景中前段JS轮询订单支付状态。
    这样的坏处显而易见,由于配置数据并不会频繁变更,若是一直发请求,势必会对服务端造成很大压力。还会造成推送数据的延迟,比如:每10s请求一次配置,如果在第11s时配置更新了,那么推送将会延迟9s,等待下一次请求。
    在这里插入图片描述
    为了解决短轮询的问题,有了长轮询方案。

    3.2 长轮询

    长轮询可不是什么新技术,它不过是由服务端控制响应客户端请求的返回时间,来减少客户端无效请求的一种优化手段,其实对于客户端来说与短轮询的使用并没有本质上的区别。
    客户端发起请求后,服务端不会立即返回请求结果,而是将请求挂起等待一段时间,如果此段时间内服务端数据变更,立即响应客户端请求,若是一直无变化则等到指定的超时时间后响应请求,客户端重新发起长链接。
    在这里插入图片描述

    4. Nacos

    配置中心的几个核心概念:dataIdgroupnamespace,它们的层级关系如下图:
    在这里插入图片描述

    • dataId:是配置中心里最基础的单元,它是一种key-value结构,key通常是我们的配置文件名称,比如:application.yml、mybatis.xml,而value是整个文件下的内容。
    • group:dataId配置的分组管理,比如同在dev环境下开发,但同环境不同分支需要不同的配置数据,这时就可以用分组隔离,默认分组DEFAULT_GROUP
    • namespace:项目开发过程中肯定会有dev、test、pro等多个不同环境,namespace则是对不同环境进行隔离,默认所有配置都在public里。

    4.1 架构设计

    在这里插入图片描述
    客户端、控制台通过发送Http请求将配置数据注册到服务端,服务端持久化数据到Mysql。
    客户端拉取配置数据,并批量设置对dataId的监听发起长轮询请求,如服务端配置项变更立即响应请求,如无数据变更则将请求挂起一段时间,直到达到超时时间。为减少对服务端压力以及保证配置中心可用性,拉取到配置数据客户端会保存一份快照在本地文件中,优先读取

    4.2 客户端源码分析

    Nacos配置中心的客户端源码在nacos-client项目,其中NacosConfigService实现类是所有操作的核心入口。
    说之前先了解个客户端数据结构cacheMap,这里大家重点记住它,因为它几乎贯穿了Nacos客户端的所有操作,由于存在多线程场景为保证数据一致性,cacheMap采用了AtomicReference原子变量实现

    private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(new HashMap<>());
    
    • 1

    cacheMap是个Map结构,key为groupKey,是由dataId,group, tenant(租户)拼接的字符串;value为CacheData对象,每个dataId都会持有一个CacheData对象。

    4.2.1 获取配置

    Nacos获取配置数据的逻辑比较简单,先取本地快照文件中的配置,如果本地文件不存在或者内容为空,则再通过HTTP请求从远端拉取对应dataId配置数据,并保存到本地快照中,请求默认重试3次,超时时间3s。
    在这里插入图片描述
    获取配置有getConfig()getConfigAndSignListener()这两个接口,但getConfig()只是发送普通的HTTP请求,而getConfigAndSignListener()则多了发起长轮询和对dataId数据变更注册监听的操作addTenantListenersWithContent()

    @Override
    public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
        return getConfigInner(namespace, dataId, group, timeoutMs);
    }
    
    @Override
    public String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener)
            throws NacosException {
        String content = getConfig(dataId, group, timeoutMs);
        worker.addTenantListenersWithContent(dataId, group, content, Arrays.asList(listener));
        return content;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    4.2.2 注册监听

    客户端注册监听,先从cacheMap中拿到dataId对应的CacheData对象。

    public void addTenantListenersWithContent(String dataId, String group, String content,
                                              List<? extends Listener> listeners) throws NacosException {
        group = blank2defaultGroup(group);
        String tenant = agent.getTenant();
        // 1、获取dataId对应的CacheData,如没有则向服务端发起长轮询请求获取配置
        CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
        synchronized (cache) {
            // 2、注册对dataId的数据变更监听
            cache.setContent(content);
            for (Listener listener : listeners) {
                cache.addListener(listener);
            }
            cache.setSyncWithServer(false);
            agent.notifyListenConfig();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    如没有则向服务端发起长轮询请求获取配置,默认的Timeout时间为30s,并把返回的配置数据回填至CacheData对象的content字段,同时用content生成MD5值;再通过addListener()注册监听器。

      public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
            CacheData cache = getCache(dataId, group, tenant);
            if (null != cache) {
                return cache;
            }
            String key = GroupKey.getKeyTenant(dataId, group, tenant);
            synchronized (cacheMap) {
                CacheData cacheFromMap = getCache(dataId, group, tenant);
                // multiple listeners on the same dataid+group and race condition,so
                // double check again
                // other listener thread beat me to set to cacheMap
                if (null != cacheFromMap) {
                    cache = cacheFromMap;
                    // reset so that server not hang this check
                    cache.setInitializing(true);
                } else {
                    cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
                    int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
                    cache.setTaskId(taskId);
                    // fix issue # 1317
                    if (enableRemoteSyncConfig) {
                        String[] ct = getServerConfig(dataId, group, tenant, 3000L, false);
                        cache.setContent(ct[0]);
                    }
                }
                
                Map<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get());
                copy.put(key, cache);
                cacheMap.set(copy);
            }
            LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
            
            MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
            
            return cache;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    CacheData也是个出场频率非常高的一个类,我们看到除了dataId、group、tenant、content这些相关的基础属性,还有几个比较重要的属性如:listenersmd5(content真实配置数据计算出来的md5值),以及注册监听、数据比对、服务端数据变更通知操作都在这里。

        
        private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
        
        private volatile String md5;
        
    
    • 1
    • 2
    • 3
    • 4
    • 5

    其中listeners是对dataId所注册的所有监听器集合,其中的ManagerListenerWrap对象除了持有Listener监听类,还有一个lastCallMd5字段,这个属性很关键,它是判断服务端数据是否更变的重要条件。
    在添加监听的同时会将CacheData对象当前最新的md5值赋值给ManagerListenerWrap对象的lastCallMd5属性。

    
        
        /**
         * check if all listeners md5 is equal with cache data.
         */
        public boolean checkListenersMd5Consistent() {
            for (ManagerListenerWrap wrap : listeners) {
                if (!md5.equals(wrap.lastCallMd5)) {
                    return false;
                }
            }
            return true;
        }
        
        /**
         * Add listener if CacheData already set new content, Listener should init lastCallMd5 by CacheData.md5
         *
         * @param listener listener
         */
        public void addListener(Listener listener) {
            if (null == listener) {
                throw new IllegalArgumentException("listener is null");
            }
            ManagerListenerWrap wrap =
                    (listener instanceof AbstractConfigChangeListener) ? new ManagerListenerWrap(listener, md5, content)
                            : new ManagerListenerWrap(listener, md5);
            
            if (listeners.addIfAbsent(wrap)) {
                LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,
                        listeners.size());
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    4.2.3 变更通知

    客户端又是如何感知服务端数据已变更呢?

    我们还是从头看,NacosConfigService类的构造器中初始化了一个ClientWorker,而在ClientWorker类的构造器中又启动了一个线程池来轮询cacheMap

           
            @Override
            public void startInternal() throws NacosException {
                executor.schedule(new Runnable() {
                    @Override
                    public void run() {
                        while (true) {
                            try {
                                listenExecutebell.poll(5L, TimeUnit.SECONDS);
                                executeConfigListen();
                            } catch (Exception e) {
                                LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
                            }
                        }
                    }
                }, 0L, TimeUnit.MILLISECONDS);
                
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    而在executeConfigListen()方法中有这么一段逻辑,检查cacheMap中dataId的CacheData对象内,MD5字段与注册的监听listener内的lastCallMd5值,不相同表示配置数据变更则触发safeNotifyListener方法,发送数据变更通知。

        void checkListenerMd5() {
            for (ManagerListenerWrap wrap : listeners) {
                if (!md5.equals(wrap.lastCallMd5)) {
                    safeNotifyListener(dataId, group, content, type, md5, wrap);
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    safeNotifyListener()方法单独起线程,向所有对dataId注册过监听的客户端推送变更后的数据内容。

    private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
                final String md5, final ManagerListenerWrap listenerWrap) {
            final Listener listener = listenerWrap.listener;
            if (listenerWrap.inNotifying) {
                LOGGER.warn(
                        "[{}] [notify-currentSkip] dataId={}, group={}, md5={}, listener={}, listener is not finish yet,will try next time.",
                        name, dataId, group, md5, listener);
                return;
            }
            Runnable job = new Runnable() {
                @Override
                public void run() {
                    long start = System.currentTimeMillis();
                    ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
                    ClassLoader appClassLoader = listener.getClass().getClassLoader();
                    try {
                        if (listener instanceof AbstractSharedListener) {
                            AbstractSharedListener adapter = (AbstractSharedListener) listener;
                            adapter.fillContext(dataId, group);
                            LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                        }
                        // 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。
                        Thread.currentThread().setContextClassLoader(appClassLoader);
                        
                        ConfigResponse cr = new ConfigResponse();
                        cr.setDataId(dataId);
                        cr.setGroup(group);
                        cr.setContent(content);
                        configFilterChainManager.doFilter(null, cr);
                        String contentTmp = cr.getContent();
                        listenerWrap.inNotifying = true;
                        listener.receiveConfigInfo(contentTmp);
                        // compare lastContent and content
                        if (listener instanceof AbstractConfigChangeListener) {
                            Map data = ConfigChangeHandler.getInstance()
                                    .parseChangeData(listenerWrap.lastContent, content, type);
                            ConfigChangeEvent event = new ConfigChangeEvent(data);
                            ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                            listenerWrap.lastContent = content;
                        }
                        
                        listenerWrap.lastCallMd5 = md5;
                        LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name,
                                dataId, group, md5, listener, (System.currentTimeMillis() - start));
                    } catch (NacosException ex) {
                        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                                name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
                    } catch (Throwable t) {
                        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                                group, md5, listener, t.getCause());
                    } finally {
                        listenerWrap.inNotifying = false;
                        Thread.currentThread().setContextClassLoader(myClassLoader);
                    }
                }
            };
            
            final long startNotify = System.currentTimeMillis();
            try {
                if (null != listener.getExecutor()) {
                    listener.getExecutor().execute(job);
                } else {
                    try {
                        INTERNAL_NOTIFIER.submit(job);
                    } catch (RejectedExecutionException rejectedExecutionException) {
                        LOGGER.warn(
                                "[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, no available internal notifier,will sync notifier ",
                                name, dataId, group, md5, listener);
                        job.run();
                    } catch (Throwable throwable) {
                        LOGGER.error(
                                "[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, submit internal async task fail,throwable= ",
                                name, dataId, group, md5, listener, throwable);
                        job.run();
                    }
                }
            } catch (Throwable t) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                        group, md5, listener, t.getCause());
            }
            final long finishNotify = System.currentTimeMillis();
            LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
                    name, (finishNotify - startNotify), dataId, group, md5, listener);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    客户端接收通知,直接实现receiveConfigInfo()方法接收回调数据,处理自身业务就可以了。

    configService.addListener(dataId, group, new Listener() {
        @Override
        public void receiveConfigInfo(String configInfo) {
            System.out.println("receive:" + configInfo);
        }
    
        @Override
        public Executor getExecutor() {
            return null;
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    为了理解更直观我用测试demo演示下,获取服务端配置并设置监听,每当服务端配置数据变化,客户端监听都会收到通知,一起看下效果。

    public static void main(String[] args) throws NacosException, InterruptedException {
        String serverAddr = "localhost";
        String dataId = "test";
        String group = "DEFAULT_GROUP";
        Properties properties = new Properties();
        properties.put("serverAddr", serverAddr);
        ConfigService configService = NacosFactory.createConfigService(properties);
        String content = configService.getConfig(dataId, group, 5000);
        System.out.println(content);
        configService.addListener(dataId, group, new Listener() {
            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println("数据变更 receive:" + configInfo);
            }
            @Override
            public Executor getExecutor() {
                return null;
            }
        });
    
        boolean isPublishOk = configService.publishConfig(dataId, group, "我是新配置内容~");
        System.out.println(isPublishOk);
    
        Thread.sleep(3000);
        content = configService.getConfig(dataId, group, 5000);
        System.out.println(content);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    结果和预想的一样,当向服务端publishConfig数据变化后,客户端可以立即感知,愣是用主动拉pull模式做出了服务端实时推送的效果。

    数据变更 receive:我是新配置内容~
    true
    我是新配置内容~
    
    • 1
    • 2
    • 3

    4.3 服务端源码分析

    Nacos配置中心的服务端源码主要在nacos-config项目的ConfigController类,服务端的逻辑要比客户端稍复杂一些,这里我们重点看下。

    4.3.1 处理长轮询

    服务端对外提供的监听接口地址/v1/cs/configs/listener,这个方法内容不多,顺着doPollingConfig往下看。

       /**
         * The client listens for configuration changes.
         */
        @PostMapping("/listener")
        @Secured(action = ActionTypes.READ, signType = SignType.CONFIG)
        public void listener(HttpServletRequest request, HttpServletResponse response)
                throws ServletException, IOException {
            
            request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
            String probeModify = request.getParameter("Listening-Configs");
            if (StringUtils.isBlank(probeModify)) {
                LOGGER.warn("invalid probeModify is blank");
                throw new IllegalArgumentException("invalid probeModify");
            }
            
            probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
            
            Map<String, String> clientMd5Map;
            try {
                clientMd5Map = MD5Util.getClientMd5Map(probeModify);
            } catch (Throwable e) {
                throw new IllegalArgumentException("invalid probeModify");
            }
            
            // do long-polling
            inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    服务端根据请求header中Long-Pulling-Timeout属性来区分请求是长轮询还是短轮询,这里咱们只关注长轮询部分,接着看LongPollingService(记住这个service很关键)类中的addLongPollingClient()方法是如何处理客户端的长轮询请求的。

      
        /**
         * long polling the config.
         */
        public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
                Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
            
            // Long polling.
            if (LongPollingService.isSupportLongPolling(request)) {
                longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
                return HttpServletResponse.SC_OK + "";
            }
            
            // Compatible with short polling logic.
            List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
            
            // Compatible with short polling result.
            String oldResult = MD5Util.compareMd5OldResult(changedGroups);
            String newResult = MD5Util.compareMd5ResultString(changedGroups);
            
            String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
            if (version == null) {
                version = "2.0.0";
            }
            int versionNum = Protocol.getVersionNumber(version);
            
            // Before 2.0.4 version, return value is put into header.
            if (versionNum < START_LONG_POLLING_VERSION_NUM) {
                response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
                response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
            } else {
                request.setAttribute("content", newResult);
            }
            
            // Disable cache.
            response.setHeader("Pragma", "no-cache");
            response.setDateHeader("Expires", 0);
            response.setHeader("Cache-Control", "no-cache,no-store");
            response.setStatus(HttpServletResponse.SC_OK);
            return HttpServletResponse.SC_OK + "";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    正常客户端默认设置的请求超时时间是30s,但这里我们发现服务端“偷偷”的给减掉了500ms,现在超时时间只剩下了29.5s,那为什么要这样做呢?
    用官方的解释之所以要提前500ms响应请求,为了最大程度上保证客户端不会因为网络延时造成超时,考虑到请求可能在负载均衡时会耗费一些时间,毕竟Nacos最初就是按照阿里自身业务体量设计的嘛!
    此时对客户端提交上来的groupkey的MD5与服务端当前的MD5比对,如md5值不同,则说明服务端的配置项发生过变更,直接将该groupkey放入changedGroupKeys集合并返回给客户端。如未发生变更,则将客户端请求挂起,这个过程先创建一个名为ClientLongPolling的调度任务Runnable,并提交给scheduler定时线程池延后29.5s执行。

     /**
         * Add LongPollingClient.
         *
         * @param req              HttpServletRequest.
         * @param rsp              HttpServletResponse.
         * @param clientMd5Map     clientMd5Map.
         * @param probeRequestSize probeRequestSize.
         */
        public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
                int probeRequestSize) {
            
            String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
            String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
            String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
            String tag = req.getHeader("Vipserver-Tag");
            int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
            
            // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
            long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
            if (isFixedPolling()) {
                timeout = Math.max(10000, getFixedPollingInterval());
                // Do nothing but set fix polling timeout.
            } else {
                long start = System.currentTimeMillis();
                List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
                if (changedGroups.size() > 0) {
                    generateResponse(req, rsp, changedGroups);
                    LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                            RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                            changedGroups.size());
                    return;
                } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                    LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                            RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                            changedGroups.size());
                    return;
                }
            }
            String ip = RequestUtil.getRemoteIp(req);
            
            // Must be called by http thread, or send response.
            final AsyncContext asyncContext = req.startAsync();
            
            // AsyncContext.setTimeout() is incorrect, Control by oneself
            asyncContext.setTimeout(0L);
            
            ConfigExecutor.executeLongPolling(
                    new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    这里每个长轮询任务携带了一个asyncContext对象,使得每个请求可以延迟响应,等延时到达或者配置有变更之后,调用asyncContext.complete()响应完成。
    ClientLongPolling任务被提交进入延迟线程池执行的同时,服务端会通过一个allSubs队列保存所有正在被挂起的客户端长轮询请求任务,这个是客户端注册监听的过程。
    如延时期间客户端据数一直未变化,延时时间到达后将本次长轮询任务从allSubs队列剔除,并响应请求response,这是取消监听。收到响应后客户端再次发起长轮询,循环往复。

     class ClientLongPolling implements Runnable {
            
            @Override
            public void run() {
                asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(() -> {
                    try {
                        getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
    
                        // Delete subscriber's relations.
                        boolean removeFlag = allSubs.remove(ClientLongPolling.this);
    
                        if (removeFlag) {
                            if (isFixedPolling()) {
                                LogUtil.CLIENT_LOG
                                        .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
                                                RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                                "polling", clientMd5Map.size(), probeRequestSize);
                                List<String> changedGroups = MD5Util
                                        .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                                                (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                                if (changedGroups.size() > 0) {
                                    sendResponse(changedGroups);
                                } else {
                                    sendResponse(null);
                                }
                            } else {
                                LogUtil.CLIENT_LOG
                                        .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
                                                RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                                "polling", clientMd5Map.size(), probeRequestSize);
                                sendResponse(null);
                            }
                        } else {
                            LogUtil.DEFAULT_LOG.warn("client subsciber's relations delete fail.");
                        }
                    } catch (Throwable t) {
                        LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
                    }
    
                }, timeoutTime, TimeUnit.MILLISECONDS);
                
                allSubs.add(this);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    在这里插入图片描述
    到这我们知道服务端是如何挂起客户端长轮询请求的,一旦请求在挂起期间,用户通过管理平台操作了配置项,或者服务端收到了来自其他客户端节点修改配置的请求。

    怎么能让对应已挂起的任务立即取消,并且及时通知客户端数据发生了变更呢?

    4.3.2 数据变更

    管理平台或者客户端更改配置项接位置ConfigController中的publishConfig方法。
    值得注意得是,在publishConfig接口中有这么一段逻辑,某个dataId配置数据被修改时会触发一个数据变更事件Event。

    if (StringUtils.isBlank(betaIps)) {
                if (StringUtils.isBlank(tag)) {
                    persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
                    ConfigChangePublisher.notifyConfigChange(
                            new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
                } else {
                    persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
                    ConfigChangePublisher.notifyConfigChange(
                            new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
                }
            } else {
                // beta publish
                configInfo.setEncryptedDataKey(encryptedDataKey);
                persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
                ConfigChangePublisher.notifyConfigChange(
                        new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    值得注意得是,在publishConfig接口中有这么一段逻辑,某个dataId配置数据被修改时会触发一个数据变更事件Event。

     public LongPollingService() {
            allSubs = new ConcurrentLinkedQueue<>();
            
            ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
            
            // Register LocalDataChangeEvent to NotifyCenter.
            NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
            
            // Register A Subscriber to subscribe LocalDataChangeEvent.
            NotifyCenter.registerSubscriber(new Subscriber() {
                
                @Override
                public void onEvent(Event event) {
                    if (isFixedPolling()) {
                        // Ignore.
                    } else {
                        if (event instanceof LocalDataChangeEvent) {
                            LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                            ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                        }
                    }
                }
                
                @Override
                public Class<? extends Event> subscribeType() {
                    return LocalDataChangeEvent.class;
                }
            });
            
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    仔细看LongPollingService会发现在它的构造方法中,正好订阅了数据变更事件,并在事件触发时执行一个数据变更调度任务DataChangeTask
    DataChangeTask内的主要逻辑就是遍历allSubs队列,上边我们知道,这个队列中维护的是所有客户端的长轮询请求任务,从这些任务中找到包含当前发生变更的groupkey的ClientLongPolling任务,以此实现数据更变推送给客户端,并从allSubs队列中剔除此长轮询任务。

    class DataChangeTask implements Runnable {
            
            @Override
            public void run() {
                try {
                    ConfigCacheService.getContentBetaMd5(groupKey);
                    for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                        ClientLongPolling clientSub = iter.next();
                        if (clientSub.clientMd5Map.containsKey(groupKey)) {
                            // If published tag is not in the beta list, then it skipped.
                            if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
                                continue;
                            }
                            
                            // If published tag is not in the tag list, then it skipped.
                            if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                                continue;
                            }
                            
                            getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                            iter.remove(); // Delete subscribers' relationships.
                            LogUtil.CLIENT_LOG
                                    .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
                                            RequestUtil
                                                    .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
                                            "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                            clientSub.sendResponse(Arrays.asList(groupKey));
                        }
                    }
                    
                } catch (Throwable t) {
                    LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
                }
            }
            
            DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps) {
                this(groupKey, isBeta, betaIps, null);
            }
            
            DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps, String tag) {
                this.groupKey = groupKey;
                this.isBeta = isBeta;
                this.betaIps = betaIps;
                this.tag = tag;
            }
            
            final String groupKey;
            
            final long changeTime = System.currentTimeMillis();
            
            final boolean isBeta;
            
            final List<String> betaIps;
            
            final String tag;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
  • 相关阅读:
    Linux驱动开发入门
    032-JAVA窗体图形图像处理(Graphics绘图,五子棋游戏实战)
    一款带数字传输信号的OVP芯片
    QLineEdit 使用QValidator 限制各种输入
    17 Linux 中断
    2023黑龙江八一农垦大学计算机考研信息汇总
    cocoapods使用
    Chromium源码由浅入深(一)
    2022-03-18-SpringBoot
    ES6 入门教程 11 对象的新增方法 11.5 Object.keys(),Object.values(),Object.entries()
  • 原文地址:https://blog.csdn.net/qq_39361915/article/details/125879267