• Spring Boot异步请求处理框架


    1、前言

    1. ​ 在Spring Boot项目中,经常会遇到处理时间过长,导致出现HTTP请求超时问题,状态码:502
    2. ​ 例如一个文件导入接口需要导入一个Excel文件的学员记录,原来是针对一个班的学员,最多500条记录,1分钟的HTTP超时时长之内基本可以响应。现在将很多班级的学员混在一起,放在一个Excel文件中(这样可以提高操作人员的工作效率),比如5万条学员记录,于是就出现HTTP请求超时问题。
    3. ​ ​解决方案有:1)Ajax异步请求方式;2)WebSocket方式;3)异步请求处理方式:请求+轮询。
    4. ​ 方案1,需要调整HTTP超时设置,Spring Boot开启异步处理(使用@EnableAsync和@Async),这种方式,问题是超时时长要设置多大,没有底。
    5. ​ 方案2,需要前端支持Web2.0,对浏览器有所限制,代码也变得复杂。
    6. ​ 方案3,使用异步请求处理。所谓异步请求处理,就是将请求异步化,前端发起请求后,后端很快就响应,返回一个任务ID,前端再用这个任务ID去轮询,获取处理进程和结果信息。需要两个接口:任务请求接口和任务信息轮询接口。
    7. ​ 显然,对于长时间的业务处理,通过轮询,获取处理进程信息,可以获得较好的用户体验。正如大文件下载,用户可以了解下载的进度一样,业务处理同样可以通过输出处理日志信息和进度,使得长时间业务处理过程可视化,而不至于让用户长时间面对一个在空转鼠标符号。
    8. ​ 本文针对方案3,提出一种通用的处理框架。使用这个通用的异步请求处理框架,可以适应各种不同的需要长时间异步处理的业务需求。

    2、异步请求处理框架描述

    ​	本异步请求处理框架,主要包括任务信息、任务执行类(Runnable)、任务管理器。
    

    2.1、任务信息对象类TaskInfo

    1. ​ 任务信息对象,用于存储任务信息,其生命周期为:创建任务==>加入任务队列==>加入线程池工作线程队列==>任务执行==>任务执行完成==>任务对象缓存超期销毁。
    2. 1)任务识别信息:
    3. 1.1)任务ID:任务ID用于识别任务信息,一个任务ID对应一个任务,是全局唯一的,不区分任务类型。
    4. 1.2)任务名称:即任务类型的名称,对应于业务处理类型名称,如查询商品单价、查询商品库存等,这样可方便可视化识别任务。一个任务名称可以有多个任务实例。
    5. 1.3)会话ID(sessionId):用于身份识别,这样只有请求者才能根据返回的任务ID来查询任务信息,其它用户无权访问。想象一下同步请求,谁发起请求,响应给谁。
    6. 2)任务调用信息:使得任务管理器可以使用反射方法,调用任务处理方法。
    7. 2.1)任务处理对象:这是业务处理对象,为Object类型,一般为Service实现类对象。
    8. 2.2)任务处理方法:这是一个Method对象类型,为异步处理的业务处理方法对象。这个方法必须是public的方法。
    9. 2.3)任务方法参数:这是一个Map<String,Object>类型字典对象,可适应任意参数结构。
    10. 3)任务处理过程和结果相关信息:可以提供任务处理过程和结果可视化的信息。
    11. 3.1)任务状态:表示任务目前的处理状态,0-未处理,1-处理中,2-处理结束。
    12. 3.2)处理日志:这是一个List<String>类型的字符串列表,用于存放处理日志。处理日志格式化:"time level taskId taskName --- logInfo",便于前端展示。
    13. 3.3)处理进度百分比:这是double类型数据,0.0-100.0,业务单元可视需要使用。
    14. 3.4)处理结果:这是一个Object类型对象,真实数据类型由业务单元约定。在未处理结束前,该值为null,处理结束后,如有返回值,此时赋值。
    15. 3.5)返回码:业务处理,可能遇到异常,如需设置返回码,此处赋值。
    16. 3.6)返回消息:与返回码相联系的提示信息。
    17. 3.7)开始处理时间戳:在任务启动(开始执行时)设置,用于计算业务处理的耗时时长。
    18. 4)任务缓存到期时间:任务处理完成后,任务信息会缓存一段时间(如60秒),等待前端获取,超期后,任务对象被销毁,意味着再也无法获取任务信息了。后端系统不可能累积存放超期的任务信息,否则可能导致OOM(Out Of Memory)异常。

    2.2、任务执行类TaskRunnable

    1. ​ 任务执行类,实现Runnable接口,是为线程池的工作线程提供处理方法。
    2. ​ 任务执行类,使用任务信息对象作为参数,并调用任务信息的任务调用信息,使用反射方法,来执行任务处理。

    2.3、任务管理器类TaskManService

    1. ​ 任务管理器,全局对象,使用@Service注解,加入Spring容器。这样,任何需要异步处理的业务都可以访问任务管理器。
    2. ​ 任务管理器,包含下列属性:
    3. 1)任务队列:LinkedBlockingQueue<TaskInfo>类型,考虑到OOM问题,容量使用有限值,如1万,即最大缓存1万个任务,相当于二级缓存。
    4. 2)任务信息字典:Map<Integer,TaskInfo>类型,key为taskId,目的是为了方便根据taskId快速查询任务信息。
    5. 3)线程池:ThreadPoolExecutor类型,工作线程队列长度为线程池的最大线程数,相当于一级缓存。可以设置核心线程数,最大线程数,工作线程队列长度等参数,如设置核心线程数为5,最大线程数为100,工作线程队列长度为100。线程工厂ThreadFactory使用Executors.defaultThreadFactory()。
    6. 4)任务ID计数器:AtomicInteger类型,用于分配唯一的任务ID。
    7. 5)监视线程:用于任务调度,以及检查缓存到期时间超期的已结束任务信息。
    8. 6)监视线程的执行类对象:Runnable对象,提供监视线程的执行方法。
    9. 7)上次检查时间戳:用于检查缓存到期时间,每秒1次检查。
    10. ​ 任务管理器,包含下列接口方法:
    11. 1)添加任务:addTask,获取sessionId,检查任务处理对象、方法及参数是否为null,然后分配任务ID,创建任务对象,加入任务队列。如果参数为void,也需要构造一个空的Map<String,Object>字典对象。如果任务队列未满,就将任务加入任务队列中,并返回包含任务ID的字典,否则抛出“任务队列已满”的异常信息。
    12. 2)获取任务信息:getTaskInfo,参数为request和任务ID,如果sessionId与请求时相同,且任务对象能在任务信息字典中找到,就返回任务信息对象,否则抛出相关异常。
    13. ​ 任务管理器的核心方法:
    14. 1)初始化:使用@PostConstruct注解,启动监视线程,并预启动线程池的一个核心线程。
    15. 2)监视线程的执行类run方法:实现每秒一次的超期已处理结束的任务信息的检查,以及任务调度。任务调度方法:
    16. 2.1)如果任务队列非空,且线程池未满,则取出一个任务信息对象,并创建一个任务执行类对象,加入到线程池的工作线程队列(execute方法加入)。
    17. 2.2)如果任务队列非空,且线程池已满,则等待100毫秒。
    18. 2.3)如果任务队列为空,则等待100毫秒。

    3、异步请求处理框架代码

    3.1、任务信息对象类TaskInfo

    ​	任务信息对象类TaskInfo,代码如下:
    
    1. package com.abc.example.asyncproc;
    2. import java.lang.reflect.Method;
    3. import java.time.LocalDateTime;
    4. import java.time.format.DateTimeFormatter;
    5. import java.util.ArrayList;
    6. import java.util.List;
    7. import java.util.Map;
    8. import lombok.Data;
    9. /**
    10. * @className : TaskInfo
    11. * @description : 任务信息
    12. * @summary :
    13. * @history :
    14. * ------------------------------------------------------------------------------
    15. * date version modifier remarks
    16. * ------------------------------------------------------------------------------
    17. * 2022/08/17 1.0.0 sheng.zheng 初版
    18. *
    19. */
    20. @Data
    21. public class TaskInfo {
    22. //
    23. // 任务识别信息
    24. // 任务ID
    25. private Integer taskId = 0;
    26. // sessionId,用于识别请求者
    27. private String sessionId = "";
    28. // 任务名称,即业务处理的名称,如查询商品最低价,导入学员名册
    29. private String taskName = "";
    30. //
    31. // 任务执行相关的
    32. // 请求参数,使用字典进行封装,以便适应任意数据结构
    33. private Map params;
    34. // 处理对象,一般是service对象
    35. private Object procObject;
    36. // 处理方法
    37. private Method method;
    38. //
    39. // 任务处理产生的数据,中间数据,结果
    40. // 处理状态,0-未处理,1-处理中,2-处理结束
    41. private int procStatus = 0;
    42. // 处理结果,数据类型由业务单元约定
    43. private Object result;
    44. // 处理日志,包括中间结果,格式化显示:Time level taskId taskName logInfo
    45. private List logList = new ArrayList();
    46. // 处理进度百分比
    47. private double progress = 0;
    48. // 到期时间,UTC,任务完成后才设置,超时后销毁
    49. private long expiredTime = 0;
    50. // 返回码,保留,0表示操作成功
    51. private int resultCode = 0;
    52. // 响应消息,保留
    53. private String message = "";
    54. // 开始处理时间,便于统计任务处理时长
    55. private long startTime = 0;
    56. //
    57. // 日志相关的方法
    58. private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    59. // 添加处理日志
    60. public void addLogInfo(String level,String logInfo) {
    61. // 格式化显示:Time level taskId taskName logInfo
    62. LocalDateTime current = LocalDateTime.now();
    63. String strCurrent = current.format(df);
    64. String log = String.format("%s %s %d %s --- %s",
    65. strCurrent,level,taskId,taskName,logInfo);
    66. logList.add(log);
    67. }
    68. //
    69. // 不同状态的参数设置接口
    70. // 设置任务初始化,未开始
    71. public void init(Integer taskId,String taskName,String sessionId,
    72. Object procObject,Method method,Map params) {
    73. this.procStatus = 0;
    74. this.taskId = taskId;
    75. this.taskName = taskName;
    76. this.sessionId = sessionId;
    77. this.procObject = procObject;
    78. this.method = method;
    79. this.params = params;
    80. }
    81. // 启动任务
    82. public void start() {
    83. this.procStatus = 1;
    84. addLogInfo(TaskConstants.LEVEL_INFO,"开始处理任务...");
    85. // 记录任务开始处理的时间
    86. startTime = System.currentTimeMillis();
    87. }
    88. // 结束任务
    89. public void finish(Object result) {
    90. this.result = result;
    91. this.procStatus = 2;
    92. // 设置结果缓存的到期时间
    93. expired();
    94. }
    95. // 处理异常
    96. public void error(int resultCode,String message) {
    97. this.resultCode = resultCode;
    98. this.message = message;
    99. this.procStatus = 2;
    100. // 设置结果缓存的到期时间
    101. expired();
    102. }
    103. // 设置过期过期
    104. public void expired() {
    105. long current = System.currentTimeMillis();
    106. this.expiredTime = current + TaskConstants.PROC_EXPIRE_TIME;
    107. long duration = 0;
    108. double second = 0.0;
    109. duration = current - startTime;
    110. second = duration / 1000.0;
    111. addLogInfo(TaskConstants.LEVEL_INFO,"任务处理结束,耗时(s):"+second);
    112. }
    113. }
    ​	说明:任务信息对象类TaskInfo提供了几个常用的处理方法,如addLogInfo、init、start、finish、error,便于简化属性值设置。
    

    3.2、任务执行类TaskRunnable

    ​	任务执行类TaskRunnable,代码如下:
    
    1. package com.abc.example.asyncproc;
    2. import java.lang.reflect.InvocationTargetException;
    3. import java.lang.reflect.Method;
    4. import com.abc.example.common.utils.LogUtil;
    5. import com.abc.example.exception.BaseException;
    6. import com.abc.example.exception.ExceptionCodes;
    7. /**
    8. * @className : TaskRunnable
    9. * @description : 可被线程执行的任务执行类
    10. * @summary :
    11. * @history :
    12. * ---------
  • 相关阅读:
    【STM32】ADC(模拟/数字转换)
    java计算机毕业设计云端小区物业智能管理系统源码+系统+mysql数据库+lw文档+部署
    Flink—— Data Source 介绍
    学习jQuery库的第一天
    14:00面试,14:06就出来了,问的问题有点变态。。。
    PostGIS简单使用
    求各区域热门商品Top3 - HiveSQL
    基于视觉AI的管道高后果区预警系统
    替代STM32的GD32,替代KEIL的Eclipse配置---连载1
    生活不止有诗和远方,也有眼前的美好。也许你心里有清风明月,
  • 原文地址:https://blog.csdn.net/m0_67698950/article/details/126505653