• 线程池的实现原理与使用详解


    几乎所有需要异步或并发执行任务的程序都可以使用线程池,在开发过程中,合理地使用线程池能够带来3个好处。

    • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    • 提高响应速度:当任务到达时,任务可以需要等到线程创建就能立即执行。
    • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

    线程池的实现原理

    当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?我们来看一下线程池的主要处理流程:

    1. 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
    2. 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
    3. 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

    image

    线程池主要处理流程

    image

    ThreadPoolExecutor执行示意图

    ThreadPoolExecutor执行execute方法分下面4种情况:

    1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
    2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
    3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
    4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutuionHandler.rejectedExecution()方法。

    源码分析

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            //如果线程数小于基本线程数,则创建线程并执行当前任务
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //如果线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
            //抛出rejectedExecutionException异常
                reject(command);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    工作线程
    线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行。我们可以从Worker类的run()方法里看到这点:

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                        //任务执行
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    image

    ThreadPoolExecutor执行任务示意图

    线程池中的线程执行任务分两种情况

    1. 在execute()方法中创建一个线程时,会让这个线程执行当前任务。
    2. 这个线程执行完上图1的任务后,会反复从BlockingQueue获取任务来执行。

    线程池的使用

    线程池的创建

    我们可以通过ThreadPoolExecutor来创建一个线程池。

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    输入参数如下:

    1. corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
    2. workQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
      • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
      • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
      • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
      • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
    3. maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列,这个参数就没什么效果。
  • 相关阅读:
    数字秒表VHDL启动暂停清零,源码和视频
    【外卖项目实战开发三】
    Codesys数据类型(2.7):扩展数据类型之 别名 详解
    C语言 数组——排序算法的函数实现
    Docker--Harbor私有仓库部署与管理
    神经网络拟合图像:Implicit Neural Representations with Periodic Activation Functions
    OceanBase V4.2特性解析:用 Show Trace 快速定位数据库性能瓶颈
    C#重启 --- 类和对象
    【开发教程9】疯壳·开源蓝牙心率防水运动手环-心率监测
    新客服系统_防黑加固/多商户/网页H5/公众号/小程序/即时通讯/自适应在线客服系统源码(附教程)
  • 原文地址:https://blog.csdn.net/wozaibohaibian/article/details/125531785