• AsyncContext优雅实现HTTP长轮询接口


    一、背景

    接到一个需求,实现方案时需要提供一个HTTP接口,接口需要hold住5-8秒,轮询查询数据库,一旦数据库中值有变化,取出变化的值进行处理,处理完成后返回响应。这不就是长轮询吗,如何优雅的实现呢?

    在这之前先简单介绍下长连接和短连接

    HTTP长链接(Keep-Alive)
    • 概念: HTTP长链接是指客户端与服务器在一次TCP连接上可以传输多个HTTP请求和响应。在请求完成后,连接不会立即关闭,而是保持开放状态,等待可能的后续请求。

    • 优势:

      • 减少延迟: 长链接避免了每次请求都需要重新建立TCP连接的开销,降低了通信延迟。
      • 减少资源占用: 不需要频繁地打开和关闭连接,减少了服务器资源的占用。
    • 应用场景:

      • 实时性要求高的应用: 长链接适用于需要实时性响应的应用,例如即时通讯、实时更新等。
      • 资源加载优化: 在Web开发中,适用于多个资源(如图片、样式表、脚本)的同时加载。
    HTTP短连接
    • 概念: HTTP短连接是指每个HTTP请求都需要建立一个新的TCP连接,请求完成后立即关闭连接。

    • 优势:

      • 简单: 短连接模式相对简单,易于理解和实现。
      • 更好的控制: 对于某些资源密集型的应用,短连接可以更好地控制资源的释放。
    • 应用场景:

      • 低并发场景: 当并发请求数较低时,短连接可能更适用,因为它避免了长链接的开销。
      • 资源密集型应用: 对于服务器资源消耗较大的应用,短连接可能更容易控制资源的释放。
    何为轮询

    所谓轮询,即是在一个循环周期内不断发起请求来得到数据的机制。只要有请求的的地方,都可以实现轮询,譬如各种事件驱动模型。它的长短是在于某次请求的返回周期。

    1. 短轮询

    短轮询指的是在循环周期内,不断发起请求,每一次请求都立即返回结果,根据新1日数据对比决定是否使用这个结果。

    2. 长轮询

    而长轮询及是在请求的过程中,若是服务器端数据并没有更新,那么则将这个连接挂起,直到服务器推送新的数据,再返回,然后再进入循环周期。

    长短轮询和长短连接的区别
    1. 第一个区别是决定的方式,

    一个TCP连接是否为长连接,是通过设置HTTP的Connection Header来决定的,而且是需要两边都设置才有效。而一种轮询方式是否为长轮询,是根据服务端的处理方式来决定的,与客户端没有关系。

    2. 第二个区别就是实现的方式

    连接的长短是通过协议来规定和实现的。而轮询的长短,是服务器通过编程的方式手动挂起请求来实现的。

    二、方案设计

    在 Spring 中,AsyncContext 是用于支持异步处理的一个重要的特性。它允许我们在 servlet 请求处理过程中,将长时间运行的操作放在一个单独的线程中执行,而不会阻塞其他请求的处理。

    AsyncContext 在以下两种情况下特别有用:

    1. 长时间运行的操作:当我们需要执行一些耗时的操作,例如网络请求、数据库查询或其他 I/O 操作时,通过将这些操作放在一个新的线程中,可以避免阻塞 servlet 容器中的线程,提高应用的并发性能。

    2. 推送异步响应:有时候,我们可能需要推送异步产生的响应,而不是等到所有操作都完成后再下发响应。通过 AsyncContext,我们可以在任何时间点上触发异步响应,将结果返回给客户端。

    使用 AsyncContext 的步骤如下:

    1. 在 servlet 中启用异步模式:在 servlet 中,通过调用 startAsync() 方法,可以获取到当前请求的 AsyncContext 对象,从而启用异步处理模式。
    1. HttpServletRequest request = ...;
    2. AsyncContext asyncContext = request.startAsync();
    1. 指定异步任务:通过调用 AsyncContext 对象的 start() 方法,在新的线程中执行需要异步处理的任务。
    1. asyncContext.start(() -> {
    2. // 异步任务逻辑
    3. });
    1. 提交响应:在异步任务完成后,可以调用 AsyncContext 对象的 complete() 方法,以表示异步操作完成。
    asyncContext.complete();
    

    需要注意的是,我们在使用 AsyncContext 时需要特别注意线程安全。由于异步任务在单独的线程中执行,所以可能存在并发问题。因此,在编写异步任务逻辑时,需要注意线程安全性,使用合适的同步措施。

    另外,AsyncContext 也支持超时设置、错误处理、事件监听等功能,这些可以通过相应的方法和回调进行配置。可以根据具体的需求使用这些功能来优化异步处理的逻辑。

    总结来说,Spring 的 AsyncContext 提供了方便的异步处理机制,可以提高应用的并发性能,并支持推送异步响应,使得应用更具有响应性和可伸缩性。

    三、方案1

    1. import com.google.common.util.concurrent.ThreadFactoryBuilder;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.data.redis.core.RedisTemplate;
    4. import org.springframework.web.bind.annotation.PostMapping;
    5. import org.springframework.web.bind.annotation.RequestBody;
    6. import org.springframework.web.bind.annotation.RequestMapping;
    7. import org.springframework.web.bind.annotation.RestController;
    8. import javax.annotation.Resource;
    9. import javax.servlet.AsyncContext;
    10. import javax.servlet.AsyncEvent;
    11. import javax.servlet.AsyncListener;
    12. import javax.servlet.http.HttpServletRequest;
    13. import javax.servlet.http.HttpServletResponse;
    14. import java.io.IOException;
    15. import java.util.concurrent.*;
    16. @RestController
    17. @RequestMapping("/api/test")
    18. @Slf4j
    19. public class AsyncTestController {
    20. @Resource
    21. private RedisTemplate redisTemplate;
    22. private final ExecutorService timeoutChecker = new ThreadPoolExecutor(1,1,1000,TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
    23. private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d").build();
    24. // private static boolean result = false;
    25. @PostMapping("/async")
    26. public void async(HttpServletRequest request, HttpServletResponse response) {
    27. // 创建AsyncContext
    28. AsyncContext asyncContext = request.startAsync(request, response);
    29. // 设置处理超时时间8s
    30. asyncContext.setTimeout(8000L);
    31. // asyncContext监听
    32. AsyncTestListener asyncListener = new AsyncTestListener(redisTemplate,asyncContext);
    33. asyncContext.addListener(asyncListener);
    34. // 定时处理业务,处理成功后asyncContext.complete();完成异步请求
    35. asyncContext.start(asyncListener);
    36. }
    37. // 模拟业务处理完成
    38. @PostMapping("/set")
    39. public ResultModel notify(String key, String value) {
    40. redisTemplate.opsForValue().set(key, value);
    41. return ResultModel.success();
    42. }
    43. @PostMapping("/get")
    44. public ResultModel get(String key) {
    45. String s = redisTemplate.opsForValue().get(key);
    46. return ResultModel.success(s);
    47. }
    48. @PostMapping("/del")
    49. public ResultModel del(String key) {
    50. redisTemplate.delete(key);
    51. return ResultModel.success();
    52. }
    53. }
    1. import lombok.extern.slf4j.Slf4j;
    2. import org.springframework.data.redis.core.RedisTemplate;
    3. import javax.servlet.AsyncContext;
    4. import javax.servlet.AsyncEvent;
    5. import javax.servlet.AsyncListener;
    6. import java.io.IOException;
    7. @Slf4j
    8. public class AsyncTestListener implements AsyncListener,Runnable {
    9. boolean isComplete;
    10. private RedisTemplate redisTemplate;
    11. private AsyncContext asyncContext;
    12. public JdAsyncTestListener(RedisTemplate redisTemplate, AsyncContext asyncContext) {
    13. this.redisTemplate = redisTemplate;
    14. this.asyncContext = asyncContext;
    15. }
    16. @Override
    17. public void run() {
    18. try {
    19. while(true){
    20. if(isComplete){
    21. log.info("已经退出");
    22. break;
    23. }
    24. boolean b = redisTemplate.opsForValue().get(1) != null;
    25. log.info("获取标志位:"+b);
    26. Thread.sleep(300);
    27. if (b) {
    28. asyncContext.getResponse().getWriter().print(1);
    29. asyncContext.complete();
    30. }
    31. }
    32. } catch (IOException e) {
    33. e.printStackTrace();
    34. } catch (InterruptedException e) {
    35. throw new RuntimeException(e);
    36. }
    37. }
    38. @Override
    39. public void onComplete(AsyncEvent asyncEvent) throws IOException {
    40. log.info("结束了");
    41. isComplete = true;
    42. }
    43. @Override
    44. public void onTimeout(AsyncEvent asyncEvent) throws IOException {
    45. log.info("超时了");
    46. isComplete = true;
    47. }
    48. @Override
    49. public void onError(AsyncEvent asyncEvent) throws IOException {
    50. }
    51. @Override
    52. public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
    53. }
    54. }

    四、方案2

    1. import com.alibaba.fastjson.JSON;
    2. import com.google.common.collect.Lists;
    3. import com.google.common.collect.Maps;
    4. import com.google.common.util.concurrent.ThreadFactoryBuilder;
    5. import lombok.extern.slf4j.Slf4j;
    6. import org.apache.commons.lang3.StringUtils;
    7. import org.apache.dubbo.config.annotation.Reference;
    8. import org.springframework.beans.factory.annotation.Value;
    9. import org.springframework.data.redis.core.RedisTemplate;
    10. import org.springframework.validation.annotation.Validated;
    11. import org.springframework.web.bind.annotation.PostMapping;
    12. import org.springframework.web.bind.annotation.RequestBody;
    13. import org.springframework.web.bind.annotation.RequestMapping;
    14. import org.springframework.web.bind.annotation.RestController;
    15. import javax.annotation.Resource;
    16. import javax.servlet.AsyncContext;
    17. import javax.servlet.AsyncEvent;
    18. import javax.servlet.AsyncListener;
    19. import javax.servlet.http.HttpServletRequest;
    20. import javax.servlet.http.HttpServletResponse;
    21. import java.io.IOException;
    22. import java.util.List;
    23. import java.util.Map;
    24. import java.util.concurrent.*;
    25. @Validated
    26. @RestController
    27. @RequestMapping("/api/test")
    28. @Slf4j
    29. public class TestController {
    30. @Resource
    31. private RedisTemplate redisTemplate;
    32. private final ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(10, threadFactory);
    33. private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d").build();
    34. private static boolean result = false;
    35. private final boolean isTimeout = false;
    36. /**
    37. * 消息
    38. *
    39. * @return
    40. */
    41. @PostMapping("/test")
    42. public void callback(@RequestBody TestLongPollRequest testLongPollRequest, HttpServletRequest request, HttpServletResponse response) {
    43. // 创建AsyncContext
    44. AsyncContext asyncContext = request.startAsync(request, response);
    45. String test = LongPollRequest.getctomerId();
    46. // 设置处理超时时间8s
    47. asyncContext.setTimeout(8000L);
    48. // asyncContext监听
    49. asyncContext.addListener(new AsyncListener() {
    50. @Override
    51. public void onComplete(AsyncEvent asyncEvent) throws IOException {
    52. log.info("onComplete={}", asyncEvent);
    53. }
    54. @Override
    55. public void onTimeout(AsyncEvent asyncEvent) throws IOException {
    56. log.info("onTimeout={}", asyncEvent);
    57. ConcurrentHashMap map = new ConcurrentHashMap<>();
    58. map.put("code", "500"); asyncContext.getResponse().getWriter().print(JSON.toJSONString(map));
    59. asyncContext.complete();
    60. }
    61. @Override
    62. public void onError(AsyncEvent asyncEvent) throws IOException {
    63. log.info("onError={}", asyncEvent);
    64. }
    65. @Override
    66. public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
    67. log.info("onStartAsync={}", asyncEvent);
    68. }
    69. });
    70. // 定时处理业务,处理成功后asyncContext.complete();完成异步请求
    71. timeoutChecker.scheduleAtFixedRate(() -> {
    72. try {
    73. String redisKey = getRedisKey(customerId);
    74. String redisValue = redisTemplate.opsForValue().get(redisKey);
    75. result = StringUtils.isNotBlank(redisValue);
    76. if (result) {
    77. //todo 长轮询查询数据库。通过customerId查询
    78. send(test, redisValue);
    79. ConcurrentHashMap map = new ConcurrentHashMap<>();
    80. map.put("code", "200");
    81. map.put("msg", redisValue);
    82. asyncContext.getResponse().getWriter().print(JSON.toJSONString(map));
    83. asyncContext.complete();
    84. }
    85. } catch (IOException e) {
    86. e.printStackTrace();
    87. }
    88. }, 0, 100L, TimeUnit.MILLISECONDS);
    89. }
    90. /**
    91. * 发送消息
    92. */
    93. private void send(String test, String content) {
    94. }
    95. }

  • 相关阅读:
    ACM. HJ16 购物单 ●●
    平年,闰年 质数 有效日期
    Electron webview 内网页 与 preload、 渲染进程、主进程的常规通信 以及企业级开发终极简化通信方式汇总
    机器学习策略篇:详解单一数字评估指标(Single number evaluation metric)
    基于MATLAB和k-means算法实现的图像分割
    猫头虎博主赠书三期:《Go编程进阶实战: 开发命令行应用、HTTP应用和gRPC应用》
    android.support.v7.app.AlertDialog
    坚持与确定性:毒药还是良药?
    java面试100题(应届生必备)
    MySQL之事务和索引
  • 原文地址:https://blog.csdn.net/xrq1995/article/details/132674458