• 实现一个简单的长轮询


    分析一下长轮询的实现方式

    现在各大中间件都使用了长轮询的数据交互方式,目前比较流行的例如Nacos的配置中心,RocketMQ Pull(拉模式)消息等,它们都是采用了长轮询方的式实现。就例如Nacos的配置中心,如何做到服务端感知配置变化实时推送给客户端的呢?

    长轮询与短轮询

    说到长轮询,肯定存在和它相对立的,我们暂且叫它短轮询吧,我们简单介绍一下短轮询:

    短轮询也是拉模式。是指不管服务端数据有无更新,客户端每隔定长时间请求拉取一次数据,可能有更新数据返回,也可能什么都没有。如果配置中心使用这样的方式,会存在以下问题:

    由于配置数据并不会频繁变更,若是一直发请求,势必会对服务端造成很大压力。还会造成推送数据的延迟,比如:每10s请求一次配置,如果在第11s时配置更新了,那么推送将会延迟9s,等待下一次请求;

    无法在推送延迟和服务端压力两者之间中和。降低轮询的间隔,延迟降低,压力增加;增加轮询的间隔,压力降低,延迟增高。

    长轮询为了解决短轮询存在的问题,客户端发起长轮询,如果服务端的数据没有发生变更,会hold住请求,直到服务端的数据发生变化,或者等待一定时间超时才会返回。返回后,客户端再发起下一次长轮询请求监听。

    这样设计的好处:

    • 相对于低延时,客户端发起长轮询,服务端感知到数据发生变更后,能立刻返回响应给客户端。
    • 服务端的压力减小,客户端发起长轮询,如果数据没有发生变更,服务端会hold住此次客户端的请求,hold住请求的时间一般会设置到30s或者60s,并且服务端hold住请求不会消耗太多服务端的资源。

    下面借用图片来说明一下流程:

    • 首先客户端发起长轮询请求,服务端收到客户端的请求,这时会挂起客户端的请求,如果在服务端设计的30s之内都没有发生变更,服务端会响应回客户端数据没有变更,客户端会继续发送请求。
    • 如果在30s之内服务数据发生了变更,服务端会推送变更的数据到客户端。

    配置中心长轮询设计:

    上面我们已经介绍了整个思路,下面我们用代码实现一下:

    • 首先客户端发送一个HTTP请求到服务端;服务端会开启一个异步线程,如果一直没有数据变更会挂起当前请求(一个 Tomcat 也就 200 个线程,长轮询也不应该阻塞 Tomcat 的业务线程,所以需要配置中心在实现长轮询时往往采用异步响应的方式来实现,而比较方便实现异步 HTTP 的常见手段便是 Servlet3.0 提供的 AsyncContext 机制。)
    • 在服务端设置的超时时间内仍然没有数据变更,那就返回客户端一个没有变更的标识。例如响应304状态码;
    • 在服务端设置的超时时间内有数据变更了,就返回客户端变更的内容;

    配置中心长轮询实现

    下面用代码实现长轮询:

    客户端实现

    1. @Slf4j
    2. public class ConfigClientWorker {
    3. private final CloseableHttpClient httpClient;
    4. private final ScheduledExecutorService executorService;
    5. public ConfigClientWorker(String url, String dataId) {
    6. this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
    7. Thread thread = new Thread(runnable);
    8. thread.setName("client.worker.executor-%d");
    9. thread.setDaemon(true);
    10. return thread;
    11. });
    12. // ① httpClient 客户端超时时间要大于长轮询约定的超时时间
    13. RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(40000).build();
    14. this.httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
    15. executorService.execute(new LongPollingRunnable(url, dataId));
    16. }
    17. class LongPollingRunnable implements Runnable {
    18. private final String url;
    19. private final String dataId;
    20. public LongPollingRunnable(String url, String dataId) {
    21. this.url = url;
    22. this.dataId = dataId;
    23. }
    24. @SneakyThrows
    25. @Override
    26. public void run() {
    27. String endpoint = url + "?dataId=" + dataId;
    28. log.info("endpoint: {}", endpoint);
    29. HttpGet request = new HttpGet(endpoint);
    30. CloseableHttpResponse response = httpClient.execute(request);
    31. switch (response.getStatusLine().getStatusCode()) {
    32. case 200: {
    33. BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity()
    34. .getContent()));
    35. StringBuilder result = new StringBuilder();
    36. String line;
    37. while ((line = rd.readLine()) != null) {
    38. result.append(line);
    39. }
    40. response.close();
    41. String configInfo = result.toString();
    42. log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo);
    43. break;
    44. }
    45. // ② 304 响应码标记配置未变更
    46. case 304: {
    47. log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId);
    48. break;
    49. }
    50. default: {
    51. throw new RuntimeException("unExcepted HTTP status code");
    52. }
    53. }
    54. executorService.execute(this);
    55. }
    56. }
    57. public static void main(String[] args) throws IOException {
    58. new ConfigClientWorker("http://127.0.0.1:8080/listener", "user");
    59. System.in.read();
    60. }
    61. }
    • httpClient 客户端超时时间要大于长轮询约定的超时时间,不然还没等到服务端返回,客户端自己就超时了。
    • 304 响应码标记配置未变更;
    • http://127.0.0.1:8080/listener 是服务端地址;

    服务端实现

    1. @RestController
    2. @Slf4j
    3. @SpringBootApplication
    4. public class ConfigServer {
    5. @Data
    6. private static class AsyncTask {
    7. // 长轮询请求的上下文,包含请求和响应体
    8. private AsyncContext asyncContext;
    9. // 超时标记
    10. private boolean timeout;
    11. public AsyncTask(AsyncContext asyncContext, boolean timeout) {
    12. this.asyncContext = asyncContext;
    13. this.timeout = timeout;
    14. }
    15. }
    16. // guava 提供的多值 Map,一个 key 可以对应多个 value
    17. private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create());
    18. private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d")
    19. .build();
    20. private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);
    21. // 配置监听接入点
    22. @RequestMapping("/listener")
    23. public void addListener(HttpServletRequest request, HttpServletResponse response) {
    24. String dataId = request.getParameter("dataId");
    25. // 开启异步!!!
    26. AsyncContext asyncContext = request.startAsync(request, response);
    27. AsyncTask asyncTask = new AsyncTask(asyncContext, true);
    28. // 维护 dataId 和异步请求上下文的关联
    29. dataIdContext.put(dataId, asyncTask);
    30. // 启动定时器,30s 后写入 304 响应
    31. timeoutChecker.schedule(() -> {
    32. if (asyncTask.isTimeout()) {
    33. dataIdContext.remove(dataId, asyncTask);
    34. response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
    35. // 标志此次异步线程完成结束!!!
    36. asyncContext.complete();
    37. }
    38. }, 30000, TimeUnit.MILLISECONDS);
    39. }
    40. // 配置发布接入点
    41. @RequestMapping("/publishConfig")
    42. @SneakyThrows
    43. public String publishConfig(String dataId, String configInfo) {
    44. log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo);
    45. Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId);
    46. for (AsyncTask asyncTask : asyncTasks) {
    47. asyncTask.setTimeout(false);
    48. HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
    49. response.setStatus(HttpServletResponse.SC_OK);
    50. response.getWriter().println(configInfo);
    51. asyncTask.getAsyncContext().complete();
    52. }
    53. return "success";
    54. }
    55. public static void main(String[] args) {
    56. SpringApplication.run(ConfigServer.class, args);
    57. }
    58. }
    • 客户端请求过来,首先开启一个异步线程request.startAsync(request, response);保证不占用Tomcat线程。此时Tomcat线程以及释放。配合asyncContext.complete()使用。
    • dataIdContext.put(dataId, asyncTask);会将 dataId 和异步请求上下文给关联起来,方便配置发布时,拿到对应的上下文
    • Multimap dataIdContext它是一个多值 Map,一个 key 可以对应多个 value,你也可以理解为 Map>
    • timeoutChecker.schedule() 启动定时器,30s 后写入 304 响应
    • @RequestMapping("/publishConfig") ,配置发布的入口。配置变更后,根据 dataId 一次拿出所有的长轮询,为之写入变更的响应。
    • asyncTask.getAsyncContext().complete();表示这次异步请求结束了。

    启动配置监听

    先启动 ConfigServer,再启动 ConfigClient。30s之后控制台打印第一次超时之后收到服务端304的状态码

    1. 16:41:14.824 [client.worker.executor-%d] INFO cn.haoxiaoyong.poll.ConfigClientWorker - longPolling dataId: [user] once finished, configInfo is unchanged, longPolling again

    请求一下配置发布,请求localhost:8080/publishConfig?dataId=user&configInfo=helloworld

    服务端打印日志:

    1. 2022-08-25 16:45:56.663 INFO 90650 --- [nio-8080-exec-2] cn.haoxiaoyong.poll.ConfigServer : publish configInfo dataId: [user], configInfo: helloworld

    至此上面试整个长轮询过程。

  • 相关阅读:
    流量回放平台与传统测试工具的对比分析
    深度学习踩坑笔记:载入内存,数据分配与重启问题,安装R
    【ENOVIA 服务包】知识重用解决方案 | 达索系统百世慧®
    React工具
    Codewhisperer 使用评价
    C++重温笔记(十一): C++文件操作
    基于蜜蜂算法的函数寻优及TSP搜索算法
    ​马化腾称腾讯应该去做难而正确的事;iOS 15正式版发布,改善信号表现;Dart 2.17发布|极客头条
    使用 python multiprocessing.Queue 出现 too many open files 错误
    Oracle杀会话回滚时间长处理办法
  • 原文地址:https://blog.csdn.net/m0_57042151/article/details/126813671