• Nacos的长轮询实践



    一、背景介绍

    在这里插入图片描述

    Nacos长轮询的基本思路是通过Servlet3.0后提供的异步处理能力,把请求的任务添加至队列中,在有数据发生变更时,从队列中取出相应请求,然后响应请求,负责拉取数据的接口通过延时任务完成超时处理,如果等到设定的超时时间还没有数据变更时,就主动推送超时信息完成响应。

    关于心跳检测参考:https://blog.csdn.net/sunquan291/article/details/126556366

    二、简单实现

    如下是源码的实现:

    @RestController
    @RequestMapping("/nacos")
    public class NacosLongPollingController extends HttpServlet {
        @Autowired
        private NacosLongPollingService nacosLongPollingService;
    
        @RequestMapping("/pull")
        @Override
        protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
            String dataId = req.getParameter("dataId");
            if (StringUtils.isEmpty(dataId)) {
                throw new IllegalArgumentException("请求参数异常,dataId能为空");
            }
            nacosLongPollingService.doGet(dataId, req, resp);
        }
    
        @GetMapping("/push")
        public Result push(@RequestParam("dataId") String dataId, @RequestParam("data") String data) {
            if (StringUtils.isEmpty(dataId) || StringUtils.isEmpty(data)) {
                throw new IllegalArgumentException("请求参数异常,dataId和data均不能为空");
            }
            nacosLongPollingService.push(dataId, data);
            return new Result(data);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    package com.example.demo.control;
    
    public interface NacosLongPollingService {
        void doGet(String dataId, HttpServletRequest req, HttpServletResponse resp);
    
        void push(String dataId, String data);
    
        Map<String, String> getDataStage();
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    package com.example.demo.control;
    
    @Service
    public class NacosLongPollingServiceImpl implements NacosLongPollingService {
    
        private Map<String, String> dataStage = new HashMap<>();
    
        final ScheduledExecutorService scheduler;
        final Queue<NacosPullTask> nacosPullTasks;
    
        public NacosLongPollingServiceImpl() {
            scheduler = new ScheduledThreadPoolExecutor(1, r -> {
                Thread t = new Thread(r);
                t.setName("NacosLongPollingTask");
                t.setDaemon(true);
                return t;
            });
            nacosPullTasks = new ConcurrentLinkedQueue<>();
            scheduler.scheduleAtFixedRate(() -> System.out.println("线程存活状态:" + new Date()), 0L, 5, TimeUnit.SECONDS);
        }
    
    
        @Override
        public void doGet(String dataId, HttpServletRequest req, HttpServletResponse resp) {
            // 一定要由当前HTTP线程调用,如果放在task线程容器会立即发送响应 相当于拿到一个future
            final AsyncContext asyncContext = req.startAsync();
            scheduler.execute(new NacosPullTask(nacosPullTasks, scheduler, asyncContext, dataId, req, resp));
        }
    
        @Override
        public void push(String dataId, String data) {
            scheduler.schedule(new NacosPushTask(this,dataId, data, nacosPullTasks), 0L, TimeUnit.MILLISECONDS);
        }
    
        @Override
        public Map<String, String> getDataStage() {
            return dataStage;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    package com.example.demo.control;
    
    @Slf4j
    public class NacosPullTask implements Runnable {
        Queue<NacosPullTask> nacosPullTasks;
        ScheduledExecutorService scheduler;
        AsyncContext asyncContext;
        String dataId;
        HttpServletRequest req;
        HttpServletResponse resp;
    
        Future<?> asyncTimeoutFuture;
    
        public NacosPullTask(Queue<NacosPullTask> nacosPullTasks, ScheduledExecutorService scheduler, AsyncContext asyncContext, String dataId, HttpServletRequest req, HttpServletResponse resp) {
            this.nacosPullTasks = nacosPullTasks;
            this.scheduler = scheduler;
            this.asyncContext = asyncContext;
            this.dataId = dataId;
            this.req = req;
            this.resp = resp;
        }
    
        @Override
        public void run() {
            asyncTimeoutFuture = scheduler.schedule(() -> {
                //超时等待10s
                log.info("10秒后超时结束长轮询任务:" + new Date());
                nacosPullTasks.remove(NacosPullTask.this);
    
                sendResponse("time-out");
    
            }, 10, TimeUnit.SECONDS);
            nacosPullTasks.add(this);
        }
    
        public void sendResponse(String result) {
            System.out.println("发送响应:" + new Date());
            //取消等待执行的任务,避免已经响完了,还有资源被占用
            if (asyncTimeoutFuture != null) {
                //设置为true会立即中断执行中的任务,false对执行中的任务无影响,但会取消等待执行的任务
                asyncTimeoutFuture.cancel(false);
            }
    
            //设置页码编码
            resp.setContentType("application/json; charset=utf-8");
            resp.setCharacterEncoding("utf-8");
    
            //禁用缓存
            resp.setHeader("Pragma", "no-cache");
            resp.setHeader("Cache-Control", "no-cache,no-store");
            resp.setDateHeader("Expires", 0);
            resp.setStatus(HttpServletResponse.SC_OK);
            //输出Json流
            sendJsonResult(result);
        }
    
        private void sendJsonResult(String result) {
            Result<String> pojoResult = new Result<>();
            pojoResult.setCode(200);
            pojoResult.setSuccess(!StringUtils.isEmpty(result));
            pojoResult.setData(result);
            PrintWriter writer = null;
            try {
                writer = asyncContext.getResponse().getWriter();
                writer.write(pojoResult.toString());
                writer.flush();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                asyncContext.complete();
                if (null != writer) {
                    writer.close();
                }
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    package com.example.demo.control;
    
    public class NacosPushTask implements Runnable {
        private String dataId;
        private String data;
        private Queue<NacosPullTask> nacosPullTasks;
        NacosLongPollingService nacosLongPollingService;
    
        public NacosPushTask(NacosLongPollingService service, String dataId, String data, Queue<NacosPullTask> nacosPullTasks) {
            this.nacosLongPollingService = service;
            this.dataId = dataId;
            this.data = data;
            this.nacosPullTasks = nacosPullTasks;
        }
    
        @Override
        public void run() {
            Iterator<NacosPullTask> iterator = nacosPullTasks.iterator();
            while (iterator.hasNext()) {
                NacosPullTask nacosPullTask = iterator.next();
                if (dataId != null && dataId.equals(nacosPullTask.dataId)) {
                    if (nacosLongPollingService.getDataStage().containsKey(dataId)) {
                        if (!nacosLongPollingService.getDataStage().get(dataId).equals(data)) {
                            //并且发生了修改
                            iterator.remove();
                            nacosPullTask.sendResponse(data);
                        }
    
                    } else {
                        iterator.remove();
                        nacosPullTask.sendResponse(data);
                    }
    
                }
            }
            nacosLongPollingService.getDataStage().put(dataId, data);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    三、测试效果

    拉数据
    在这里插入图片描述
    修改数据
    在这里插入图片描述

    结果:拉数据时,请求会被hold住10秒,超时后会自动返回,如果10秒内对应key的数据变化,则会将数据立即返回。

    1. 在多台机器上同时调用pull获取同个dataId,该key对应值有变化则会触发所有调用即时返回;
    2. 同一个浏览器中,同一地址栏多次输入,hold的第二次请求能即时加入队列
    3. 同一个浏览器中,不同地址栏多次输入,第二次请求被阻塞,无法即时加入队列

    四、Nacos实践

    其原理图如下所示:
    在这里插入图片描述
    在这里插入图片描述
    那么当config配置进行更新变化的时候就需要相关的应用进行跟着相关变化,这里就有个问题了那客户端怎么知道配置变化了呢,客户端是什么时候进行更新的呢?涉及更新方式就有2种,推和拉;

    客户端主动从服务端定时拉取配置,如果有变化则进行替换。
    服务端主动把变化的内容发送给客户端。
    两种方式各有利弊,比如对于推的模式来讲,就需要服务端与客户端进行长连接,那么这种就会出现服务端需要耗费大量资源维护这个链接,并且还得加入心跳机制来维护连接有效性。而对于拉的模式则需要客户端定时去服务端访问,那么就会存在时间间隔,也就保证不了数据的实时性。那nacos采用哪种模式呢?nacos是采用了拉模式是一种特殊的拉模式,也就是我们通常听的长轮询机制。
    如果客户端拉取发现客户端与服务端配置是一致的(其实是通过MD5判断的)那么服务端会先拿住这个请求不返回,直到这段时间内配置有变化了才把刚才拿住的请求返回。他的步骤是nacos服务端收到请求后检查配置是否发生变化,如果没有则开启定时任务,延迟29.5s执行。同时把当前客户端的连接请求放入队列。那么此时服务端并没有将结果返回给客户端,当有以下2种情况的时候才触发返回。

    就是等待29.5s后触发自动检查
    在29.5s内有配置进行了更改
    经过这2种情况才完成这次的pull操作。这种的好处就是保证了客户端的配置能及时变化更新,也减少了轮询给服务端带来的压力。所以之前文章我们说过这个长链接回话超时时间默认是30s。

  • 相关阅读:
    2022 web前端面试宝典 —— 一文带你直击面试重难点(40个经典题目,涵盖近90%的考点,码字2w,干货满满!)
    ANSI / NEMA- MW- 1000-2020 磁铁线标准。. 最新原版
    关于错误使用cudaDeviceReset()函数,导致多线程下cuda错误、进程崩溃的问题
    Pyinstaller+InstallForge多文件项目软件打包
    gstreamer协商negoation
    图和图神经网络的可视化,详解与示例
    C++运算符重载
    原生HTML实现marquee向上滚动效果
    Vue3配置router路由步骤
    ESP32开发三_蓝牙开发
  • 原文地址:https://blog.csdn.net/sunquan291/article/details/111635105