- package zhangphil.demo;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.ConfigurableApplicationContext;
- import org.springframework.scheduling.annotation.EnableAsync;
-
- @SpringBootApplication
- @EnableAsync
- public class TestApplication {
- public static void main(String[] args) {
- ConfigurableApplicationContext context=SpringApplication.run(TestApplication.class, args);
- }
- }
主Application,需要配置注解 @EnableAsync 启动异步线程任务。
- package zhangphil.demo;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
-
- @Slf4j
- @Component
- public class MyTask {
- private int a = 0;
- private int b = 0;
- private int c = 0;
-
- @Async
- public void task1() {
- log.info("t-a ...");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- log.info("t-a-{}", a++);
- }
-
- @Async
- public void task2() {
- log.info("t-b ...");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- log.info("t-b-{}", b++);
- }
-
- @Async
- public void task3() {
- log.info("t-c ...");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- log.info("t-c-{}", c++);
- }
- }
上面就是异步线程任务的主要模块。
下面做一个Controller,没别的功能,只为在浏览器输入localhost:8080/start启动task1(),task2(),task()3:
- package zhangphil.demo;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RequestMethod;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @Slf4j
- public class MyController {
- @Autowired
- private MyTask myTask;
-
- @RequestMapping(value = "/start", method = RequestMethod.GET)
- private String start() {
- myTask.task1();
- myTask.task2();
- myTask.task3();
- log.info("start ok");
- return "OK";
- }
- }
启动后,在浏览器访问localhost:8080/start后,后台输出:
spring默认的为多线程任务装载了线程名为 task-1,task-2,task-3 启动它们。以上代码我们并没有主动创建线程池,而是spring默默的用框架包装好的ThreadPoolTaskExecutor为我们的代码启动了多线程。
以上是spring原生的多线程异步任务框架,我们没有添加任何配置参数,只是简单的调用。如果要自定义更细颗粒度的使用和理解线程池任务,可以通过配置@Configuration配置AsyncConfigurer
实现对spring框架默认的ThreadPoolTaskExecutor二次定制化配置,配置代码:
- package zhangphil.demo;
-
- import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.annotation.AsyncConfigurer;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
- import java.util.concurrent.Executor;
- import java.util.concurrent.ThreadPoolExecutor;
-
- @Configuration
- public class MyAsyncConfigurer implements AsyncConfigurer {
-
- @Override
- public Executor getAsyncExecutor() {
- return executor();
- }
-
- @Bean("myTaskExecutor")
- public ThreadPoolTaskExecutor executor() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-
- int corePoolSize = 10;
- executor.setCorePoolSize(corePoolSize);
-
- int maxPoolSize = 50;
- executor.setMaxPoolSize(maxPoolSize);
-
- int queueCapacity = 10;
- executor.setQueueCapacity(queueCapacity);
-
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
-
- String threadNamePrefix = "my-task@";
- executor.setThreadNamePrefix(threadNamePrefix);
-
- //executor.setAwaitTerminationMillis(100*1000);
- //executor.setWaitForTasksToCompleteOnShutdown(true);
- //executor.setAllowCoreThreadTimeOut(true);
-
- executor.setKeepAliveSeconds(60 * 1000);
-
- executor.initialize();
-
- return executor;
- }
-
- @Override
- public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
- return AsyncConfigurer.super.getAsyncUncaughtExceptionHandler();
- }
- }
MyTask.java不做任何修改,运行后输出:
可以看到,此时的线程任务名称已经变为我们自定义的my-task@ 开头。
有些项目中的线程池可能不止一个,而是三个甚至更多,那就不太适合通过(二)中的技术路线实现了,而需要更一般、更通用的方法实现。
- package zhangphil.demo;
-
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
- import java.util.concurrent.ThreadPoolExecutor;
-
- @Configuration
- public class MyTaskExecutor {
-
- @Bean("executorA")
- public ThreadPoolTaskExecutor executorA() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-
- int corePoolSize = 10;
- executor.setCorePoolSize(corePoolSize);
-
- int maxPoolSize = 50;
- executor.setMaxPoolSize(maxPoolSize);
-
- int queueCapacity = 10;
- executor.setQueueCapacity(queueCapacity);
-
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
-
- String threadNamePrefix = "task@A-";
- executor.setThreadNamePrefix(threadNamePrefix);
-
- //executor.setAwaitTerminationMillis(100*1000);
- //executor.setWaitForTasksToCompleteOnShutdown(true);
- //executor.setAllowCoreThreadTimeOut(true);
-
- executor.setKeepAliveSeconds(60 * 1000);
-
- executor.initialize();
-
- return executor;
- }
-
- @Bean("executorB")
- public ThreadPoolTaskExecutor executorB() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-
- int corePoolSize = 10;
- executor.setCorePoolSize(corePoolSize);
-
- int maxPoolSize = 50;
- executor.setMaxPoolSize(maxPoolSize);
-
- int queueCapacity = 10;
- executor.setQueueCapacity(queueCapacity);
-
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
-
- String threadNamePrefix = "task@B-";
- executor.setThreadNamePrefix(threadNamePrefix);
-
- //executor.setAwaitTerminationMillis(100*1000);
- //executor.setWaitForTasksToCompleteOnShutdown(true);
- //executor.setAllowCoreThreadTimeOut(true);
-
- executor.setKeepAliveSeconds(60 * 1000);
-
- executor.initialize();
-
- return executor;
- }
- }
-
上面代码通过@Configuration定义了不同于spring框架默认的那个多线程池子,这两个单独自定义的线程池名分别以task@A-和task@B-开头。多线程池实现了,对应的使用这两个线程池的任务代码模块也需要调整:
- package zhangphil.demo;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
-
- @Slf4j
- @Component
- public class MyTask {
- private int a = 0;
- private int b = 0;
- private int c = 0;
-
- @Async("executorA")
- public void task1() {
- log.info("t-a ...");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- log.info("t-a-{}", a++);
- }
-
- @Async
- public void task2() {
- log.info("t-b ...");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- log.info("t-b-{}", b++);
- }
-
- @Async("executorB")
- public void task3() {
- log.info("t-c ...");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- log.info("t-c-{}", c++);
- }
- }
注意task2(),特意没有给@Async加上寻址名字。看看后台运行输出:
从输出可以看到,由于在task1()和task3()指定了@Async()需要启用的线程池executorA和executorB,所以task1()和task3()的多线程任务分别被executorA和executorB调度装载运行。我们故意没有给task2()的@Async指定线程池,但由于在Application里面通过配置注解@EnableAsync
开启了多线程任务特性,所以,spring框架为task2()也启用了默认的未经配置的线程池。