• 【学习总结】SpringBoot中使用单例模式+ScheduledExecutorService实现异步多线程任务(若依源码学习)


    最近在学习若依这个开源项目,发现他记录登录日志的时候使用了异步线程去记录日志,觉得这个方案也挺不错的,在此学习记录下来,以后在工作中也能提供一种思路,其他小伙伴如果有觉得不错的方案也可以在评论区里留言,大家一起探讨一下🍭 


    若依源码

    一、相关工具类

    我们一步步看,先把相关的工具类代码给大家贴出来

    1、Threads工具类

    1. /**
    2. * 线程相关工具类.
    3. *
    4. * @author ruoyi
    5. */
    6. public class Threads
    7. {
    8. private static final Logger logger = LoggerFactory.getLogger(Threads.class);
    9. /**
    10. * sleep等待,单位为毫秒
    11. */
    12. public static void sleep(long milliseconds)
    13. {
    14. try
    15. {
    16. Thread.sleep(milliseconds);
    17. }
    18. catch (InterruptedException e)
    19. {
    20. return;
    21. }
    22. }
    23. /**
    24. * 停止线程池
    25. * 先使用shutdown, 停止接收新任务并尝试完成所有已存在任务.
    26. * 如果超时, 则调用shutdownNow, 取消在workQueue中Pending的任务,并中断所有阻塞函数.
    27. * 如果仍然超時,則強制退出.
    28. * 另对在shutdown时线程本身被调用中断做了处理.
    29. */
    30. public static void shutdownAndAwaitTermination(ExecutorService pool)
    31. {
    32. if (pool != null && !pool.isShutdown())
    33. {
    34. pool.shutdown();
    35. try
    36. {
    37. if (!pool.awaitTermination(120, TimeUnit.SECONDS))
    38. {
    39. pool.shutdownNow();
    40. if (!pool.awaitTermination(120, TimeUnit.SECONDS))
    41. {
    42. logger.info("Pool did not terminate");
    43. }
    44. }
    45. }
    46. catch (InterruptedException ie)
    47. {
    48. pool.shutdownNow();
    49. Thread.currentThread().interrupt();
    50. }
    51. }
    52. }
    53. /**
    54. * 打印线程异常信息
    55. */
    56. public static void printException(Runnable r, Throwable t)
    57. {
    58. if (t == null && r instanceof Future)
    59. {
    60. try
    61. {
    62. Future future = (Future) r;
    63. if (future.isDone())
    64. {
    65. future.get();
    66. }
    67. }
    68. catch (CancellationException ce)
    69. {
    70. t = ce;
    71. }
    72. catch (ExecutionException ee)
    73. {
    74. t = ee.getCause();
    75. }
    76. catch (InterruptedException ie)
    77. {
    78. Thread.currentThread().interrupt();
    79. }
    80. }
    81. if (t != null)
    82. {
    83. logger.error(t.getMessage(), t);
    84. }
    85. }
    86. }

    这个工具类包含了三个方法:

    •  sleep(long milliseconds):这个方法比较简单,就是用来睡眠线程的
    • shutdownAndAwaitTermination(ExecutorService pool)(重点):该方法用于优雅地关闭一个 ExecutorService 并等待其终止。

    1. public static void shutdownAndAwaitTermination(ExecutorService pool)
    2. {
    3. if (pool != null && !pool.isShutdown()) //确保传入的 ExecutorService 不为 null 并且尚未被关闭。
    4. {
    5. /*
    6. 调用 shutdown 方法,这将启动线程池的关闭序列。注意,shutdown 并不会立即停
    7. 止所有正在执行的任务,也不会阻止新任务的提交。它只会使 ExecutorService 不再接受
    8. 新任务,但会等待已提交的任务完成。
    9. */
    10. pool.shutdown();
    11. try
    12. {
    13. if (!pool.awaitTermination(120, TimeUnit.SECONDS))//等待线程池在指定的时间
    14. 内(这里是120秒)完成所有任务并关闭。如果线程池在该时间内没有关闭,则会进入 if 语句块
    15. {
    16. pool.shutdownNow(); //强制停止所有正在执行的任务,它并不能保证所有的任务都会被停止
    17. if (!pool.awaitTermination(120, TimeUnit.SECONDS))//在强制关闭线程池后,再次等待120秒以查看它是否已关闭。
    18. {
    19. logger.info("Pool did not terminate");//如果仍未关闭,则记录一条日志信息。
    20. }
    21. }
    22. }
    23. catch (InterruptedException ie)
    24. {
    25. /*
    26. 如果在等待线程池关闭时被中断(例如,由于另一个线程调用了当前线程的
    27. interrupt 方法),则再次调用 shutdownNow 强制关闭线程池,并重新设置当前线
    28. 程的中断状态。
    29. */
    30. pool.shutdownNow();
    31. Thread.currentThread().interrupt();
    32. }
    33. }
    34. }
    • printException(Runnable r, Throwable t) :这个方法就是用于记录异常信息,写的也是很优雅。

    2、SpringUtils工具类

    1. /**
    2. * spring工具类 方便在非spring管理环境中获取bean
    3. *
    4. * @author ruoyi
    5. */
    6. @Component
    7. public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware
    8. {
    9. /** Spring应用上下文环境 */
    10. private static ConfigurableListableBeanFactory beanFactory;
    11. private static ApplicationContext applicationContext;
    12. @Override
    13. public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException
    14. {
    15. SpringUtils.beanFactory = beanFactory;
    16. }
    17. @Override
    18. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
    19. {
    20. SpringUtils.applicationContext = applicationContext;
    21. }
    22. /**
    23. * 获取对象
    24. *
    25. * @param name
    26. * @return Object 一个以所给名字注册的bean的实例
    27. * @throws BeansException
    28. *
    29. */
    30. @SuppressWarnings("unchecked")
    31. public static T getBean(String name) throws BeansException
    32. {
    33. return (T) beanFactory.getBean(name);
    34. }
    35. /**
    36. * 获取类型为requiredType的对象
    37. *
    38. * @param clz
    39. * @return
    40. * @throws BeansException
    41. *
    42. */
    43. public static T getBean(Class clz) throws BeansException
    44. {
    45. T result = (T) beanFactory.getBean(clz);
    46. return result;
    47. }
    48. /**
    49. * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
    50. *
    51. * @param name
    52. * @return boolean
    53. */
    54. public static boolean containsBean(String name)
    55. {
    56. return beanFactory.containsBean(name);
    57. }
    58. /**
    59. * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
    60. *
    61. * @param name
    62. * @return boolean
    63. * @throws NoSuchBeanDefinitionException
    64. *
    65. */
    66. public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException
    67. {
    68. return beanFactory.isSingleton(name);
    69. }
    70. /**
    71. * @param name
    72. * @return Class 注册对象的类型
    73. * @throws NoSuchBeanDefinitionException
    74. *
    75. */
    76. public static Class getType(String name) throws NoSuchBeanDefinitionException
    77. {
    78. return beanFactory.getType(name);
    79. }
    80. /**
    81. * 如果给定的bean名字在bean定义中有别名,则返回这些别名
    82. *
    83. * @param name
    84. * @return
    85. * @throws NoSuchBeanDefinitionException
    86. *
    87. */
    88. public static String[] getAliases(String name) throws NoSuchBeanDefinitionException
    89. {
    90. return beanFactory.getAliases(name);
    91. }
    92. /**
    93. * 获取aop代理对象
    94. *
    95. * @param invoker
    96. * @return
    97. */
    98. @SuppressWarnings("unchecked")
    99. public static T getAopProxy(T invoker)
    100. {
    101. return (T) AopContext.currentProxy();
    102. }
    103. /**
    104. * 获取当前的环境配置,无配置返回null
    105. *
    106. * @return 当前的环境配置
    107. */
    108. public static String[] getActiveProfiles()
    109. {
    110. return applicationContext.getEnvironment().getActiveProfiles();
    111. }
    112. /**
    113. * 获取当前的环境配置,当有多个环境配置时,只获取第一个
    114. *
    115. * @return 当前的环境配置
    116. */
    117. public static String getActiveProfile()
    118. {
    119. final String[] activeProfiles = getActiveProfiles();
    120. return StringUtils.isNotEmpty(Arrays.toString(activeProfiles)) ? activeProfiles[0] : null;
    121. }
    122. /**
    123. * 获取配置文件中的值
    124. *
    125. * @param key 配置文件的key
    126. * @return 当前的配置文件的值
    127. *
    128. */
    129. public static String getRequiredProperty(String key)
    130. {
    131. return applicationContext.getEnvironment().getRequiredProperty(key);
    132. }
    133. }

    二、核心代码

    1、异步任务管理器

    1. /**
    2. * 异步任务管理器
    3. *
    4. * @author ruoyi
    5. */
    6. public class AsyncManager
    7. {
    8. /**
    9. * 操作延迟10毫秒
    10. */
    11. private final int OPERATE_DELAY_TIME = 10;
    12. /**
    13. * 异步操作任务调度线程池
    14. */
    15. private ScheduledExecutorService executor = SpringUtils.getBean("scheduledExecutorService");
    16. /**
    17. * 单例模式
    18. */
    19. private AsyncManager(){}
    20. private static AsyncManager me = new AsyncManager();
    21. public static AsyncManager me()
    22. {
    23. return me;
    24. }
    25. /**
    26. * 执行任务
    27. *
    28. * @param task 任务
    29. */
    30. public void execute(TimerTask task)
    31. {
    32. executor.schedule(task, OPERATE_DELAY_TIME, TimeUnit.MILLISECONDS);
    33. }
    34. /**
    35. * 停止任务线程池
    36. */
    37. public void shutdown()
    38. {
    39. Threads.shutdownAndAwaitTermination(executor);
    40. }
    41. }

    2、异步工厂

    1. /**
    2. * 异步工厂(产生任务用)
    3. *
    4. * @author ruoyi
    5. */
    6. public class AsyncFactory
    7. {
    8. private static final Logger sys_user_logger = LoggerFactory.getLogger("sys-user");
    9. /**
    10. * 记录登录信息
    11. *
    12. * @param username 用户名
    13. * @param status 状态
    14. * @param message 消息
    15. * @param args 列表
    16. * @return 任务task
    17. */
    18. public static TimerTask recordLogininfor(final String username, final String status, final String message,
    19. final Object... args)
    20. {
    21. final UserAgent userAgent = UserAgent.parseUserAgentString(ServletUtils.getRequest().getHeader("User-Agent"));
    22. final String ip = IpUtils.getIpAddr();
    23. return new TimerTask()
    24. {
    25. @Override
    26. public void run()
    27. {
    28. String address = AddressUtils.getRealAddressByIP(ip);
    29. StringBuilder s = new StringBuilder();
    30. s.append(LogUtils.getBlock(ip));
    31. s.append(address);
    32. s.append(LogUtils.getBlock(username));
    33. s.append(LogUtils.getBlock(status));
    34. s.append(LogUtils.getBlock(message));
    35. // 打印信息到日志
    36. sys_user_logger.info(s.toString(), args);
    37. // 获取客户端操作系统
    38. String os = userAgent.getOperatingSystem().getName();
    39. // 获取客户端浏览器
    40. String browser = userAgent.getBrowser().getName();
    41. // 封装对象
    42. SysLogininfor logininfor = new SysLogininfor();
    43. logininfor.setUserName(username);
    44. logininfor.setIpaddr(ip);
    45. logininfor.setLoginLocation(address);
    46. logininfor.setBrowser(browser);
    47. logininfor.setOs(os);
    48. logininfor.setMsg(message);
    49. // 日志状态
    50. if (StringUtils.equalsAny(status, Constants.LOGIN_SUCCESS, Constants.LOGOUT, Constants.REGISTER))
    51. {
    52. logininfor.setStatus(Constants.SUCCESS);
    53. }
    54. else if (Constants.LOGIN_FAIL.equals(status))
    55. {
    56. logininfor.setStatus(Constants.FAIL);
    57. }
    58. // 插入数据
    59. SpringUtils.getBean(ISysLogininforService.class).insertLogininfor(logininfor);
    60. }
    61. };
    62. }
    63. /**
    64. * 操作日志记录
    65. *
    66. * @param operLog 操作日志信息
    67. * @return 任务task
    68. */
    69. public static TimerTask recordOper(final SysOperLog operLog)
    70. {
    71. return new TimerTask()
    72. {
    73. @Override
    74. public void run()
    75. {
    76. // 远程查询操作地点
    77. operLog.setOperLocation(AddressUtils.getRealAddressByIP(operLog.getOperIp()));
    78. SpringUtils.getBean(ISysOperLogService.class).insertOperlog(operLog);
    79. }
    80. };
    81. }
    82. }

    3、线程关闭

    1. @Component
    2. public class ShutdownManager
    3. {
    4. private static final Logger logger = LoggerFactory.getLogger("sys-user");
    5. @PreDestroy
    6. public void destroy()
    7. {
    8. shutdownAsyncManager();
    9. }
    10. /**
    11. * 停止异步执行任务
    12. */
    13. private void shutdownAsyncManager()
    14. {
    15. try
    16. {
    17. logger.info("====关闭后台任务任务线程池====");
    18. AsyncManager.me().shutdown();
    19. }
    20. catch (Exception e)
    21. {
    22. logger.error(e.getMessage(), e);
    23. }
    24. }
    25. }

    改造代码

    1、改造异步任务管理器

    这个管理器里面没有业务代码,我们稍微修改一下。

    1. public class GlobalAsyncManager {
    2. //单例
    3. private static final GlobalAsyncManager instance = new GlobalAsyncManager();
    4. //延迟执行时间
    5. private final int OPERATOR_DELAY_TIME = 10;
    6. private ScheduledExecutorService executorService = SpringUtils.getBean("scheduledExecutorService");
    7. private GlobalAsyncManager(){
    8. }
    9. public static GlobalAsyncManager getInstance(){
    10. return instance;
    11. }
    12. //执行任务
    13. public void executorTask(TimerTask task){
    14. executorService.schedule(task,OPERATOR_DELAY_TIME, TimeUnit.MILLISECONDS);
    15. }
    16. //停止任务线程池
    17. public void shutdown(){
    18. Threads.shutdownAndAwaitTermination(executorService);
    19. }
    20. }

    2、自定义异步工厂

    1. /**
    2. * 异步工厂,产生任务用
    3. */
    4. @Slf4j
    5. public class AsyncTaskFactory {
    6. public static TimerTask recordLogToData(参数){
    7. return new TimerTask() {
    8. @Override
    9. public void run() {
    10. //日志操作
    11. }
    12. };
    13. }
    14. }

    使用

    1、若依代码使用

    1. /**
    2. * 登录验证
    3. *
    4. * @param username 用户名
    5. * @param password 密码
    6. * @param code 验证码
    7. * @param uuid 唯一标识
    8. * @return 结果
    9. */
    10. public String login(String username, String password, String code, String uuid)
    11. {
    12. // 验证码校验
    13. validateCaptcha(username, code, uuid);
    14. // 登录前置校验
    15. loginPreCheck(username, password);
    16. // 用户验证
    17. Authentication authentication = null;
    18. try
    19. {
    20. UsernamePasswordAuthenticationToken authenticationToken = new UsernamePasswordAuthenticationToken(username, password);
    21. AuthenticationContextHolder.setContext(authenticationToken);
    22. // 该方法会去调用UserDetailsServiceImpl.loadUserByUsername
    23. authentication = authenticationManager.authenticate(authenticationToken);
    24. }
    25. catch (Exception e)
    26. {
    27. if (e instanceof BadCredentialsException)
    28. {
    29. AsyncManager.me().execute(AsyncFactory.recordLogininfor(username, Constants.LOGIN_FAIL, MessageUtils.message("user.password.not.match")));
    30. throw new UserPasswordNotMatchException();
    31. }
    32. else
    33. {
    34. AsyncManager.me().execute(AsyncFactory.recordLogininfor(username, Constants.LOGIN_FAIL, e.getMessage()));
    35. throw new ServiceException(e.getMessage());
    36. }
    37. }
    38. finally
    39. {
    40. AuthenticationContextHolder.clearContext();
    41. }
    42. AsyncManager.me().execute(AsyncFactory.recordLogininfor(username, Constants.LOGIN_SUCCESS, MessageUtils.message("user.login.success")));
    43. LoginUser loginUser = (LoginUser) authentication.getPrincipal();
    44. recordLoginInfo(loginUser.getUserId());
    45. // 生成token
    46. return tokenService.createToken(loginUser);
    47. }

    2、自定义代码使用

    1. @Test
    2. public void recordLog() throws InterruptedException {
    3. //为了验证其是异步效果,这里模拟线程休眠并记录时间
    4. Thread.sleep(1000);
    5. GlobalAsyncManager.getInstance().executorTask(AsyncTaskFactory.recordLogToData(LocalDateTime.now());
    6. Thread.sleep(3000);
    7. System.out.println(LocalDateTime.now());
    8. }
  • 相关阅读:
    计算机毕设(附源码)JAVA-SSM基于课程群的实验管理平台
    大数据怎么学?对大数据开发领域及岗位的详细解读,完整理解大数据开发领域技术体系
    腾讯云Ubuntu18.04配置深度学习环境
    433个量子比特!IBM推出最新量子计算机Osprey
    python:pyqt5案例(简易浏览器)
    Spring与Web环境集成
    测试工程师提升:测试开发VS性能测试?谁能干出......
    小成本搏大流量:微信/支付宝小程序搜索排名优化
    ElasticSearch使用入门及拼音搜索介绍
    hyperf框架接入pgsql扩展包
  • 原文地址:https://blog.csdn.net/cyuyanya__/article/details/139580035