• spring异步线程任务Async,自定义配置线程池,Java


    spring异步线程任务Async,自定义配置线程池,Java

    (一)spring里面,可以通过简洁的@Async使用默认的线程池跑多线程任务。

    1. package zhangphil.demo;
    2. import org.springframework.boot.SpringApplication;
    3. import org.springframework.boot.autoconfigure.SpringBootApplication;
    4. import org.springframework.context.ConfigurableApplicationContext;
    5. import org.springframework.scheduling.annotation.EnableAsync;
    6. @SpringBootApplication
    7. @EnableAsync
    8. public class TestApplication {
    9. public static void main(String[] args) {
    10. ConfigurableApplicationContext context=SpringApplication.run(TestApplication.class, args);
    11. }
    12. }

    主Application,需要配置注解 @EnableAsync 启动异步线程任务。

    1. package zhangphil.demo;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.scheduling.annotation.Async;
    4. import org.springframework.stereotype.Component;
    5. @Slf4j
    6. @Component
    7. public class MyTask {
    8. private int a = 0;
    9. private int b = 0;
    10. private int c = 0;
    11. @Async
    12. public void task1() {
    13. log.info("t-a ...");
    14. try {
    15. Thread.sleep(5000);
    16. } catch (InterruptedException e) {
    17. throw new RuntimeException(e);
    18. }
    19. log.info("t-a-{}", a++);
    20. }
    21. @Async
    22. public void task2() {
    23. log.info("t-b ...");
    24. try {
    25. Thread.sleep(5000);
    26. } catch (InterruptedException e) {
    27. throw new RuntimeException(e);
    28. }
    29. log.info("t-b-{}", b++);
    30. }
    31. @Async
    32. public void task3() {
    33. log.info("t-c ...");
    34. try {
    35. Thread.sleep(5000);
    36. } catch (InterruptedException e) {
    37. throw new RuntimeException(e);
    38. }
    39. log.info("t-c-{}", c++);
    40. }
    41. }

    上面就是异步线程任务的主要模块。

    下面做一个Controller,没别的功能,只为在浏览器输入localhost:8080/start启动task1(),task2(),task()3:

    1. package zhangphil.demo;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.web.bind.annotation.RequestMapping;
    5. import org.springframework.web.bind.annotation.RequestMethod;
    6. import org.springframework.web.bind.annotation.RestController;
    7. @RestController
    8. @Slf4j
    9. public class MyController {
    10. @Autowired
    11. private MyTask myTask;
    12. @RequestMapping(value = "/start", method = RequestMethod.GET)
    13. private String start() {
    14. myTask.task1();
    15. myTask.task2();
    16. myTask.task3();
    17. log.info("start ok");
    18. return "OK";
    19. }
    20. }

    启动后,在浏览器访问localhost:8080/start后,后台输出:

     spring默认的为多线程任务装载了线程名为 task-1,task-2,task-3 启动它们。以上代码我们并没有主动创建线程池,而是spring默默的用框架包装好的ThreadPoolTaskExecutor为我们的代码启动了多线程。

    (二)自定义配置Async的线程池。

    以上是spring原生的多线程异步任务框架,我们没有添加任何配置参数,只是简单的调用。如果要自定义更细颗粒度的使用和理解线程池任务,可以通过配置@Configuration配置AsyncConfigurer

    实现对spring框架默认的ThreadPoolTaskExecutor二次定制化配置,配置代码:

    1. package zhangphil.demo;
    2. import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. import org.springframework.scheduling.annotation.AsyncConfigurer;
    6. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    7. import java.util.concurrent.Executor;
    8. import java.util.concurrent.ThreadPoolExecutor;
    9. @Configuration
    10. public class MyAsyncConfigurer implements AsyncConfigurer {
    11. @Override
    12. public Executor getAsyncExecutor() {
    13. return executor();
    14. }
    15. @Bean("myTaskExecutor")
    16. public ThreadPoolTaskExecutor executor() {
    17. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    18. int corePoolSize = 10;
    19. executor.setCorePoolSize(corePoolSize);
    20. int maxPoolSize = 50;
    21. executor.setMaxPoolSize(maxPoolSize);
    22. int queueCapacity = 10;
    23. executor.setQueueCapacity(queueCapacity);
    24. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    25. String threadNamePrefix = "my-task@";
    26. executor.setThreadNamePrefix(threadNamePrefix);
    27. //executor.setAwaitTerminationMillis(100*1000);
    28. //executor.setWaitForTasksToCompleteOnShutdown(true);
    29. //executor.setAllowCoreThreadTimeOut(true);
    30. executor.setKeepAliveSeconds(60 * 1000);
    31. executor.initialize();
    32. return executor;
    33. }
    34. @Override
    35. public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    36. return AsyncConfigurer.super.getAsyncUncaughtExceptionHandler();
    37. }
    38. }

    MyTask.java不做任何修改,运行后输出:

     可以看到,此时的线程任务名称已经变为我们自定义的my-task@ 开头。

    (三)配置多个线程池。

    有些项目中的线程池可能不止一个,而是三个甚至更多,那就不太适合通过(二)中的技术路线实现了,而需要更一般、更通用的方法实现。

    1. package zhangphil.demo;
    2. import org.springframework.context.annotation.Bean;
    3. import org.springframework.context.annotation.Configuration;
    4. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    5. import java.util.concurrent.ThreadPoolExecutor;
    6. @Configuration
    7. public class MyTaskExecutor {
    8. @Bean("executorA")
    9. public ThreadPoolTaskExecutor executorA() {
    10. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    11. int corePoolSize = 10;
    12. executor.setCorePoolSize(corePoolSize);
    13. int maxPoolSize = 50;
    14. executor.setMaxPoolSize(maxPoolSize);
    15. int queueCapacity = 10;
    16. executor.setQueueCapacity(queueCapacity);
    17. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    18. String threadNamePrefix = "task@A-";
    19. executor.setThreadNamePrefix(threadNamePrefix);
    20. //executor.setAwaitTerminationMillis(100*1000);
    21. //executor.setWaitForTasksToCompleteOnShutdown(true);
    22. //executor.setAllowCoreThreadTimeOut(true);
    23. executor.setKeepAliveSeconds(60 * 1000);
    24. executor.initialize();
    25. return executor;
    26. }
    27. @Bean("executorB")
    28. public ThreadPoolTaskExecutor executorB() {
    29. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    30. int corePoolSize = 10;
    31. executor.setCorePoolSize(corePoolSize);
    32. int maxPoolSize = 50;
    33. executor.setMaxPoolSize(maxPoolSize);
    34. int queueCapacity = 10;
    35. executor.setQueueCapacity(queueCapacity);
    36. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    37. String threadNamePrefix = "task@B-";
    38. executor.setThreadNamePrefix(threadNamePrefix);
    39. //executor.setAwaitTerminationMillis(100*1000);
    40. //executor.setWaitForTasksToCompleteOnShutdown(true);
    41. //executor.setAllowCoreThreadTimeOut(true);
    42. executor.setKeepAliveSeconds(60 * 1000);
    43. executor.initialize();
    44. return executor;
    45. }
    46. }

    上面代码通过@Configuration定义了不同于spring框架默认的那个多线程池子,这两个单独自定义的线程池名分别以task@A-和task@B-开头。多线程池实现了,对应的使用这两个线程池的任务代码模块也需要调整:

    1. package zhangphil.demo;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.scheduling.annotation.Async;
    4. import org.springframework.stereotype.Component;
    5. @Slf4j
    6. @Component
    7. public class MyTask {
    8. private int a = 0;
    9. private int b = 0;
    10. private int c = 0;
    11. @Async("executorA")
    12. public void task1() {
    13. log.info("t-a ...");
    14. try {
    15. Thread.sleep(5000);
    16. } catch (InterruptedException e) {
    17. throw new RuntimeException(e);
    18. }
    19. log.info("t-a-{}", a++);
    20. }
    21. @Async
    22. public void task2() {
    23. log.info("t-b ...");
    24. try {
    25. Thread.sleep(5000);
    26. } catch (InterruptedException e) {
    27. throw new RuntimeException(e);
    28. }
    29. log.info("t-b-{}", b++);
    30. }
    31. @Async("executorB")
    32. public void task3() {
    33. log.info("t-c ...");
    34. try {
    35. Thread.sleep(5000);
    36. } catch (InterruptedException e) {
    37. throw new RuntimeException(e);
    38. }
    39. log.info("t-c-{}", c++);
    40. }
    41. }

    注意task2(),特意没有给@Async加上寻址名字。看看后台运行输出:

     从输出可以看到,由于在task1()和task3()指定了@Async()需要启用的线程池executorA和executorB,所以task1()和task3()的多线程任务分别被executorA和executorB调度装载运行。我们故意没有给task2()的@Async指定线程池,但由于在Application里面通过配置注解@EnableAsync

    开启了多线程任务特性,所以,spring框架为task2()也启用了默认的未经配置的线程池。

  • 相关阅读:
    【剑指offer】1-5题
    Typora收费?搭建VS Code MarkDown写作环境
    Dubbo链路追踪——生成全局ID(traceId)
    vue2原理初探-数据代理和数据劫持
    DolphinScheduler 集群部署
    AST 初探深浅,代码还能这样玩?
    MySQL与ES数据同步的四种方案及实践演示
    循环神经网络(RNN)
    学习笔记丨Shell
    【HarmonyOS4学习笔记】《HarmonyOS4+NEXT星河版入门到企业级实战教程》课程学习笔记(二)
  • 原文地址:https://blog.csdn.net/zhangphil/article/details/127042464