• 基础 | 并发编程 - [线程池]


    §1 为什么使用线程池

    • 实现了线程的复用
      • 降低资源消耗
        因为线程不再需要频繁的创建和销毁了
      • 提高响应速度
        线程是现成的,不用等待重新创建就可以直接执行任务
    • 实现对最大并发数的控制
    • 提高线程的可管理性
      线程池在提供线程的基础上,还提供更好的调优和监控的功能

    §2 ThreadPoolExecutor(线程池本体)

    ThreadPoolExecutor 是线程池的底层实现,如下所示

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    §2.1 主要参数


    ThreadPoolExecutor 具有 7 个主要参数

    • corePoolSize
      核心线程数,int
      线程池的常驻核心线程数,最小线程数
    • maximumPoolSize
      最大线程数,int
    • keepAliveTime
      空余线程存活时间值,long
    • unit
      空余线程存活时间单位,TimeUnit
    • workQueue
      任务阻塞队列,BlockingQueue
      队列放满时,创建新线程加入线程池
      使用阻塞队列是因为
      当线程池中的线程都被占用,且任务队列全被占满之后
      线程池可以通过增加新线程,提高线程池的并发能力
      所以,通常现有现成和任务队列都占满时,稍作等待就能继续容纳新任务
      因此使用阻塞队列,防止已满就出错或丢失任务
    • threadFactory
      线程工厂,ThreadFactory ,通常使用默认
      线程工厂可以指定 线程组名、线程名前缀、是不是守护线程、线程优先级等
    • handler
      拒绝策略,RejectedExecutionHandler
      当线程池中线程达到最大并且任务队列也满了时,触发拒绝策略
    §2.2 常用方法
    • execute(Runable)
      执行任务,无返回值
    • submit(Callable / Runable)
      执行任务,可以有返回值
    • shutdown()
      停止
    §2.3 线程池工作原理

    示意图
    在这里插入图片描述
    线程池行为模式

    • 线程池创建成功后,即等待执行任务
    • 调用 execute() 后,线程池工作流程如下:
      • 若运行的线程小于 corePoolSize:创建线程执行任务
      • 若运行的线程大约等于 corePoolSize:任务存入任务队列
      • 若队列也满了,但运行的线程小于 maximumPoolSize:创建线程执行任务
      • 若线程数和队列都满了,触发饱和拒绝策略
    • 线程池中的线程完成一个任务时,会尝试从队列中获取下一个任务
    • 线程池相对空闲,超过一定时间 keepAliveTime 后,开始杀死池中超出corePoolSize 部分的线程
    §2.4 拒绝策略

    预设拒绝策略
    有下列预设拒绝策略:

    • AbortPolicy
      默认,抛出 RejectedExcuionException
    • CallerRunsPolicy
      调用者执行,不抛出异常,不丢弃任务
      而是将任务退回给调用者
    • DiscardOldestPolicy
      丢弃队列中最久的任务,当前任务加入队列,并在此提交当前任务
    • DiscardPolicy
      直接丢弃当前任务,不做其他处理
      当业务允许丢弃任务时,推荐

    AbortPolicy 示例

    ExecutorService pool = new ThreadPoolExecutor(
            2,4,120L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),
            Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    CallerRunsPolicy 示例

    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(
                2,4,120L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            for (int i = 0; i < 10; i++) {
                pool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" is working: "+System.currentTimeMillis());
                });
            }
        }finally {
            pool.shutdown();
        }
    }	
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在这里插入图片描述

    DiscardOldestPolicy 示例

    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(
                2,4,120L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());
        try {
            for (int i = 0; i < 10; i++) {
                int finalI = i;
                pool.execute(()->{
                	try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println(Thread.currentThread().getName()+" is working for: "+ finalI);
                });
            }
        }finally {
            pool.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在这里插入图片描述

    说明
    0、1 任务进入核心线程
    2、3、4 进入任务队列并将其填满
    5、6 进入任务队列未果,创建新线程并进入新线程
    7、8、9 进入任务队列未果,创建新线程未果,触发拒绝策略
    拒绝策略从最久的任务开始扔,扔了 3 个,正好把 7、8、9 放进去
    7、8、9 被完成上一轮任务的线程消费

    DiscardPolicy 示例

    ExecutorService pool = new ThreadPoolExecutor(
            2,4,120L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),
            Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
    
    • 1
    • 2
    • 3

    在这里插入图片描述
    说明
    道理和 DiscardOldestPolicy 基本一致,只不过丢弃的是最新的任务

    §3 Executors

    Executors 是线程池的工具类

    §4 常用预设线程池

    §4.1 一览
    名字说明特点适用场景
    newFixedThreadPoo固定线程数的线程池线程数量固定执行长期任务,性能较好
    newSingleThreadExecutor单线程线程池线程数量固定为 1需要任务按序单个消费的场景
    newCachedThreadPool缓存线程池自动扩容短期执行很多异步小任务负载较轻的服务 的场景
    newScheduledThreadPool任务调度线程池
    newWorkStealingPool

    阿里规范

    【强制】 线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,
    这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
    说明:Executors各个方法的弊端:
    1)newFixedThreadPool和newSingleThreadExecutor:
      允许的请求处理队列长度是Integer.MAX_VALUE,可能会耗费非常大的内存,甚至OOM。
    2)newCachedThreadPool和newScheduledThreadPool:
      允许的线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

    §4.2 各线程池简单示例

    newFixedThreadPoo

    public static void main(String[] args) {
    	// demo 中简单示例,从简了,下同
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            for (int i = 0; i < 15; i++) {
                pool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" is working: "+System.currentTimeMillis());
                });
            }
        }finally {
            pool.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    在这里插入图片描述
    newSingleThreadExecutor

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < 15; i++) {
                pool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" is working: "+System.currentTimeMillis());
                });
            }
        }finally {
            pool.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述
    newCachedThreadPool

    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < 15; i++) {
                pool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" is working: "+System.currentTimeMillis());
                });
            }
        }finally {
            pool.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述

    newScheduledThreadPool

    §5 自定义线程池

    自定义方式
    使用 ThreadPoolExecutor 手动指定相关参数即可
    参考上文 主要参数

    ExecutorService pool = new ThreadPoolExecutor(
               2,10,120L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(30),
               Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
    
    • 1
    • 2
    • 3

    线程池各参数配置原则
    需要参考线程池处理的业务的类型

    • CPU 密集型
    • IO 密集型

    CPU 密集型
    定义
    需要大量计算,其性能瓶颈在于主要在于 CPU 的计算能力
    这种业务主要是 CPU 在工作,很少阻塞,CPU 全速运转

    原则
    CPU 密集型业务需要尽量少的线程数量
    目的是为了减少 CPU 切换线程

    推荐值
    CPU 核数 + 1
    理论上应该 == CPU 核心数
    但业务在处理中,即使再 CPU 密集,也不可能一点 IO 都没有
    当 CPU 等待此线程 IO 时,可以处理另一个线程的计算,因此 +1

    服务器的逻辑 cpu 核数 可以通过下面代码或指令获取

    Runtime.getRuntime().availableProcessors();
    
    • 1

    cat /proc/cpuinfo| grep "processor"

    注意
    只有在多核 CPU 时才能得到性能提升
    相当于以前只用一个 CPU 核,现在用多个核一起计算
    单核 CPU 即使使用多线程也会受限于物理 CPU 总算力没有提升
    类似网络带宽只有 1 M,买了迅雷白金也不可能提高下载速度

    IO 密集型
    定义
    需要大量 IO,等待 IO 就绪时,CPU 可以处理其他线程的计算

    原则
    可以多配置线程,以便把 CPU 等待 IO 的时间利用起来
    但同时还要考虑当前服务在服务器(或容器)上是否独占,独占说明整个服务器或容器的资源都是当前服务的,否则需要根据经验取一个保守值

    还可以根据 业务流量服务点数 估算

    也可以依赖压测,从实际触发
    推荐值
    计算方式 1 :CPU 核数 * 2
    计算方式 2 :CPU 核数 / CPU 执行率
    计算方式 3 : 每秒业务量 * 每业务评价处理时间 / 服务节点数量(相当于计算得把 1 秒当 几秒使才能满足业务,然后按服务容器个数均分)

    服务非服务器独占时使用 1
    服务是服务器独占时使用 2
    情况复杂时使用 3,因为反正业务需要处理完,物理资源就这么多

    CPU 执行率 = 单位时间中处理线程池业务的时间/总时间
    CPU 执行率的估算方式

    • 通过 top 查看实时 cpu 利用率估算
    • 通过监控平台的监控数据估算
    • 使用常规值 10%~20%
  • 相关阅读:
    C++特色家政服务管理系统
    推荐系统_各种方法的损失计算过程
    Mycat【Java提高】
    ROS2报错 AttributeError: type object ‘type‘ has no attribute ‘_TYPE_SUPPORT‘
    java计算机毕业设计教评系统源码+mysql数据库+系统+lw文档+部署
    数据库系统原理与应用教程(068)—— MySQL 练习题:操作题 90-94(十二):DML 语句练习
    重温缓存的正确使用姿势
    转:关于征集第三批工业软件新场景新技术难题解决思路的公告
    Java函数式编程:二、高阶函数,闭包,函数组合以及柯里化
    【Linux】Ubuntu 20.04 深度学习 GPU 环境配置(CUDA Toolkit 11.7 + cuDNN v8.4.1)
  • 原文地址:https://blog.csdn.net/ZEUS00456/article/details/126533867