• Java面试题16-线程池的底层工作原理


    执行过程

    线程池内部是通过队列 + 线程实现的,当我们利用线程池执行任务时:

    1、如果此时线程池中的线程数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

    2、如果此时线程池中的线程数量等于corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列。

    3、如果此时线程池中的线程数量大于等于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。

    4、如果此时线程池中的线程数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过handler指定的策略来处理此任务。

    5、当线程池中的线程数量大于corePoolSize时,如果某线程空闲事件超过keepAliveTime,线程将被终止,这样,线程池可以动态的调整池中的线程数。

    源码深入理解

    线程池的构造函数源码
    在这里插入图片描述

    	public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    1、实际上的"线程池"和工作线程

    在这里插入图片描述
    这个HashSet就是存放所有工作线程的集合,Worker就代表一个工作的线程。

    而这个Worker则是ThreadPoolExecutor的一个内部类。
    在这里插入图片描述
    Worker中有两个重要的属性,分别是

    		 /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;  // 工作线程
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;  // 第一次要执行的任务
    
    • 1
    • 2
    • 3
    • 4

    然后我们来看Worker的构造器:

    		Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                // 将Worker本身作为参数,传递给了new的Thread
                this.thread = getThreadFactory().newThread(this);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这里设计的非常巧妙,我们说Worker实现了Runnable,并且还有一个Thread的属性,在构造器实现的时候又将Worker本身作为参数传递给了Thread,这样就能实现一个场景:只要Woker的Thread线程被启动,那么Worker的run()方法就会执行。

    而Worker的run()方法如下:

    		public void run() {
                runWorker(this);
            }
    
    • 1
    • 2
    • 3

    我们跟踪到runWorker()中一探究竟,在关键的地方都有详细的注释:

    	final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask; // 将第一次要执行的任务交给task对象
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                	// 第一次的任务就是firstTask,所以会进入while循环
                	// 第一次执行完之后,task=null 被清空
                	// 之后获取的任务是getTask()的任务
                	// 如果获取到,run就会继续执行
                	// 这就说明了线程能被复用的特性
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    // 判断线程池的状态
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run(); // 执行传入的任务
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null; // 清空任务
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    然后我们继续去探究,后来的任务是从哪来的呢?我们看getTask()中的源码(部分源码,只显示关键部分):

    	private Runnable getTask() {
    	//....
    	try {
    		//从任务队列中获取task
            Runnable r = timed ?
            // 非阻塞线程的获取
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            // 阻塞线程的获取(核心线程)
            workQueue.take();
            if (r != null)
                return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
           }
        //... 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2、"工作线程"Worker的创建

    要想知道工作线程Worker什么时候被创建,我们就需要去查看线程池执行的方法execute中的源码。

    execute的源码:

    	// 其实就是线程池的工作原理底层判断
    	public void execute(Runnable command) {
            if (command == null)
            	// 首先判断传入的值是否为空
                throw new NullPointerException();
            // 该值可以判断线程池的工作状态以及线程池中有多少线程在工作
            int c = ctl.get(); 
            if (workerCountOf(c) < corePoolSize) {
            	// 工作线程数小于核心线程数
                if (addWorker(command, true))
                	// 创建一个worker
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
            	// 尝试往工作队列中添加一个线程
            	// 失败了往后执行 
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                // 检查线程池状态,防止刚添加线程后线程池就关闭了
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }else if (!addWorker(command, false))
            	// 如果添加线程失败,执行拒绝线程
                reject(command);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
  • 相关阅读:
    【C#/.NET】使用ASP.NET Core对象池
    IDEA下载安装
    Nginx基础之location匹配规则实践篇
    【应用统计学】参数统计-抽样分布
    美团搜索粗排优化的探索与实践
    iNFTnews | 从《雪崩》到百度“希壤”,元宇宙30年的16件大事
    基于视频技术与AI检测算法的体育场馆远程视频智能化监控方案
    Typora收费后我换了个Markdown编辑器(Marktext)
    织梦翻译插件-织梦自动采集翻译插件
    容器中的nginx暴露一个端口部署多个功能的网站
  • 原文地址:https://blog.csdn.net/z318913/article/details/126819376