• SpringBoot中使用ThreadPoolExecutor和ThreadPoolTaskExecutor线程池的方法和区别


            Java中经常用到多线程来处理业务。在多线程的使用中,非常的不建议使用单纯的Thread或者实现Runnable接口的方式来创建线程,因为这样的线程创建及销毁势必会造成耗费资源、线程上下文切换问题,同时创建过多的线程也可能会引发资源耗尽的风险,对线程的管理非常的不方便。因此在使用多线程的时候,日常开发中我们经常引入的是线程池,利用线程池十分方便的对线程任务进行管理。

            这里主要对线程池ThreadPoolExecutor和ThreadPoolTaskExecutor进行对比与使用见解。

    一、ThreadPoolExecutor

    该图是它的继承关系

     它的构造方法为

    1. public ThreadPoolExecutor(int coreSize,
    2. int maxSize,
    3. long keepAliveTime,
    4. TimeUnit unit,
    5. BlockingQueue workQueue,
    6. ThreadFactory threadFactory,
    7. RejectedExectionHandler handler);

    几个参数的含义分别是:

    coreSize:核心线程数,也是线程池中常驻的线程数
    maxSize:最大线程数,在核心线程数的基础上可能会额外增加一些非核心线程,需要注意的是只有当workQueue队列填满时才会创建多于核心线程数的线程
    keepAliveTime:非核心线程的空闲时间超过keepAliveTime就会被自动终止回收掉
    unit:keepAliveTime的时间单位
    workQueue:用于保存任务的队列,可以为直接提交队列、无界任务队列、有界任务队列、优先任务队列类型之一,当池子里的工作线程数大于核心线程数时,这时新进来的任务会被放到队列中
    threadFactory:执行程序创建新线程时使用的工厂
    handler:线程池无法继续接收任务是的拒绝策略

    workQueue任务队列

            workQueue任务队列可以为直接提交队列、无界任务队列、有界任务队列、优先任务队列类型之一,示例如下

    例1:直接提交队列

    SynchronousQueue它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作

            当创建的线程数大于最大线程数时,会直接执行设置好的拒绝策略

    1. new ThreadPoolExecutor(1,
    2. 2,
    3. 1000,
    4. TimeUnit.MILLISECONDS,
    5. new SynchronousQueue(),
    6. Executors.defaultThreadFactory(),
    7. new ThreadPoolExecutor.AbortPolicy()
    8. );

    例2:有界的任务队列

    ArrayBlockingQueue有界的任务队列。如果有新的任务需要执行时,线程池会创建新的线程,知道创建的线程数量达到核心线程数时,则会将新的任务加入到等待的队列中。如果等待的队列已满,则会继续创建线程,直到线程数量达到设定的最大线程数,如果创建的线程数大于了最大线程数,则执行拒绝策略。

    1. new ThreadPoolExecutor(
    2. 1,
    3. 2,
    4. 1000,
    5. TimeUnit.MILLISECONDS,
    6. new ArrayBlockingQueue(10),
    7. Executors.defaultThreadFactory(),
    8. new ThreadPoolExecutor.AbortPolicy()
    9. );

     例3:无界的任务队列

    LinkedBlockingQueue无界的任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数就是设定的核心线程数量,也就是说在这种情况下,就算你设置了最大线程数也是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

    1. new ThreadPoolExecutor(
    2. 1,
    3. 2,
    4. 1000,
    5. TimeUnit.MILLISECONDS,
    6. new LinkedBlockingQueue(),
    7. Executors.defaultThreadFactory(),
    8. new ThreadPoolExecutor.AbortPolicy()
    9. );

     例4:优先任务队列

            PriorityBlockingQueue优先任务队列,线程池的线程数一直为设定的核心线程数个,无论添加多少个任务,线程池创建的线程数也不会超过你设定的核心线程数,只不过PriorityBlockingQueue队列内的任务可以自定义队则根据任务的优先级顺序进行执行,不同于其它队列是按照先进先出的规则处理的

    1. new ThreadPoolExecutor(1,
    2. 2,
    3. 1000,
    4. TimeUnit.MILLISECONDS,
    5. new PriorityBlockingQueue(),
    6. Executors.defaultThreadFactory(),
    7. new ThreadPoolExecutor.AbortPolicy()
    8. );

    线程池拒绝策略

    AbortPolicy

           直接抛出异常阻止系统正常工作

    CallerRunsPolicy

           只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务

    DiscardOldestPolicy

           丢弃最老的一个请求,尝试再次提交当前任务

    DiscardPolicy

           丢弃无法处理的任务,不给予任何处理

    除上述拒绝策略外,可以实现RejectedExecutionHandler接口,自定义拒绝策略

    1. new ThreadPoolExecutor(
    2. 1,
    3. 2,
    4. 1000,
    5. TimeUnit.MILLISECONDS,
    6. new ArrayBlockingQueue(5),
    7. Executors.defaultThreadFactory(),
    8. new RejectedExecutionHandler() {
    9. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
    10. {
    11. System.out.println(r.toString() + "执行了拒绝策略");
    12. }
    13. });

    ThreadPoolExecutor工作流程

            当一个新的任务提交给线程池时,线程池的处理步奏:

    1、首先判断核心线程数是否已满,如果没满则调用一个线程处理Task任务,如果已满则执行步奏2;

    2、这时会判断阻塞队列是否已满,如果阻塞队列没满,就将Task任务加入到阻塞队列中等待执行,如果阻塞队列已满,则执行步奏3;

    3、判断是否大于最大线程数,如果小于最大线程数,则创建线程执行Task任务,如果大于最大线程数,则执行步骤4;

    4、这时会使用淘汰策略来处理无法执行的Task任务

    ThreadpoolExecutor线程池的使用

    书写一个配置类,在配置类中定义一个bean,如下

    1. import lombok.extern.slf4j.Slf4j;
    2. import org.springframework.context.annotation.Bean;
    3. import org.springframework.context.annotation.Configuration;
    4. import org.springframework.scheduling.annotation.EnableAsync;
    5. import java.util.concurrent.ArrayBlockingQueue;
    6. import java.util.concurrent.ThreadFactory;
    7. import java.util.concurrent.ThreadPoolExecutor;
    8. import java.util.concurrent.TimeUnit;
    9. @Configuration
    10. @Slf4j
    11. @EnableAsync
    12. public class ExecutorConfig {
    13. @Bean
    14. public ThreadPoolExecutor asyncExecutor(){
    15. ThreadPoolExecutor executor = new ThreadPoolExecutor(
    16. 10,
    17. 20,
    18. 1000,
    19. TimeUnit.MILLISECONDS,
    20. new ArrayBlockingQueue(5),
    21. new ThreadFactory() {
    22. @Override
    23. public Thread newThread(Runnable r) {
    24. System.out.println("线程"+r.hashCode()+"创建");
    25. //线程命名
    26. Thread th = new Thread(r,"threadPool"+r.hashCode());
    27. return th;
    28. }
    29. },
    30. new ThreadPoolExecutor.CallerRunsPolicy()
    31. ){
    32. @Override
    33. protected void beforeExecute(Thread t, Runnable r) {
    34. System.out.println("准备执行:"+ ((ThreadTask)r).getTaskName());
    35. super.beforeExecute(t, r);
    36. }
    37. @Override
    38. protected void afterExecute(Runnable r, Throwable t) {
    39. System.out.println("执行完毕:"+((ThreadTask)r).getTaskName());
    40. super.afterExecute(r, t);
    41. }
    42. @Override
    43. protected void terminated() {
    44. System.out.println("线程池退出");
    45. super.terminated();
    46. }
    47. };
    48. return executor;
    49. }
    50. }

    说明:

    1. beforeExecute:线程池中任务运行前执行
    2. afterExecute:线程池中任务运行完毕后执行
    3. terminated:线程池退出后执行

    代码中的ThreadTask如下,此处可根据自己需求进行代码编写

    1. public class ThreadTask implements Runnable {
    2. private String taskName;
    3. public String getTaskName() {
    4. return taskName;
    5. }
    6. public void setTaskName(String taskName) {
    7. this.taskName = taskName;
    8. }
    9. public ThreadTask(String name) {
    10. this.setTaskName(name);
    11. }
    12. public void run() {
    13. //输出执行线程的名称
    14. System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
    15. }
    16. }

    二、ThreadPoolTaskExecutor

    ThreadPoolTaskExecutor这个类是Spring-Context支持的一个,专门用于Spring环境的线程池。其底层是在ThreadPoolExecutor的基础上包装的一层,使得Spring的整合更加方便

    继承关系如下

     其成员变量如ThreadPoolExecutor,有核心线程数、最大线程数、keepAliveTIme、超时时间单位、队列、线程创建工厂、拒绝策略

    查看它的源码如下

     

     可以看出,它依赖的还是ThreadPoolExecutor,并且注意它直接设定了keepAliveTime的时间单位

    它的队列、拒绝策略通ThreadPoolExecutor一致

    ThreadPoolTaskExecutor的使用

    书写一个配置类,在配置类中对线程池ThreadPoolTaskExecutor进行配置

    1. import lombok.extern.slf4j.Slf4j;
    2. import org.springframework.context.annotation.Bean;
    3. import org.springframework.context.annotation.Configuration;
    4. import org.springframework.scheduling.annotation.EnableAsync;
    5. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    6. import java.util.concurrent.ThreadPoolExecutor;
    7. @Configuration
    8. @Slf4j
    9. @EnableAsync
    10. public class ExecutorConfig {
    11. @Bean
    12. public ThreadPoolTaskExecutor asyncExecutor() {
    13. log.info("start asyncServiceExecutor");
    14. ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
    15. //配置核心线程数
    16. executor.setCorePoolSize(10);
    17. //配置最大线程数
    18. executor.setMaxPoolSize(20);
    19. //配置队列大小
    20. executor.setQueueCapacity(100);
    21. //配置keepAliveTime
    22. executor.setKeepAliveSeconds(10);
    23. //配置线程池中的线程的名称前缀
    24. executor.setThreadNamePrefix("async-service-");
    25. //拒绝策略
    26. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    27. //执行初始化
    28. executor.initialize();
    29. return executor;
    30. }
    31. }

    配置类中的VisiableThreadPoolTaskExecutor()类扩展了ThreadPoolTaskExecutor,对线程执行前后各阶段做了补充操作,类似于上面ThreadPoolExecutor中的beforeExecute、afterExecute等操作,具体代码如下

    1. import lombok.extern.slf4j.Slf4j;
    2. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    3. import org.springframework.util.concurrent.ListenableFuture;
    4. import java.util.concurrent.Callable;
    5. import java.util.concurrent.Future;
    6. import java.util.concurrent.ThreadPoolExecutor;
    7. @Slf4j
    8. public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    9. private void showThreadPoolInfo(String prefix) {
    10. ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
    11. if (null == threadPoolExecutor) {
    12. return;
    13. }
    14. log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
    15. this.getThreadNamePrefix(),
    16. prefix,
    17. threadPoolExecutor.getTaskCount(),
    18. threadPoolExecutor.getCompletedTaskCount(),
    19. threadPoolExecutor.getActiveCount(),
    20. threadPoolExecutor.getQueue().size());
    21. }
    22. @Override
    23. public void execute(Runnable task) {
    24. showThreadPoolInfo("1. do execute");
    25. super.execute(task);
    26. }
    27. @Override
    28. public void execute(Runnable task, long startTimeout) {
    29. showThreadPoolInfo("2. do execute");
    30. super.execute(task, startTimeout);
    31. }
    32. @Override
    33. public Future submit(Runnable task) {
    34. showThreadPoolInfo("1. do submit");
    35. return super.submit(task);
    36. }
    37. @Override
    38. public Future submit(Callable task) {
    39. showThreadPoolInfo("2. do submit");
    40. return super.submit(task);
    41. }
    42. @Override
    43. public ListenableFuture submitListenable(Runnable task) {
    44. showThreadPoolInfo("1. do submitListenable");
    45. return super.submitListenable(task);
    46. }
    47. @Override
    48. public ListenableFuture submitListenable(Callable task) {
    49. showThreadPoolInfo("2. do submitListenable");
    50. return super.submitListenable(task);
    51. }
    52. }

    三、线程池在接口中的具体使用

     上述描述中,最终书写了一个配置类,对线程池进行了配置,定义了一个bean对象,那么在具体接口中该怎么使用,如下所示

    1、创建controller层,书写接口入口,调用server层代码

    1. import com.smile.syncproject.service.AsyncService;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.web.bind.annotation.RequestMapping;
    5. import org.springframework.web.bind.annotation.RestController;
    6. @RestController
    7. @RequestMapping("async")
    8. @Slf4j
    9. public class AsyncController {
    10. @Autowired
    11. private AsyncService asyncService;
    12. @RequestMapping("test")
    13. public String test() {
    14. log.info("start submit");
    15. //调用service层的任务
    16. asyncService.executeAsync();
    17. log.info("end submit");
    18. return "success";
    19. }
    20. }

     2、在service层实现层进行线程池的使用

            通过注解@Async

    @Async("asyncServiceExecutor")

    注解内的值就是上面定义好的配置类中的bean的名称。如果有多个线程池,就需要在定义不同bean的时候指定其name了

    1. import com.smile.syncproject.service.AsyncService;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.scheduling.annotation.Async;
    4. import org.springframework.stereotype.Service;
    5. @Service
    6. @Slf4j
    7. public class AsyncServiceImpl implements AsyncService {
    8. @Override
    9. @Async("asyncServiceExecutor")
    10. public void executeAsync() {
    11. log.info("start executeAsync");
    12. try{
    13. Thread.sleep(1000);
    14. }catch(Exception e){
    15. e.printStackTrace();
    16. }
    17. log.info("end executeAsync");
    18. }
    19. }

    四、其它

    1、线程池大小的设置

            针对这个问题,我们首先要确认的是我们的需求是计算密集型还是IO密集型。

            如果是计算密集型,比较理想的方案是:线程数 = CPU核数 + 1,也可以设置成CPU核数*2,一般设置CPU*2

            如果是IO密集型,线程数 = CPU核心数/(1-阻塞系数),这个组赛系数一般为0.8~0.9之间,也可以取0.8或者0.9.

  • 相关阅读:
    有一种浪漫,叫接触Linux
    前端基础知识点
    vue3基于vite打包
    python将word文件转换成pdf文件
    从零搭建云原生技术kubernetes(K8S)环境-通过kubesPhere的AllInOne方式
    初试Shiro
    16-k8s-configMap配置管理中心
    【php快速入门】学习笔记
    Spring Boot集成Redis集群报错UnsupportedOperationException
    算法提高: 使用归并实现范围和问题(Count of Range Sum)
  • 原文地址:https://blog.csdn.net/qq_40386113/article/details/127581333