Java中经常用到多线程来处理业务。在多线程的使用中,非常的不建议使用单纯的Thread或者实现Runnable接口的方式来创建线程,因为这样的线程创建及销毁势必会造成耗费资源、线程上下文切换问题,同时创建过多的线程也可能会引发资源耗尽的风险,对线程的管理非常的不方便。因此在使用多线程的时候,日常开发中我们经常引入的是线程池,利用线程池十分方便的对线程任务进行管理。
这里主要对线程池ThreadPoolExecutor和ThreadPoolTaskExecutor进行对比与使用见解。
该图是它的继承关系
它的构造方法为
- public ThreadPoolExecutor(int coreSize,
- int maxSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue
workQueue, - ThreadFactory threadFactory,
- RejectedExectionHandler handler);
几个参数的含义分别是:
coreSize:核心线程数,也是线程池中常驻的线程数
maxSize:最大线程数,在核心线程数的基础上可能会额外增加一些非核心线程,需要注意的是只有当workQueue队列填满时才会创建多于核心线程数的线程
keepAliveTime:非核心线程的空闲时间超过keepAliveTime就会被自动终止回收掉
unit:keepAliveTime的时间单位
workQueue:用于保存任务的队列,可以为直接提交队列、无界任务队列、有界任务队列、优先任务队列类型之一,当池子里的工作线程数大于核心线程数
时,这时新进来的任务会被放到队列中
threadFactory:执行程序创建新线程时使用的工厂
handler:线程池无法继续接收任务是的拒绝策略
workQueue任务队列可以为直接提交队列、无界任务队列、有界任务队列、优先任务队列类型之一,示例如下
例1:直接提交队列
SynchronousQueue它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作
当创建的线程数大于最大线程数时,会直接执行设置好的拒绝策略
- new ThreadPoolExecutor(1,
- 2,
- 1000,
- TimeUnit.MILLISECONDS,
- new SynchronousQueue
(), - Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.AbortPolicy()
- );
例2:有界的任务队列
ArrayBlockingQueue有界的任务队列。如果有新的任务需要执行时,线程池会创建新的线程,知道创建的线程数量达到核心线程数时,则会将新的任务加入到等待的队列中。如果等待的队列已满,则会继续创建线程,直到线程数量达到设定的最大线程数,如果创建的线程数大于了最大线程数,则执行拒绝策略。
- new ThreadPoolExecutor(
- 1,
- 2,
- 1000,
- TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue
(10), - Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.AbortPolicy()
- );
例3:无界的任务队列
LinkedBlockingQueue无界的任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数就是设定的核心线程数量,也就是说在这种情况下,就算你设置了最大线程数也是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize
后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。
- new ThreadPoolExecutor(
- 1,
- 2,
- 1000,
- TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue
(), - Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.AbortPolicy()
- );
例4:优先任务队列
PriorityBlockingQueue优先任务队列,线程池的线程数一直为设定的核心线程数个,无论添加多少个任务,线程池创建的线程数也不会超过你设定的核心线程数,只不过PriorityBlockingQueue队列内的任务可以自定义队则根据任务的优先级顺序进行执行,不同于其它队列是按照先进先出的规则处理的
- new ThreadPoolExecutor(1,
- 2,
- 1000,
- TimeUnit.MILLISECONDS,
- new PriorityBlockingQueue
(), - Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.AbortPolicy()
- );
AbortPolicy
直接抛出异常阻止系统正常工作
CallerRunsPolicy
只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务
DiscardOldestPolicy
丢弃最老的一个请求,尝试再次提交当前任务
DiscardPolicy
丢弃无法处理的任务,不给予任何处理
除上述拒绝策略外,可以实现RejectedExecutionHandler接口,自定义拒绝策略
- new ThreadPoolExecutor(
- 1,
- 2,
- 1000,
- TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue
(5), - Executors.defaultThreadFactory(),
- new RejectedExecutionHandler() {
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
- {
- System.out.println(r.toString() + "执行了拒绝策略");
- }
- });
当一个新的任务提交给线程池时,线程池的处理步奏:
1、首先判断核心线程数是否已满,如果没满则调用一个线程处理Task任务,如果已满则执行步奏2;
2、这时会判断阻塞队列是否已满,如果阻塞队列没满,就将Task任务加入到阻塞队列中等待执行,如果阻塞队列已满,则执行步奏3;
3、判断是否大于最大线程数,如果小于最大线程数,则创建线程执行Task任务,如果大于最大线程数,则执行步骤4;
4、这时会使用淘汰策略来处理无法执行的Task任务
书写一个配置类,在配置类中定义一个bean,如下
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.annotation.EnableAsync;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.ThreadFactory;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
-
-
- @Configuration
- @Slf4j
- @EnableAsync
- public class ExecutorConfig {
-
- @Bean
- public ThreadPoolExecutor asyncExecutor(){
- ThreadPoolExecutor executor = new ThreadPoolExecutor(
- 10,
- 20,
- 1000,
- TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue
(5), - new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- System.out.println("线程"+r.hashCode()+"创建");
- //线程命名
- Thread th = new Thread(r,"threadPool"+r.hashCode());
- return th;
- }
- },
- new ThreadPoolExecutor.CallerRunsPolicy()
- ){
- @Override
- protected void beforeExecute(Thread t, Runnable r) {
- System.out.println("准备执行:"+ ((ThreadTask)r).getTaskName());
- super.beforeExecute(t, r);
- }
-
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- System.out.println("执行完毕:"+((ThreadTask)r).getTaskName());
- super.afterExecute(r, t);
- }
-
- @Override
- protected void terminated() {
- System.out.println("线程池退出");
- super.terminated();
- }
- };
- return executor;
- }
-
- }
说明:
beforeExecute
:线程池中任务运行前执行afterExecute
:线程池中任务运行完毕后执行terminated
:线程池退出后执行代码中的ThreadTask如下,此处可根据自己需求进行代码编写
- public class ThreadTask implements Runnable {
-
- private String taskName;
-
- public String getTaskName() {
- return taskName;
- }
-
- public void setTaskName(String taskName) {
- this.taskName = taskName;
- }
-
- public ThreadTask(String name) {
- this.setTaskName(name);
- }
-
- public void run() {
- //输出执行线程的名称
- System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
- }
- }
ThreadPoolTaskExecutor这个类是Spring-Context支持的一个,专门用于Spring环境的线程池。其底层是在ThreadPoolExecutor的基础上包装的一层,使得Spring的整合更加方便
继承关系如下
其成员变量如ThreadPoolExecutor,有核心线程数、最大线程数、keepAliveTIme、超时时间单位、队列、线程创建工厂、拒绝策略
查看它的源码如下
可以看出,它依赖的还是ThreadPoolExecutor,并且注意它直接设定了keepAliveTime的时间单位
它的队列、拒绝策略通ThreadPoolExecutor一致
书写一个配置类,在配置类中对线程池ThreadPoolTaskExecutor进行配置
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.annotation.EnableAsync;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import java.util.concurrent.ThreadPoolExecutor;
-
-
- @Configuration
- @Slf4j
- @EnableAsync
- public class ExecutorConfig {
-
- @Bean
- public ThreadPoolTaskExecutor asyncExecutor() {
- log.info("start asyncServiceExecutor");
- ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
- //配置核心线程数
- executor.setCorePoolSize(10);
- //配置最大线程数
- executor.setMaxPoolSize(20);
- //配置队列大小
- executor.setQueueCapacity(100);
- //配置keepAliveTime
- executor.setKeepAliveSeconds(10);
- //配置线程池中的线程的名称前缀
- executor.setThreadNamePrefix("async-service-");
- //拒绝策略
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- //执行初始化
- executor.initialize();
- return executor;
- }
-
- }
配置类中的VisiableThreadPoolTaskExecutor()类扩展了ThreadPoolTaskExecutor,对线程执行前后各阶段做了补充操作,类似于上面ThreadPoolExecutor中的beforeExecute、afterExecute等操作,具体代码如下
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.util.concurrent.ListenableFuture;
- import java.util.concurrent.Callable;
- import java.util.concurrent.Future;
- import java.util.concurrent.ThreadPoolExecutor;
-
- @Slf4j
- public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
-
- private void showThreadPoolInfo(String prefix) {
- ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
-
- if (null == threadPoolExecutor) {
- return;
- }
-
- log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
- this.getThreadNamePrefix(),
- prefix,
- threadPoolExecutor.getTaskCount(),
- threadPoolExecutor.getCompletedTaskCount(),
- threadPoolExecutor.getActiveCount(),
- threadPoolExecutor.getQueue().size());
- }
-
- @Override
- public void execute(Runnable task) {
- showThreadPoolInfo("1. do execute");
- super.execute(task);
- }
-
- @Override
- public void execute(Runnable task, long startTimeout) {
- showThreadPoolInfo("2. do execute");
- super.execute(task, startTimeout);
- }
-
- @Override
- public Future> submit(Runnable task) {
- showThreadPoolInfo("1. do submit");
- return super.submit(task);
- }
-
- @Override
- public
Future submit(Callable task) { - showThreadPoolInfo("2. do submit");
- return super.submit(task);
- }
-
- @Override
- public ListenableFuture> submitListenable(Runnable task) {
- showThreadPoolInfo("1. do submitListenable");
- return super.submitListenable(task);
- }
-
- @Override
- public
ListenableFuture submitListenable(Callable task) { - showThreadPoolInfo("2. do submitListenable");
- return super.submitListenable(task);
- }
- }
-
上述描述中,最终书写了一个配置类,对线程池进行了配置,定义了一个bean对象,那么在具体接口中该怎么使用,如下所示
1、创建controller层,书写接口入口,调用server层代码
- import com.smile.syncproject.service.AsyncService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @RequestMapping("async")
- @Slf4j
- public class AsyncController {
-
- @Autowired
- private AsyncService asyncService;
-
- @RequestMapping("test")
- public String test() {
- log.info("start submit");
-
- //调用service层的任务
- asyncService.executeAsync();
-
- log.info("end submit");
-
- return "success";
- }
-
- }
2、在service层实现层进行线程池的使用
通过注解@Async
@Async("asyncServiceExecutor")
注解内的值就是上面定义好的配置类中的bean的名称。如果有多个线程池,就需要在定义不同bean的时候指定其name了
- import com.smile.syncproject.service.AsyncService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Service;
-
- @Service
- @Slf4j
- public class AsyncServiceImpl implements AsyncService {
-
- @Override
- @Async("asyncServiceExecutor")
- public void executeAsync() {
- log.info("start executeAsync");
- try{
- Thread.sleep(1000);
- }catch(Exception e){
- e.printStackTrace();
- }
- log.info("end executeAsync");
- }
-
- }
1、线程池大小的设置
针对这个问题,我们首先要确认的是我们的需求是计算密集型还是IO密集型。
如果是计算密集型,比较理想的方案是:线程数 = CPU核数 + 1,也可以设置成CPU核数*2,一般设置CPU*2
如果是IO密集型,线程数 = CPU核心数/(1-阻塞系数),这个组赛系数一般为0.8~0.9之间,也可以取0.8或者0.9.