• 《Java并发编程的艺术》2 第九章 Java中的线程池


    第九章、Java中的线程池

    Java中的线程池时运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。

    合理地使用线程池能带来三个好处:

    • **降低资源消耗。**通过重复利用已创建的线程降低线程创建和销毁造成的损耗。
    • **提高响应速度。**当任务到达时,任务可以不需要等到线程创建就能立即执行。
    • **提高线程的可管理性。**线程是稀缺资源,使用线程池进行统一分配、调优和监控。

    1.线程池的实现原理

    从图中可以理解提交一个线程到线程池时线程的工作原理:

    img

    关于线程池的具体实现原理,在了解一些线程池的参数之后,在作讲解。

    2.线程池的参数

    我们可以通过ThredPoolExecutor 来创建一个线程池

    new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, milliseconds, runnableTaskQueue, handler);
    
    • 1

    创建线程池需要这几个参数:

    1. corePoolSize(线程池的基本大小):提交任务到线程池,会优先创建线程(即便存在空闲的线程),等到需要执行的任务数大于 corePoolSize 时就不再创建。(这句话在后面【线程数最大数量】会有修正)

    2. runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:

      • ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO(先进先出)
      • LinkedBlockingQueue:基于链表结构的阻塞队列,FIFO,吞吐量通常高于ArrayBlockingQueue
      • SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常高于LinkedBlockingQueue
      • PriorityBlockingQueue:具有优先级的无限阻塞队列
    3. maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且线程池小于maximumPoolSize ,线程池就会再创建新线程。如果使用了无界的任务队列这个参数就没什么效果

    4. ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字

    5. RejectedExcutionHandler(饱和策略):当线程池和队列都满了,采取一种策略来处理提交的新任务。这个策略默认下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK1.5中提供了以下四种策略:

      • AbortPolicy:直接抛出异常(默认)

      • CallerRunsPolicy:只用调用者所在线程来运行任务

      • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务

      • DiscardPolicy:不处理,丢弃过

        可以通过实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化存储不能处理的任务

      • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。可以通过调大时间提高线程的利用率

      • TimeUnit(线程活动保持时间的单位):天、小时、分钟、毫秒、微妙、千分之一毫秒、纳秒。

    3.向线程池提交任务

    可以使用两个方法向线程池提交任务:

    3.1 execute()

    用于提交不需要返回值的任务,所以无法判断线程是否执行成功。

    execute提交的是一个Runnable类的实例。

    image-20220726155638920

    1. 如果线程池中的线程数小于 corePoolSize ,则创建新线程来执行任务(需要获取全局锁)
    2. 如果线程池中的线程数等于或大于 corePoolSize ,则将任务加入 BlockingQueue
    3. 如果无法将任务加入 BlockingQueue(队列已满),创建新的线程来处理任务(需要获取全局锁)
    4. 如果创建新线程将使当前运行的线程超出 maximumPoolSize ,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法

    img

    3.2 submit()

    用于提交需要返回值的任务。线程池会返回一个future类型的对象

    image-20220726155852935

    future对象:

    image-20220726160016965

    这个future对象可以判断任务是否执行成功,并且可以通过get()方法获取返回值,get(long timeout, TimeUnit unit) 方法则会阻塞一段时间后返回,这时任务可能没有执行完,会报TimeoutException

    3.3 实战

    1.不使用get()

    public class ThreadTest {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            System.out.println("main_start");
    
            Runnable runnable = new Runable01();
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
                    200,
                    10,
                    TimeUnit.SECONDS,new LinkedBlockingDeque<>(),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
            threadPoolExecutor.submit(runnable);
    
            System.out.println("main_end");
    
        }
    
        public static class Runable01 implements Runnable{
    
            @Override
            public void run() {
                System.out.println("当前线程" + Thread.currentThread().getId());
                int i = 10/5;
                System.out.println("运行结果:" + i);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    image-20220726160759397

    2.使用get()

    public class ThreadTest {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            System.out.println("main_start");
    
            Runnable runnable = new Runable01();
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
                    200,
                    10,
                    TimeUnit.SECONDS,new LinkedBlockingDeque<>(),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
            Future<?> future = threadPoolExecutor.submit(runnable);
            Object o = future.get();
    
            System.out.println("main_end");
    
        }
    
        public static class Runable01 implements Runnable{
    
            @Override
            public void run() {
                System.out.println("当前线程" + Thread.currentThread().getId());
                int i = 10/5;
                System.out.println("运行结果:" + i);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    image-20220726161312606

    3.get(1L,TimeUnit.NANOSECONDS)

    public class ThreadTest {
        public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
            System.out.println("main_start");
    
            Runnable runnable = new Runable01();
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
                    200,
                    10,
                    TimeUnit.SECONDS,new LinkedBlockingDeque<>(),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
            Future<?> future = threadPoolExecutor.submit(runnable);
            Object o = future.get(1L,TimeUnit.NANOSECONDS);
    
            System.out.println("main_end");
    
        }
    
        public static class Runable01 implements Runnable{
    
            @Override
            public void run() {
                System.out.println("当前线程" + Thread.currentThread().getId());
                int i = 10/5;
                System.out.println("运行结果:" + i);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    image-20220726161557387

    4.关闭线程池

    原理:

    • 遍历线程池中的工作线程,逐个调用线程的interrupt 方法中断线程,所以无法响应中断的任务可能永远无法终止

    关闭线程池的方法以及一些不同之处:

    • shutdown():将线程池的状态设置为SHUTDOWN状态,然后中断所有没有正在执行任务的线程
    • shutdownNow():首先将线程池中的状态设置成STOP,然后尝试停止所以正在执行或暂停任务的线程,并返回等待执行任务的线程的列表

    只要调用了这两个方法中的任意一个,isShutDown就会返回true。

    当所有的服务关闭后,才表示线程关闭成功,这时调用isTerminaed方法会返回true。

    通常调用shutdown 方法,如果线程不一定要执行完,可以调用 shutdownNow 方法。

    5.合理分配线程池

    要想合理分配线程池,必须分析任务特性:

    • 任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务
    • 任务的优先级:高、中和低
    • 任务的执行时间:长、中和短
    • 任务的依赖性:是否依赖其他系统资源,如数据库连接

    性质不同的任务可以使用不同规模的线程池来分开处理。

    img

    如果一直有优先级高的任务提交到队列里,则优先级低的任务可能永远不能执行

    建议使用有界队列,这样如果线程池中的任务耗时较大、任务量较多时仅会影响后台任务,同时提醒系统出现问题了,如果使用的无界队列,可能等到挤爆内存,整个系统不可用了,才会发现错误。

    6.线程池的监控

    一些线程池监控的属性:

    • taskCount:线程池需要执行的任务数量
    • completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于 taskCount
    • largestPoolSize:线程池里曾经创建过的最大线程数量。可以直到线程池是否满过
    • getPoolSize:线程池的线程数量。线程池不销毁的话,线程不会减少
    • getActiveCount:获取活动的线程数

    可以通过重写线程池的beforeExecute、afterExecute和terminated方法自定义线程池来对线程进行监控。这几个方法在线程池里默认是空方法。

    image-20220726163612497

  • 相关阅读:
    Visual Studio部署C++矩阵库Armadillo的方法
    [附源码]计算机毕业设计共享汽车系统
    寻找领域不变量:从生成模型到因果表征
    基于Ansys workbench进行发动机风扇非定常流固耦合计算
    视频剪辑制作教学:分享十种剪辑技巧,打好基础很重要
    【初学者必看】vlc实现的rtsp服务器及转储H264文件
    晚上有些凉
    Docker基础
    小程序_小程序的授权
    BeanNameViewResolver类简介说明
  • 原文地址:https://blog.csdn.net/qq_52476654/article/details/125998113