• JUC线程池


    一、JUC介绍

    java.util.concurrent包(简称:JUC)。JUC主要是让开发者在多线程编程中更加简单、方便一些。 通过JDK内置了一些类、接口、关键字,补充完善了JDK对于并发编程支持的“短板”。

    主要功能:(1)Executor:线程池(2)Atomic:原子操作类(3)Lock:锁(4)Tools:信号量工具类(4)并发集合:提供了线程安全的集合类。

    二、线程池

    (1)什么是线程池:

    内存中的一块空间。这块空间里面存放一些已经实例化好的线程对象。当代码中需要使用线程时直接从线程池获取。当代码中线程执行结束或需要销毁时,把线程重新放入回到线程池,而不是让线程处于死亡状态。

    (2)线程池的优缺点:

    优点:(1)降低系统资源消耗。通过重用线程对象,降低因为新建和销毁线程产生的系统消耗。(2)提高系统响应速度。直接从内存中获取线程对象,比新建线程更快。(3)提供线程可管理性。通过对线程池中线程数量的限制,避免无限创建线程导致的内存溢出或CPU资源耗尽等问题。

    缺点:默认情况下,无论是否需要使用线程对象,线程池中都有一些线程对象,也就是说会占用一定内存。

    三、JUC中的线程池

    1. Executor介绍:

    Executor 线程池顶级接口, 接口中只有一个execute()方法,方法参数为Runnable类型。

    2.ThreadPoolExecutor

    是JUC中提供的默认线程池实现类, Executor的子类。

    构造方法中的七个参数:

    (1)int  corePoolSize核心线程数:创建线程池后,默认线程池中并没有任何的线程,执行了从线程池获取线程执行任务的时候才会创建核心线程完成任务的执行。 如果没有达到指定corePoolSize, 即使有空闲的核心线程, 也会创建新的核心线程执行任务, 直到达到了corePoolSize。 达到corePoolSize后, 从线程池获取线程执行任务, 有空闲的核心线程, 空闲的线程会执行新任务。

    (2)BlockingQueue  workQueue阻塞队列:

    理解:阻塞:当队列为空时,阻塞获取任务;当队列放满时,阻塞添加任务。

    1. 当线程池中的线程数目达到指定的corePoolSize后,并且所有的核心线程都在使用中, 再来获取线程执行任务, 会将任务添加到缓存任务的阻塞队列中,也就是workQueue。

    2. 队列可以设置queueCapacity 参数,表示任务队列最多能存储多少个任务。

    (3)int  maximumPoolSize最大线程数:

    1. 所有的核心线程都被使用中,且任务队列已满时,线程池会创建新的线程执行任务,直到线程池中的线程数量达到maximumPoolSize。

    2. 被使用的线程数等于maximumPoolSize ,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。

    (4)long  keepAliveTime线程最大空闲时间:

    1. 线程池中存在空闲的线程, 就会处于空闲(alive)状态, 只要超过keepAliveTime, 空闲的线程就会被销毁,直到线程池中的线程数等于corePoolSize。

    2. 如果设置了allowCoreThreadTimeOut=true(默认false),核心线程也可以被销毁。

    (5)TimeUnit  unitkeepAliveTime 时间单位:

    TimeUnit是枚举类型。

    1. public enum TimeUnit {
    2. //纳秒
    3. NANOSECONDS(TimeUnit.NANO_SCALE),
    4. //微妙
    5. MICROSECONDS(TimeUnit.MICRO_SCALE),
    6. //毫秒
    7. MILLISECONDS(TimeUnit.MILLI_SCALE),
    8. //
    9. SECONDS(TimeUnit.SECOND_SCALE),
    10. //分钟
    11. MINUTES(TimeUnit.MINUTE_SCALE),
    12. //小时
    13. HOURS(TimeUnit.HOUR_SCALE),
    14. //
    15. DAYS(TimeUnit.DAY_SCALE);
    16. private static final long NANO_SCALE = 1L;
    17. private static final long MICRO_SCALE = 1000L * NANO_SCALE;
    18. private static final long MILLI_SCALE = 1000L * MICRO_SCALE;
    19. private static final long SECOND_SCALE = 1000L * MILLI_SCALE;
    20. private static final long MINUTE_SCALE = 60L * SECOND_SCALE;
    21. private static final long HOUR_SCALE = 60L * MINUTE_SCALE;
    22. private static final long DAY_SCALE = 24L * HOUR_SCALE;
    23. }

    (6)ThreadFactory  threadFactory线程工厂:创建线程对象。

    (7)RejectedExecutionHandler  handler线程池拒绝策略:

    只有当任务队列已满,且线程数量已经达到maximunPoolSize才会触发拒绝策略。

    1. AbortPolicy: 丢弃新任务,抛出异常,提示线程池已满(默认)。

    2. DisCardPolicy: 丢弃任务,不抛出异常。

    3. DisCardOldSetPolicy: 将消息队列中最先进入队列的任务替换为当前新进来的任务。

    4. CallerRunsPolicy: 由调用该任务的线程处理, 线程池不参与, 只要线程池未关闭,该任务一直在调用者线程中。

    1. package com.java.test;
    2. import java.util.concurrent.ArrayBlockingQueue;
    3. import java.util.concurrent.ThreadPoolExecutor;
    4. import java.util.concurrent.TimeUnit;
    5. /*
    6. * 使用实例
    7. * */
    8. public class Test01 {
    9. public static void main(String[] args) {
    10. ThreadPoolExecutor te = new ThreadPoolExecutor(
    11. 2,
    12. 4,
    13. 5,
    14. TimeUnit.SECONDS,
    15. new ArrayBlockingQueue(5)
    16. );
    17. te.execute(new Runnable() {
    18. @Override
    19. public void run() {
    20. System.out.println(Thread.currentThread().getName());
    21. }
    22. });
    23. te.execute(new Runnable() {
    24. @Override
    25. public void run() {
    26. System.out.println(Thread.currentThread().getName());
    27. }
    28. });
    29. te.execute(new Runnable() {
    30. @Override
    31. public void run() {
    32. System.out.println(Thread.currentThread().getName());
    33. }
    34. });
    35. }
    36. }

    3.Executors

    (1)介绍:Executors时线程池的工具类,返回值都是ExecutorService接口的实现类, 底层大多是调用ThreadPoolExecutor()。

    (2)newThreadPoolExecutor()方法:单线程

    1. //源码
    2. public static ExecutorService newSingleThreadExecutor() {
    3. return new FinalizableDelegatedExecutorService
    4. (new ThreadPoolExecutor(1, 1,
    5. 0L, TimeUnit.MILLISECONDS,
    6. new LinkedBlockingQueue()));
    7. }

    效果总结:

    1. 它只会创建一条工作线程处理任务;

    2. 采用的阻塞队列为LinkedBlockingQueue, 它是一个无界队列, 底层为链表。

    1. package com.java.test;
    2. import java.util.concurrent.ExecutorService;
    3. import java.util.concurrent.Executors;
    4. /*
    5. * 使用方式
    6. * */
    7. public class Test02 {
    8. public static void main(String[] args) throws InterruptedException {
    9. ExecutorService es = Executors.newSingleThreadExecutor();
    10. while (true) {
    11. es.execute(new Runnable() {
    12. @Override
    13. public void run() {
    14. System.out.println(Thread.currentThread().getName());
    15. // while (true){}
    16. }
    17. });
    18. Thread.sleep(1000);
    19. }
    20. }
    21. }

    (3)newFixedThreadPool(int nThreads)方法:自定义线程数

    1. //源码
    2. public static ExecutorService newFixedThreadPool(int nThreads) {
    3. return new ThreadPoolExecutor(nThreads, nThreads,
    4. 0L, TimeUnit.MILLISECONDS,
    5. new LinkedBlockingQueue());
    6. }

    效果总结:

    1. 它是一种固定大小的线程池;

    2. corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;

    3. keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;

    4. 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列, 底层为链表;

    5. 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;

    6. 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。

    1. package com.java.test;
    2. import java.util.concurrent.ExecutorService;
    3. import java.util.concurrent.Executors;
    4. public class TestnewFixedThreadPool {
    5. public static void main(String[] args) throws InterruptedException {
    6. ExecutorService executorService = Executors.newFixedThreadPool(5);
    7. while (true){
    8. executorService.submit(new Runnable() {
    9. @Override
    10. public void run() {
    11. System.out.println(Thread.currentThread().getName());
    12. }
    13. });
    14. Thread.sleep(1000);
    15. }
    16. }
    17. }

    (4)newCachedThreadPool():缓存线程池

    1. //源码
    2. public static ExecutorService newCachedThreadPool() {
    3. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    4. 60L, TimeUnit.SECONDS,
    5. new SynchronousQueue());
    6. }

    效果总结:

    1. 它是一个可以无限扩大的线程池;

    2. 它比较适合处理执行时间比较小的任务;

    3. corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;

    4. keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;

    5. 采用SynchronousQueue(同步队列)装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

    1. package com.java.test;
    2. import java.util.concurrent.ExecutorService;
    3. import java.util.concurrent.Executors;
    4. public class TestnewCachedThreadPool {
    5. public static void main(String[] args) throws InterruptedException {
    6. ExecutorService executorService = Executors.newCachedThreadPool();
    7. while (true){
    8. executorService.submit(new Runnable() {
    9. @Override
    10. public void run() {
    11. System.out.println(Thread.currentThread().getName());
    12. while (true){}
    13. }
    14. });
    15. Thread.sleep(1000);
    16. }
    17. }
    18. }

    (5) newScheduledThreadPool():

    1. //源码
    2. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    3. return new ScheduledThreadPoolExecutor(corePoolSize);
    4. }
    5. public ScheduledThreadPoolExecutor(int corePoolSize) {
    6. super(corePoolSize, Integer.MAX_VALUE,
    7. DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
    8. new DelayedWorkQueue());
    9. }

    效果总结:

    1. 它采用DelayQueue存储等待的任务

    2. 它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;

    3. DelayQueue也是一个无界队列;

    4. 工作线程会从DelayQueue取已经到期的任务去执行;

    5. 执行后也可以将任务重新定时, 放入队列中;

    6. 支持定时, 周期性执行。

    1. package com.java.test;
    2. import java.util.concurrent.Executors;
    3. import java.util.concurrent.ScheduledExecutorService;
    4. import java.util.concurrent.TimeUnit;
    5. public class TestnewScheduledThreadPool {
    6. public static void main(String[] args) {
    7. ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);
    8. //定时执行: 3秒后执行一次
    9. /* ses.schedule(new Runnable() {
    10. @Override
    11. public void run() {
    12. System.out.println(Thread.currentThread().getName());
    13. }
    14. }, 3, TimeUnit.SECONDS);*/
    15. //周期执行: 参数1: 延时时间 参数2: 周期性执行, 隔多少时间执行一次
    16. ses.scheduleAtFixedRate(new Runnable() {
    17. @Override
    18. public void run() {
    19. System.out.println(Thread.currentThread().getName());
    20. }
    21. }, 0, 3, TimeUnit.SECONDS);
    22. }
    23. }

    (6)newWorkStealingPool():工作窃取池

    工作原理(工作窃取算法):把一个Thread 分叉(fork)成多个子线程。让多个子线程执行本来一个线程应该执行的任务。最后把多个线程执行结果合并。

    如果在分叉后一个线程执行完成,另外的线程还没有结束,会从双端队列中尾部处理任务,另一个线程从头部取任务,防止出现线程竞争。

  • 相关阅读:
    【免杀前置课——Windows编程】十四、异步IO——什么是异步IO、API定位问题、APC调用队列
    lettuce利用stream实现消息推送
    YOLOv5-seg数据集制作、模型训练以及TensorRT部署
    顺序结构 与 选择结构
    SparkStreaming (六) --------- 优雅关闭
    Linux查端口占用的几种方式
    单元测试(JUint)
    一、CSS背景样式[背景样式、盒子阴影]
    脑供血不足是怎么回事?
    Windows 搭建 FTP 服务器
  • 原文地址:https://blog.csdn.net/weixin_53455615/article/details/126518918