• Java由浅入深理解线程池设计和原理



    1 线程

    1.1 什么是线程?什么是进程?

    ​ 进程:是指在系统中正在运行的一个应用程序,每个进程之间是独立的,每个进程均运行在其专用的且受保护的内存

    ​ 线程:进程的基本执行单元,一个进程的所有任务都在线程中执行,进程要想执行任务,必须得有线程,进程至少要有一条线程

    1.2 java中线程的实现方式有几种?

    ​ 继承Thread类

    public class MyThread extends Thread{
        @Override
        public void run() {
            // 执行自己代码逻辑
            System.out.println("自己线程被执行");
        }
    
        public static void main(String[] args) {
            MyThread myThread = new MyThread();
            myThread.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    ​ 实现Runnable接口

    public class MyRunnable implements Runnable{
        @Override
        public void run() {
            // 执行自己代码逻辑
            System.out.println("自己Runnable被执行");
        }
    
        public static void main(String[] args) {
            Thread thread = new Thread(new MyRunnable());
            thread.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    1.3 线程的生命周期是什么?

    在这里插入图片描述

    • NEW:刚刚创建,没做任何操作

      Thread thread = new Thread();
      System.out.println(thread.getState());
      
      • 1
      • 2
    • RUNNABLE:调用run,可以执行,但不代表一定在执行(RUNNING,READY)

      thread.start();
      System.out.println(thread.getState());
      
      • 1
      • 2
    • BLOCKED:抢不到锁

            final byte[] lock = new byte[0];
            new Thread(new Runnable() {
                public void run() {
                    synchronized (lock){
                        try {
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
            Thread thread2 = new Thread(new Runnable() {
                public void run() {
                    synchronized (lock){
                    }
                }
            });
            thread2.start();
            Thread.sleep(1000);
            System.out.println(thread2.getState());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • WAITING
    Thread thread2 = new Thread(new Runnable() {
        public void run() {
            LockSupport.park();
        }
    });
    
    thread2.start();
    Thread.sleep(500);
    System.out.println(thread2.getState());
    LockSupport.unpark(thread2);
    Thread.sleep(500);
    System.out.println(thread2.getState());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    TIMED_WAITING

    Thread thread3 = new Thread(new Runnable() {
        public void run() {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    thread3.start();
    Thread.sleep(500);
    System.out.println(thread3.getState());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    TERMINATED

    //等待1s后再来看
    Thread.sleep(1000);
    System.out.println(thread.getState());
    
    • 1
    • 2
    • 3

    2 线程存在的问题

    2.1 一个线程只能执行一个任务

    在这里插入图片描述

    2.2 线程执行完后销毁,无法复用

    在这里插入图片描述

    public class MyThread extends Thread{
        @Override
        public void run() {
            // 执行自己代码逻辑
            System.out.println("自己线程被执行");
        }
    
        public static void main(String[] args) throws InterruptedException {
            MyThread myThread = new MyThread();
            myThread.start();
            // main线程休息5秒,等待myThread执行完成
            TimeUnit.SECONDS.sleep(10);
            // 执行业务代码
            System.out.println("执行其他业务代码");
            // 业务代码执行完成,需要再次执行线程
            myThread.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    2.3 线程过多,导致JVM宕机

    在这里插入图片描述

    3 初识线程池

    简介:

    ​ 在多线程编程中,任务都是一些抽象且离散的工作单元,而线程是使任务异步执行的基本机制。随着应用的扩张,线程和任务管理也变得非常复杂。为了简化这些复杂的线程管理模式,我们需要一个“管理者”来统一管理线程及任务分配,这就是线程池。

    ​ 在主要大厂的编程规范中,不允许在应用中自行显式地创建线程,线程必须通过线程池提供。由于创建和销毁线程需要时间以及系统资源开销,使用线程池的好处是减少这些开销,解决资源不足的问题。

    3.1 了解J.U.C

    ​ J.U.C全称:java.util.concurrent,在并发编程中很常用的实用工具类。

    ​ 在并发编程中很常用的实用工具类,用于完成高并发、处理多线程的一个工具包。此包包括了几个小的、已标准化的可扩展框架,以及一些提供有用功能的类,没有这些类,这些功能会很难实现或实现起来冗长乏味

    在这里插入图片描述

    3.2 线程池解决了什么问题

    • 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
    • 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
    • 方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM
    • 节省CPU切换线程的时间成本(需要保持当前执行线程的现场,并恢复要执行线程的现场)。
    • 提供更强大的功能,延时定时线程池。(Timer vs ScheduledThreadPoolExecutor)

    3.3 线程池引发了什么问题

    • 异步任务提交后,如果JVM宕机,已提交的任务会丢失,需要考虑确认机制。
    • 使用不合理,可能导致内存溢出问题
    • 参数过多,代码结构引入数据结构与算法,增加学习难度。

    4 线程池的设计思想

    在这里插入图片描述

    在这里插入图片描述

    5 线程池的原理

    5.1 了解线程池类继承结构图

    在这里插入图片描述

    说明:

    • 最常用的是ThreadPoolExecutor
    • 调度用ScheduledThreadPoolExecutor,类似Timer和TimerTask。
    • 任务拆分合并用ForkJoinPool
    • Executors是工具类,协助你创建线程池的

    5.2 线程池工作状态

    线程池状态

    在这里插入图片描述

    • RUNNING:初始化状态是RUNNING。线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。RUNNING状态下,能够接收新任务,以及对已添加的任务进行处理。

    • SHUTDOWN:不接收新任务,但能处理已添加的任务。调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

    //shutdown后不接受新任务,但是task1,仍然可以执行完成
    
    ExecutorService poolExecutor = Executors.newFixedThreadPool(5);
    poolExecutor.execute(new Runnable() {
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println("finish task 1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    poolExecutor.shutdown();
    poolExecutor.execute(new Runnable() {
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    System.out.println("ok");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • STOP:不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()接口时,线程池由(RUNNING 或 SHUTDOWN ) -> STOP

      注意:容易引发不可预知的结果!运行中的任务也许还会打印,直到结束,因为调的是Thread.interrupt

    //改为shutdownNow后,任务立马终止,sleep被打断,新任务无法提交,task1停止
    poolExecutor.shutdownNow();
    
    • 1
    • 2
    • TIDYING:所有的任务已终止,队列中的”任务数量”为0,线程池会变为TIDYING。线程池变为TIDYING状态时,会执行钩子函数terminated(),可以通过重载terminated()函数来实现自定义行为
    //自定义类,重写terminated方法
    public class MyExecutorService extends ThreadPoolExecutor {
    
        public MyExecutorService(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        @Override
        protected void terminated() {
            super.terminated();
            System.out.println("terminated");
        }
        
        //调用 shutdownNow, ternimated方法被调用打印
        public static void main(String[] args) throws InterruptedException {
            MyExecutorService service = new MyExecutorService(1,2,10000,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(5));
            service.shutdownNow();
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • TERMINATED:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED

    5.3 掌握线程池个参数定义

    /**
     * @param corePoolSize 池中要保留的线程数
     * @param maximumPoolSize 中允许的最大线程数,前提是队列先满
     * @param keepAliveTime 当线程数大于核心,这是多余空闲线程的最长时间将在终止之前等待新任务。
     * @param keepAliveTime参数的时间单位
     * @param workQueue 用于保存任务的队列
     * @param threadFactory 执行器创建新线程的工厂
     * @param handler 阻止执行时要使用的处理程序,因为达到了线程边界和队列容量
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    5.4 线程池结构说明

    在线程池的编程模式下,任务是提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部协调空闲的线程,如果有,则将任务交给某个空闲的线程。一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。

    (源码查看:两个集合,一个queue,一个hashset)

    在这里插入图片描述

    5.5 线程池的任务提交

    • 添加任务,如果线程池中线程数没达到coreSize,直接创建新线程执行
    • 达到core,放入queue
    • queue已满,未达到maxSize继续创建线程
    • 达到maxSize,根据reject策略处理
    • 超时后,线程被释放,下降到coreSize

    5.6 线程池工具类Executors

    • newCachedThreadPool() : 弹性线程数
    • newFixedThreadPool(int nThreads) : 固定线程数
    • newSingleThreadExecutor() : 单一线程数
    • newScheduledThreadPool(int corePoolSize) : 可调度,常用于定时

    5.7 确定线程池的线程数

    ​ 虽然使用线程池的好处很多,但是如果其线程数配置得不合理,不仅可能达不到预期效果,反而可能降低应用的性能。

    按照任务类型对线程池进行分类:

    (1)IO密集型任务

    ​ 此类任务主要是执行IO操作。由于执行IO操作的时间较长,导致CPU的利用率不高,这类任务CPU常处于空闲状态。Netty的IO读写 操作为此类任务的典型例子。

    (2)CPU密集型任务

    ​ 此类任务主要是执行计算任务。由于响应时间很快,CPU一直在运行,这种任务CPU的利用率很高。

    (3)混合型任务

    ​ 此类任务既要执行逻辑计算,又要进行IO操作(如RPC调用、数据库访问)。相对来说,由于执行IO操作的耗时较长(一次网络往 返往往在数百毫秒级别),这类任务的CPU利用率也不是太高。Web服务器的HTTP请求处理操作为此类任务的典型例子。一般情况 下,针对以上不同类型的异步任务需要创建不同类型的线程池,并进行针对性的参数配置。

    5.7.1 为IO密集型任务确定线程数

    ​ 由于IO密集型任务的CPU使用率较低,导致线程空余时间很多,因此通常需要开CPU核心数两倍的线程。当IO线程空闲时,可以启用 其他线程继续使用CPU,以提高CPU的使用率。Netty的IO处理任务就是典型的IO密集型任务。所以,Netty的Reactor(反应器)实 现类(定制版的线程池)的IO处理线程数默认正好为CPU核数的两倍

    5.7.2 为CPU密集型任务确定线程数

    ​ CPU密集型任务也叫计算密集型任务,其特点是要进行大量计算而需要消耗CPU资源,比如计算圆周率、对视频进行高清解码 等。CPU密集型任务虽然也可以并行完成,但是并行的任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以要 最高效地利用CPU,CPU密集型任务并行执行的数量应当等于CPU的核心数。

    ​ 比如4个核心的CPU,通过4个线程并行地执行4个CPU密集型任务,此时的效率是最高的。但是如果线程数远远超出CPU核 心数量,就需要频繁地切换线程,线程上下文切换时需要消耗时间,反而会使得任务效率下降。因此,对于CPU密集型的任务来说,线 程数等于CPU数就行。

    5.7.3 为混合型任务确定线程数

    ​ 混合型任务既要执行逻辑计算,又要进行大量非CPU耗时操作(如RPC调用、数据库访问、网络通信等),所以混合型任务CPU的利用率不是太高,非CPU耗时往往是CPU耗时的数倍。比如在Web应用中处理HTTP请求时,一次请求处理会包括DB操作、RPC操作、缓存操作等多种耗时操作。一般来说,一次Web请求的CPU计算耗时往往较少,大致在100~500毫秒,而其他耗时操作会占用500~1000毫秒,甚至更多的时间。在为混合型任务创建线程池时,如何确定线程数呢?业界有一个比较成熟的估算公式,具体如下:

    最佳线程数 = ((线程等待时间+线程CPU时间) / 线程CPU时间) * CPU核数
    
    • 1

    通过公式可以看出:等待时间所占的比例越高,需要的线程就越多;CPU耗时所占的比例越高,需要的线程就越少。下面举一个例子:比如在Web服务器处理HTTP请求时,假设平均线程CPU运行时间为100毫秒,而线程等待时间(比如包括DB操作、RPC操作、缓存操作等)为900毫秒,如果CPU核数为8,那么根据上面这个公式,估算如下:

    (900毫秒 + 100毫秒) / 100毫秒 * 8 = 10 * 8 = 80
    
    • 1

    5.8 线程池源码刨析

    在这里插入图片描述

    //任务提交阶段:(4个if条件路线)
    
    public void execute(Runnable command) {
      if (command == null)
                throw new NullPointerException();
      int c = ctl.get();
      //判断工作数,如果小于coreSize,addWork,注意第二个参数core=true
      if (workerCountOf(c) < corePoolSize) {
          if (addWorker(command, true))
              return;
          c = ctl.get();
      }
      //否则,如果线程池还在运行,offer到队列
      if (isRunning(c) && workQueue.offer(command)) {
          //再检查一下状态
          int recheck = ctl.get();
          //如果线程池已经终止,直接移除任务,不再响应
          if (! isRunning(recheck) && remove(command))
              reject(command);
          //否则,如果没有可用线程的话(比如coreSize=0),创建一个空work
            //该work创建时不会给指派任务(为null),但是会被放入works集合,进而从队列获取任务去执行
          else if (workerCountOf(recheck) == 0)
              addWorker(null, false);
      }
      //队列也满,继续调addWork,但是注意,core=false,开启到maxSize的大门
      //超出max的话,addWork会返回false,进入reject
      else if (!addWorker(command, false))
          reject(command);
    }
    //线程创建
    
    private boolean addWorker(Runnable firstTask, boolean core) {
        //第一步,计数判断,不符合条件打回false
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
    
            for (;;) {
                int wc = workerCountOf(c);
                //判断线程数,注意这里!
                //也就说明线程池的线程数是不可能设置任意大的。
                //最大29位(CAPACITY=29位二进制)
                //超出规定范围,返回false,表示不允许再开启新工作线程,创建worker失败!
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //第二步,创建新work放入线程集合works(一个HashSet)
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //符合条件,创建新的work并包装task
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //加锁,workers是一个hashset,这里要保障线程安全性
                mainLock.lock();
                try {   
                            //...
                        //在这里!!!
                        workers.add(w);
                        
                        //...                    
                        workerAdded = true;
                    
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //注意,只要是成功add了新的work,那么将该新work立即启动,任务得到执行
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    //任务获取与执行
      
    //在worker执行runWorker()的时候,不停循环,先查看自己有没有携带Task,如果有,执行
    while (task != null || (task = getTask()) != null)
    
    //如果没用,会调用getTask,从队列获取任务
    
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // ...
    
            int wc = workerCountOf(c);
    
            // Are workers subject to culling? - 很形象,要不要乖乖的被“捕杀”?
            //判断是不是要超时处理,重点!!!决定了当前线程要不要被释放
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                    //线程数超出max,并且上次循环中poll等待超时了,那么说明该线程已终止
            //将线程队列数量原子性减
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //计数器做原子递减,递减成功后,返回null,for被中止
                if (compareAndDecrementWorkerCount(c))
                    return null;
                //递减失败,继续下一轮循环,直到成功
                continue;
            }
    
            try {
                //重点!!!
                //如果线程可被释放,那就poll,释放的时间为:keepAliveTime
                //否则,线程是不会被释放的,take一直被阻塞在这里,直到来了新任务继续工作
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //到这里说明可被释放的线程等待超时,已经销毁,设置该标记,下次循环将线程数减少
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140

    5.9 Executors

    以上构造函数比较多,为了方便使用,juc提供了一个Executors工具类,内部提供静态方法

    1)newCachedThreadPool() : 弹性线程数

    2)newFixedThreadPool(int nThreads) : 固定线程数

    3)newSingleThreadExecutor() : 单一线程数

    4)newScheduledThreadPool(int corePoolSize) : 可调度,常用于定时

    6 线程池的经典面试题

    6.1 线程池是如何保证线程不被销毁的呢?

    答案:如果队列中没有任务时,核心线程会一直阻塞在获取任务的方法,直到返回任务。而任务执行完后,又会进入下一轮 work.runWork()中循环

    验证:秘密就藏在核心源码里 ThreadPoolExecutor.getTask()

    //work.runWork():
    while (task != null || (task = getTask()) != null)
        
        
    //work.getTask():
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    6.2 核心线程与非核心线程有区别吗?

    答案:没有。被销毁的线程和创建的先后无关。即便是第一个被创建的核心线程,仍然有可能被销毁

    验证:看源码,每个work在runWork()的时候去getTask(),在getTask内部,并没有针对性的区分当前work是否是核心线程或者类似的标记。只要判断works数量超出core,就会调用poll(),否则take()

    6.3 线程池7个参数的作用及生效时机

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 相关阅读:
    一个Linux自动备份脚本的示例
    孩子没有感统失调的表现,还有必要做感统训练吗?
    flask自定义序列化
    C++DAY39
    pdf拆分成一页一页
    光栅和矢量图像处理:Graphics Mill 11.4.1 Crack
    自定义spring-boot-starter
    Mac删除不在程序坞的程序
    Linux C中对json格式数组数据的生成与解析
    SpringBoot(一)快速入门
  • 原文地址:https://blog.csdn.net/ZGL_cyy/article/details/133208026