• JUC并发编程——线程池学习:基础概念及三大方法、七大参数、四大拒绝策略(基于狂神说的学习笔记)


    线程池

    池化技术的本质:事先准备好一些资源,线程复用,用完即还,方便管理

    默认大小:2

    最大并发数max 根据电脑去设置,CPU密集型,IO密集型

    线程池的好处:

    • 降低资源的消耗
    • 提高响应的速度,无需新建和销毁
    • 方便管理

    线程池学习:3大方法、7大参数、4大拒绝策略

    三大方法

    Executors.newSingleThreadExecutor(); // 单个线程
    Executors.newFixedThreadPool(5); // 创建一个固定的线程池,此处线程池的大小为5
    Executors.newCachedThreadPool();// 可变线程池
    
    • 1
    • 2
    • 3

    以下示例展示了开辟线程池的三个方法,以及如何用线程池的方法创建线程

    package pool;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    // 三大方法
    // 使用线程池,要使用线程池来创建线程
    public class Demo01 {
        public static void main(String[] args) {
            // ExecutorService threadPool = Executors.newSingleThreadExecutor(); // 单个线程
            // ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建一个固定的线程池,此处线程池的大小为5
            ExecutorService threadPool = Executors.newCachedThreadPool();// 可变线程池
            try {
                for (int i = 0; i < 10; i++) {
                    // execute 是线程池的执行方法,其中传入一个线程,可以用lambda表达式写run方法
                    threadPool.execute(()->{
                        System.out.println(Thread.currentThread().getName());
                    });
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // 线程池用完,程序结束,需要关闭线程池
                threadPool.shutdown();
            }
    
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    Executors.newSingleThreadExecutor();的执行结果如下:

    在这里插入图片描述

    Executors.newFixedThreadPool(5);的执行结果如下:

    在这里插入图片描述

    Executors.newCachedThreadPool(); 的执行结果如下:
    在这里插入图片描述

    七大参数

    源码分析:

    //Executors.newSingleThreadExecutor()源码
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
    // Executors.newFixedThreadPool(5) 源码
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
    // Executors.newCachedThreadPool() 源码
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
    /**
     *  可以发现,三大方法中,无论哪一个方法,实际上都调用了ThreadPoolExecutor方法
     * 所以本质上:三大方法是ThreadPoolExecutor方法的不同参数结果
    */
    
    
    // ThreadPoolExecutor 源码   可以发现,该方法,有7个参数
    public ThreadPoolExecutor(int corePoolSize, // 核心线程池大小
                              int maximumPoolSize, // 线程池大小(最大容纳量)
                              long keepAliveTime,// 超时了,没有人用就会释放
                              TimeUnit unit, // 超时单位
                              BlockingQueue<Runnable> workQueue,// 阻塞队列
                              ThreadFactory threadFactory,// 线程工厂,创建线程的,一般不用动
                              RejectedExecutionHandler handler// 拒绝策略
                             ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    corePoolSize:核心线程永远开着,时时刻刻看可以使用

    maximumPoolSize:线程池最大容量(线程池并非时时刻刻所有线程都开启的,时时刻刻一直开启的只有核心线程),当阻塞队列已满,线程池还有未开启线程时,线程池将会开启未启动的线程

    keepAliveTime:当非核心线程在一定时间内都未被使用,则非核心线程池将关闭,也就是线程池释放。(等待超时)

    unit:超时单位

    workQueue:阻塞队列

    threadFactory:线程工厂,创建线程,一般不动

    handler:拒绝策略,当线程池中所有线程都在工作,且阻塞队列已满时,再进入的获取线程的请求的处理方法

    手动创建一个线程池

    ExecutorService threadPool = new ThreadPoolExecutor(
            2,
            5,
            3,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    public class Demo01 {
        public static void main(String[] args) {
            // 自定义线程池!工作 ThreadPoolExecutor
            // 自定义线程池的最大承载量为: blockingQueue + max
            ExecutorService threadPool = new ThreadPoolExecutor(
                    2,
                    5,
                    3,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(3),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy()
            );
            try {
                for (int i = 0; i < 10; i++) {
                    // execute 是线程池的执行方法,其中传入一个线程,可以用lambda表达式写run方法
                    threadPool.execute(()->{
                        System.out.println(Thread.currentThread().getName());
                    });
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // 线程池用完,程序结束,需要关闭线程池
                threadPool.shutdown();
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    在上述例子中,因为for循环开启了10个线程,大于线程池的最大承载量(5+3=8),如果线程处理速度不够快,在最大承载量到达之前又来了请求,则会触发拒绝策略,而AbortPolicy()拒绝策略则是,当超过线程池最大承载量时,则会放弃之后请求,并抛出异常。

    4大拒绝策略

    在这里插入图片描述

    • AbortPolicy(): 拒绝请求,并抛出异常

    • CallerRunsPolicy():当线程池达到其最大容量并且所有工作线程都在忙的情况下,新的任务将被执行。CallerRunsPolicy策略意味着当调用线程池中的execute()方法来提交新任务时,如果线程池已经关闭或者达到饱和(即没有多余的线程可以处理新任务),那么新任务将在调用execute()方法的线程中直接执行。

      请注意,如果调用线程池的线程是主线程,这可能会导致主线程阻塞,直到任务完成。

    • DiscardPolicy:拒绝请求,但不抛出异常,进程正常运行

    • discardOldestPolicy();丢弃队列中最老的任务(即最早进入队列的任务),然后新任务将被加入队列。不会抛出异常

      请注意,这种策略可能会导致一些任务被丢弃,因此在使用时要特别注意。此处说明,丢弃最老的任务是指该任务直接被放弃,不会被重新执行,因此,在该策略中,有可能导致某些任务因被打断而无法完成

    小结和拓展

    • 最大线程数到底该如何定义

      • CPU密集型, 根据CPU的核数来定义,几核CPU就开几核,可以保持CPU的效率最高
        Runtime.getRuntime().availableProcessors(); // 获取当前电脑的核数
      
      • 1
      • IO密集型 判断程序中IO耗时特别大的线程的数量,最大线程数大于该数量即可
  • 相关阅读:
    MySQL-InnoDB引擎-架构和事务原理
    js 根据键判断值
    一个单身狗 和 两个单身狗
    使用stream实现两个list集合的合并(对象属性的合并)
    Python面试宝典:Python中与设计模式相关的面试笔试题(1000加面试笔试题助你轻松捕获大厂Offer)
    SEO优化的许多好处是与流量有直接关系
    java中spark数据集字段下划线改成驼峰
    D. Non-zero Segments(前缀和)
    Jmeter内置变量 vars 和props的使用详解
    文举论金:黄金原油全面走势分析策略指导。
  • 原文地址:https://blog.csdn.net/whale_cat/article/details/133871822