• Java多线程(二) 线程池


    4.线程池

    第一种,请求不是很频繁,而且每次连接后会保持相当一段时间来读数据或者写数据,最后断开,如文件下载,网络流媒体等。

    另一种, 形式是请求频繁,但是连接上以后读/写很少量的数据就断开连接。考虑到服务的并发问题,如果每个请求来到以后服务都为它启动一个线程,那么这对服务的资源可能会造成很大的浪费,特别是第二种情况。

    4.1.Executors类 创建 ExecutorService

    提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。

    1、 创建固定数目线程的线程池。

    public static ExecutorService newFiexedThreadPool(int Threads);
    
    • 1

    2、创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

    public static ExecutorService newCachedThreadPool();
    
    • 1

    3、创建一个单线程化的Executor。

    public static ExecutorService newSingleThreadExecutor();
    
    • 1

    4、创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
    
    • 1

    4.2.ExecutorService 常用的两个执行方法区别

    import java.util.concurrent.*;
    
    public class T5ThreadPoolTest {
    
        //
        public static ExecutorService executorService = Executors.newFixedThreadPool(10);
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            // 执行 不返回选择结果
    //        executorService.execute(new Runnable01());
            // 执行 接收返回的结果
            Future<Integer> submit = executorService.submit(new Callable01());
            Integer integer = submit.get();
            System.out.println("线束当前线程:" + Thread.currentThread().getName() + "\t, 返回结果 :" + integer);
        }
    
        /**
         *  实现 Runnable 接口
         */
        public static class Runnable01 implements Runnable{
            @Override
            public void run() {
                System.out.println("当前线程:" + Thread.currentThread().getName());
                int i = 10/2;
                System.out.println("运行结果:" + i);
            }
        }
    
        /**
         *  实现 Callable 接口
         */
        public static class Callable01  implements Callable<Integer> {
            @Override
            public Integer call() throws Exception {
                System.out.println(Thread.currentThread().getName());
                int i = 10/2;
                System.out.println(Thread.currentThread().getName());
                return i;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    4.3.自定义线程池

    import java.util.concurrent.*;
    
    public class T6ThreadPoolTest {
        public static void main(String[] args){
            //创建等待队列
            BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(10);
            // 设置 线程属性  (默认方式) Executors.defaultThreadFactory()
            ThreadFactory threadFactory = new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    System.out.println("线程" + r.hashCode() + "创建");
                    //线程命名
                    Thread th = new Thread(r, "threadPool-" + r.hashCode() + "-<");
                    return th;
                }
            };
    //        // 设置 拒绝策略 new ThreadPoolExecutor.CallerRunsPolicy()
            RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    System.out.println(r.toString() + "执行了拒绝策略");
                }
            };
            //创建线程池,池中保存的线程数为3,允许的最大线程数为5
            ThreadPoolExecutor pool = new ThreadPoolExecutor(3
                                                             ,5
                                                             ,50
                                                             ,TimeUnit.MILLISECONDS
                                                             ,bqueue
                                                             ,threadFactory
                                                             ,rejectedExecutionHandler);
    
            for(int i=0;i<20;i++) {
                pool.execute(new MyThread());
            }
            //关闭线程池
    //        pool.shutdown();
        }
    }
    
    class MyThread implements Runnable{
        @Override
        public void run(){
            System.out.println(Thread.currentThread().getName() + "正在执行。。。");
            try{
                Thread.sleep(100);
            }catch(InterruptedException e){
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    100

    corePoolSize:核心线程数。总是保存的线程数量 5

    maximumPoolSize:最大线程数。表明线程中最多能够创建的线程数量。 20

    keepAliveTime:空闲的线程保留的时间。超时就释放线程( maximumPoolSize - corePoolSize )

    unit:空闲线程的保留时间单位。

    BlockingQueue workQueue: 工作队列(阻塞队列) 任务执行前保存任务的队列,仅保存由execute方法提交的Runnable任务。 50

    ​ 可以选择以下几个阻塞队列。

    ​ 1、直接提交。缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们。如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。直接提交通常要求无界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒绝新提交的任务。newCachedThreadPool采用的便是这种策略。

    ​ 2、无界队列。使用无界队列(典型的便是采用预定义容量的 LinkedBlockingQueue,理论上是该缓冲队列可以对无限多的任务排队)将导致在所有 corePoolSize 线程都工作的情况下将新任务加入到缓冲队列中。这样,创建的线程就不会超过 corePoolSize,也因此,maximumPoolSize 的值也就无效了。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列。newFixedThreadPool采用的便是这种策略。

    ​ **3、有界队列。当使用有限的 maximumPoolSizes 时,有界队列(一般缓冲队列使用ArrayBlockingQueue,并制定队列的最大长度)有助于防止资源耗尽,但是可能较难调整和控制,队列大小和最大池大小需要相互折衷,需要设定合理的参数

    ​ 4、PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

    ThreadFactory threadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程做些更有意义的事情,比如设置daemon和优先级等等

    RejectedExecutionHandler handler:拒绝策略 (饱和策略处理器)。默认提供的4中策略上面已经有解释了

    ​ 一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要指定 RejectedExecutionHandler拒绝策略,来处理线程池"超载"的情况。ThreadPoolExecutor自带的拒绝策略如下:

    ​ 1、AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;

    ​ 2、CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;

    ​ 3、DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交;

    ​ 4、DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;

    在这里插入图片描述

    4.4.比较Executor和new Thread()

    new Thread的弊端如下:

    ​ a. 每次new Thread新建对象性能差。
    ​ b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
    ​ c. 缺乏更多功能,如定时执行、定期执行、线程中断。

    相比new Thread,Java提供的四种线程池的好处在于:
    a. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
    b. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
    c. 提供定时执行、定期执行、单线程、并发数控制等功能。

    4.5.Springboot 使用线程池

    @Configuration
    @EnableAsync
    public class ThreadPoolConfig {
    
        @Bean
        public  Executor taskExecutor(){
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10); //配置核心线程数
            executor.setMaxPoolSize(20);  //配置最大线程数
            executor.setKeepAliveSeconds(5); // 空闲等待时间
            executor.setQueueCapacity(200);//配置队列大小
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//拒绝策略
            executor.setWaitForTasksToCompleteOnShutdown(true);//调度器shutdown被调用时等待当前被调度的任务完成
            executor.setAwaitTerminationSeconds(60);//等待时长
            executor.initialize();//执行初始化
    
            return executor;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    setWaitForTasksToCompleteOnShutdown(true)用来设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁。同时,这里还设置了 setAwaitTerminationSeconds(60),该方法用来设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。

    使用时

    @Async 添加 到 需要异步操作的方法上

  • 相关阅读:
    java毕业设计企业员工业绩考核系统mybatis+源码+调试部署+系统+数据库+lw
    koa2学习和使用
    C++哈希
    Go 复合类型之字典类型介绍
    常用的深度学习自动标注软件
    软件配置 | Git下载、安装及卸载
    一篇文章带你搞懂MybatisPlus
    镜像仓库harbor安装部署
    快递查询、导出表格,批量操作效率更高
    利用norm.ppf&norm.interval分别计算正态置信区间[实例]
  • 原文地址:https://blog.csdn.net/yuanchun05/article/details/127659856