• 深入浅出Java多线程(十二):线程池


    引言


    大家好,我是你们的老伙计秀才!今天带来的是[深入浅出Java多线程]系列的第十二篇内容:线程池。大家觉得有用请点赞,喜欢请关注!秀才在此谢过大家了!!!

    在现代软件开发中,多线程编程已经成为应对高并发、高性能场景的必备技术。随着计算机硬件的发展,尤其是多核CPU的普及,利用多线程能够充分利用系统资源,提升程序执行效率和响应速度。然而,在直接使用原生线程创建与销毁的过程中,我们往往会遇到一些难以忽视的问题:

    首先,线程的创建和销毁并非无成本操作。操作系统需要分配内存空间给线程栈,以及为线程调度维护上下文切换等信息,频繁地创建和销毁线程会导致系统资源被大量消耗。尤其在处理短生命周期任务时,这种开销可能远大于实际业务逻辑执行的耗时。

    其次,过多的并发线程可能会引发资源竞争问题,导致服务器过载甚至崩溃。当并发数量不受控制时,系统内存、CPU资源乃至文件句柄等关键资源都可能因过度消耗而达到瓶颈,从而影响整个系统的稳定性与性能。

    再者,对线程进行分散管理会增加代码复杂度和出错风险。没有一个统一管理和协调的机制,程序员很难准确预测和控制多线程间的交互行为,例如同步问题、死锁现象及资源争抢等问题,这些问题都会降低程序的质量和可维护性。

    因此,Java提供了强大的线程池机制,通过Executor接口及其核心实现类ThreadPoolExecutor来解决上述挑战。线程池能有效地复用已存在的线程,避免了频繁创建和销毁线程带来的开销;同时,它允许我们预设并动态调整线程池大小以控制并发执行的任务数,确保系统资源合理利用而不至于过载。此外,线程池还能对线程进行统一管理和异常处理,简化了多线程编程的复杂性。

    例如,我们可以直观地通过Java代码实例展示线程池的优势:

    1
    2
    3
    4
    5
    6
    7
    8
    ExecutorService executor = Executors.newFixedThreadPool(10); // 创建固定大小的线程池
    
    for (int i = 0; i < 1000; i++) {
        Runnable task = new Task(i); // 假设有Task是一个实现了Runnable接口的任务类
        executor.execute(task); // 将任务提交到线程池中执行
    }
    
    executor.shutdown(); // 当所有任务提交完毕后,关闭线程池,等待所有任务执行完成
    

    通过上述代码片段可以看到,线程池负责管理这些待执行的任务,并根据预先设定的核心线程数来高效地调度执行,极大地提升了编程效率和系统的运行效能。接下来的文章将深入剖析Java多线程之线程池原理,从线程池构造方法参数的意义,到其内部状态机设计和任务处理流程,再到线程复用的具体实现细节,全面揭示这一重要组件的工作机制和应用场景。

    为什么要使用线程池


    在多线程编程中,使用线程池(Thread Pool)是提高程序并发处理能力和资源利用率的关键技术。以下是采用线程池的三个主要原因:

    减少系统资源消耗 创建和销毁线程是一项昂贵的操作,涉及到内存分配、上下文切换等系统资源的大量消耗。频繁创建和销毁线程可能导致性能瓶颈。线程池通过预先创建并维护一定数量的线程来复用这些线程资源,当有新任务提交时直接将任务分配给空闲线程执行,从而避免了频繁创建线程的成本。例如,在Java中,通过ExecutorService接口及其实现类ThreadPoolExecutor可以方便地创建一个线程池,并利用其管理线程生命周期,如下所示:

    ExecutorService executor = Executors.newFixedThreadPool(5); // 创建包含5个核心线程的线程池
    

    控制并发数量以防止服务器过载 在高并发场景下,如果不加限制地创建线程,可能会导致并发数量过多,超出系统承受能力,引发如内存溢出、CPU使用率过高甚至服务器崩溃等问题。线程池通过设置核心线程数(corePoolSize)与最大线程数(maximumPoolSize),能够动态调整并发执行的任务数,确保系统的稳定性和资源的有效利用。比如,当核心线程已满负荷工作时,非核心线程会在任务队列饱和后才开始创建,且一旦超过最大线程数,线程池会根据配置的拒绝策略对新提交的任务进行合理处理。

    便于统一管理和维护线程 线程池提供了统一的线程管理和异常处理机制,使得程序员无需关注每个线程的具体创建和销毁过程,简化了代码逻辑。线程池还可以为线程设置优先级、命名以及自定义线程工厂等特性,进一步增强了线程管理的灵活性和可定制性。此外,线程池还支持任务完成后的回调函数,如beforeExecute()afterExecute()方法,用于执行特定的前后置操作,提升了程序的整体可控性和健壮性。

    通过使用线程池,我们可以更有效地组织并发执行的任务,降低开发难度,同时提高了系统的响应速度和资源使用效率。以下是一个简单的示例,展示了如何利用线程池执行多个耗时任务并控制并发数量:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    class MyTask implements Runnable {
        private int taskId;
    
        public MyTask(int id) {
            this.taskId = id;
        }
    
        @Override
        public void run() {
            System.out.println("Task " + taskId + " is running in thread: " + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    // 使用线程池执行多个任务
    ExecutorService executor = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 10; i++) {
        executor.execute(new MyTask(i));
    }
    executor.shutdown();
    
    

    在这个例子中,即使有10个任务需要执行,但由于线程池大小被限制为5,所以最多只有5个任务会同时被执行,有效避免了并发数量过大带来的潜在问题。同时,当所有任务完成后,调用shutdown()方法优雅关闭线程池,确保资源得到释放。

    线程池接口与实现


    在Java中,线程池的实现基于java.util.concurrent包中的Executor接口及其扩展接口。其中,ThreadPoolExecutor作为核心实现类,提供了丰富的配置选项和灵活的任务调度机制。

    Java Executor 接口

    Executor接口定义了一个统一的方法execute(Runnable command),用于执行提交给它的Runnable任务,简化了线程创建和管理的过程。通过实现这个接口,可以创建具有不同策略的线程池,例如:

    1
    2
    3
    4
    5
    6
    7
    8
    Executor executor = Executors.newFixedThreadPool(10); // 创建固定大小线程池
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 业务逻辑代码
        }
    });
    
    

    ThreadPoolExecutor 类

    构造方法详解 ThreadPoolExecutor提供了一系列构造函数,允许开发者自定义线程池的核心参数。主要包含以下五个基本参数:

    • corePoolSize: 核心线程数,即使没有任务处理时也会保留在线程池中的线程数量。
    • maximumPoolSize: 线程池最大容量,当工作队列满载且仍有新任务到来时,线程池将尝试增加到此值。
    • keepAliveTime: 非核心线程空闲超时时长,在指定时间内无新任务分配给非核心线程,则会销毁这些线程。
    • unit: keepAliveTime的时间单位,如秒、毫秒等。
    • workQueue: 用于存储待执行任务的阻塞队列,类型可选为LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue或DelayQueue等。

    例如:

    1
    2
    3
    BlockingQueue queue = new LinkedBlockingQueue<>();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(4860L, TimeUnit.SECONDS, queue);
    
    

    此处创建了一个初始核心线程数为4、最大线程数为8的线程池,闲置非核心线程超过60秒会被回收,并使用链式阻塞队列来存放任务。

    此外,还有两个额外的可选参数:

    • threadFactory: 定义线程工厂,用于批量创建线程并设置其属性(如守护线程、优先级等)。默认采用DefaultThreadFactory,可以根据需求自定义实现。
    • handler: 拒绝策略,当线程池及任务队列饱和时,无法接收新的任务时所采取的动作,默认为AbortPolicy,即抛出RejectedExecutionException异常。还可以选择DiscardPolicy(直接丢弃任务)、DiscardOldestPolicy(丢弃最早进入队列的任务以腾出空间)或CallerRunsPolicy(由提交任务的线程自行执行该任务)。

    线程池执行流程

    一个典型的线程池实例化示例是创建一个仅有一个核心线程的线程池,确保所有任务按顺序执行,不创建非核心线程:

    1
    2
    3
    4
    5
    6
    7
    8
    ExecutorService executor = new ThreadPoolExecutor(
        1,  // corePoolSize
        1,  // maximumPoolSize
        0L// keepAliveTime
        TimeUnit.MILLISECONDS, // unit
        new LinkedBlockingQueue<>() // workQueue
    );
    
    

    此线程池不会因为线程数量不足而创建额外的非核心线程,所有提交的任务都会按照FIFO原则添加到队列中等待唯一的核心线程执行。

    总之,Java中的线程池接口与其实现(尤其是ThreadPoolExecutor)为开发者提供了强大的并发编程工具,允许我们根据应用场景灵活调整线程资源的分配和管理策略,从而有效地提升程序性能和系统稳定性。通过深入理解其内部构造原理和运行机制,我们可以更好地设计和优化多线程应用。

    线程池状态


    在Java的多线程编程中,线程池的状态与生命周期管理是其核心功能之一。ThreadPoolExecutor类通过维护一个volatile int类型的变量runState来表示线程池的状态,该状态包括RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED五个阶段。

    • RUNNING:线程池创建后默认处于此状态,能够接受新任务并处理阻塞队列中的任务。
    • SHUTDOWN:调用shutdown()方法后进入此状态,不再接受新的提交任务,但会继续处理已加入队列的任务直至执行完毕。
    1
    2
    3
    4
    ExecutorService executor = Executors.newFixedThreadPool(5);
    // ... 执行一系列任务
    executor.shutdown();
    
    
    • STOP:调用shutdownNow()方法后变为STOP状态,不仅不接收新任务,还会尝试中断正在执行的任务,并且不会处理尚未开始执行的任务。
    1
    2
    executor.shutdownNow(); // 立即停止所有正在运行的任务并拒绝后续任务
    
    
    • TIDYING:当所有的任务都已经终止并且workerCount(活动工作线程数)为0时,线程池将转换到TIDYING状态,并触发terminated()钩子方法。
    • TERMINATEDterminated()方法执行完毕后,线程池最终进入TERMINATED状态,表明线程池已经彻底关闭,无法再进行任何操作。

    线程池状态的变化过程遵循严格的条件判断和转换逻辑,例如在任务执行流程中,添加任务或销毁线程时都会检查当前的runState。此外,线程池还通过ctl变量合并了workerCount(工作线程数量)和runState的信息,以原子方式更新线程池的整体状态。

    下面是一个简单的示例,演示了如何监控线程池的状态变化:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    public class ThreadPoolStatusMonitor {
        private final ThreadPoolExecutor executor;
    
        public ThreadPoolStatusMonitor(ThreadPoolExecutor executor) {
            this.executor = executor;
            executor.addThreadFactory(new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("MonitoringThread");
                    return thread;
                }
            });
    
            ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
            monitor.scheduleAtFixedRate(() -> {
                System.out.println("Current pool status: " + getStateString(executor));
                if (executor.isTerminated()) {
                    monitor.shutdown();
                }
            }, 11, TimeUnit.SECONDS);
        }
    
        private String getStateString(ThreadPoolExecutor executor) {
            switch (executor.getRunState()) {
                case RUNNING:
                    return "RUNNING";
                case SHUTDOWN:
                    return "SHUTDOWN";
                case STOP:
                    return "STOP";
                case TIDYING:
                    return "TIDYING";
                case TERMINATED:
                    return "TERMINATED";
                default:
                    return "UNKNOWN";
            }
        }
    }
    
    // 使用示例:
    ThreadPoolExecutor executor = new ThreadPoolExecutor(2460L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    new ThreadPoolStatusMonitor(executor);
    
    // 添加一些任务...
    executor.execute(() -> { /* 业务逻辑 */ });
    
    // 后续调用 shutdown 或 shutdownNow 方法
    executor.shutdown();
    
    

    通过上述代码片段可以看到,我们创建了一个线程池状态监测器,定期打印线程池状态,并在检测到线程池终止后自动停止监测线程。这样可以直观地观察到线程池从创建到最终关闭整个生命周期内的状态变化情况。

    线程池处理流程


    线程池任务处理流程是ThreadPoolExecutor类的核心功能,其主要通过execute(Runnable command)方法实现。下面我们将深入剖析该方法内部的任务调度逻辑。

    创建核心线程执行任务(corePoolSize) 当调用execute()方法提交任务时,首先检查当前活跃线程数是否小于核心线程数(corePoolSize)。如果是,则直接创建新的核心线程来执行这个新任务。核心线程在没有任务可执行时不会被销毁,除非设置了允许核心线程超时的选项。

    1
    2
    3
    4
    5
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) // 核心线程,并尝试添加到工作队列中
            return;
    }
    
    

    将任务添加到任务队列(workQueue) 如果当前活跃线程数不小于核心线程数,接下来会尝试将任务放入阻塞队列(workQueue)中等待空闲的核心线程去执行。在这个阶段,会进行两次线程池状态检查:一次是在入队前,另一次是在成功入队后。这是因为在多线程环境下,线程池的状态可能会随时发生变化,因此需要二次检查以确保任务能够在正确状态下被执行。

    1
    2
    3
    4
    5
    6
    7
    8
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command)) // 检查状态并移除任务
            reject(command); // 执行拒绝策略
        else if (workerCountOf(recheck) == 0// 如果此时没有活动线程则创建非核心线程
            addWorker(nullfalse);
    }
    
    

    创建非核心线程执行任务(maximumPoolSize) 若任务无法放入任务队列,这可能是因为队列已满或线程池配置不允许放入更多任务。在这种情况下,线程池试图创建非核心线程来执行任务,但仅在总线程数未达到最大值(maximumPoolSize)的情况下才创建。

    1
    2
    3
    else if (!addWorker(command, false)) // 创建非核心线程执行任务
        reject(command); // 若创建失败则执行拒绝策略
    
    

    拒绝策略 当线程池无法再接受新任务时(例如超过最大线程数且任务队列已满),则触发拒绝策略。Java提供了四种预定义的拒绝策略:

    • AbortPolicy:默认策略,抛出RejectedExecutionException异常。
    • DiscardPolicy:默默地丢弃任务,不做任何处理。
    • DiscardOldestPolicy:丢弃队列中最旧的任务(即最先加入队列的任务),然后重新尝试执行新任务。
    • CallerRunsPolicy:由调用线程执行被拒绝的任务。

    总结整个处理流程

    1. 当线程数量不足corePoolSize时,优先创建核心线程执行任务。
    2. 线程数量满足corePoolSize时,将任务加入workQueue等待执行。
    3. workQueue已满且线程数量未达maximumPoolSize时,创建非核心线程执行任务。
    4. 若所有条件均无法接纳新任务,则根据设定的拒绝策略处理被拒绝的任务。

    以下是一个简化的示例代码,展示如何使用线程池执行任务:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2// 核心线程数
            5// 最大线程数
            60L// 空闲线程存活时间
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>() // 使用无界链式阻塞队列
        );
    
        for (int i = 0; i < 10; i++) {
            Runnable task = () -> System.out.println("Task " + Thread.currentThread().getName() + " is running");
            executor.execute(task);
        }
    
        executor.shutdown(); // 提交完所有任务后关闭线程池
    }
    
    

    这段代码创建了一个线程池,并提交了10个任务,根据上述任务处理流程,线程池会按照合适的方式安排这些任务的执行。

    线程复用机制原理


    线程复用机制是Java线程池实现高效并发处理的核心技术之一,其主要通过ThreadPoolExecutor类中的Worker工作线程类来完成。Worker不仅实现了Runnable接口,还是一个封装了线程和任务队列交互的实体。

    在创建线程池时,首先会创建一定数量的核心线程(corePoolSize),这些线程会一直存活在线程池中,即使没有任务执行,除非设置了允许核心线程超时策略。当有新任务提交到线程池时,首先尝试将任务分配给这些核心线程。如果所有核心线程都在忙碌,且任务队列非空,则新提交的任务会被放入阻塞队列等待执行。

    Worker类的构造函数初始化了一个与之关联的Thread对象,并将其自身作为该线程的任务,即当调用t.start()启动这个线程时,实际执行的是Worker.run()方法。在run()方法中,Worker会不断地从阻塞队列中获取新的任务来执行,这个过程如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    final void runWorker(Worker w) {
        // 获取当前运行的线程以及初始任务
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
    
        // 清除firstTask并解锁,以便执行后续任务
        w.firstTask = null;
        w.unlock(); // allow interrupts
    
        try {
            // 无限循环,直到线程池停止或worker退出
            while (task != null || (task = getTask()) != null) {
                // 加锁并检查线程池状态,若已关闭则中断线程
                w.lock();
                // ... 状态判断及中断操作
    
                try {
                    // 执行前置钩子方法
                    beforeExecute(wt, task);
    
                    // 执行任务
                    task.run();
    
                    // 执行后置钩子方法
                    afterExecute(task, null);
                } catch (...) { ... }
    
                // 更新已完成任务计数并解锁
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        } finally {
            // 工作线程退出时进行资源清理
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    

    getTask()方法负责从阻塞队列中取出下一个待执行的任务。根据线程池配置,核心线程会使用workQueue.take()方法阻塞等待新任务;而非核心线程在keepAliveTime内未获得新任务时,会调用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法尝试获取,超时后线程可能会被销毁。

    下面是一个简化的示例代码片段,展示了如何利用线程池执行任务并实现线程复用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5// 核心线程数
            10// 最大线程数
            60L// 空闲线程存活时间
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>() // 阻塞队列
        );
    
        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Task " + taskId + " running in thread: " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(1000); // 模拟耗时任务
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    
        executor.shutdown(); // 提交完所有任务后关闭线程池
    }
    
    

    在这个例子中,线程池会根据需要创建并复用线程,每个任务都由线程池中的一个线程执行,任务完成后线程并不会立即销毁,而是继续从队列中获取下一个任务执行,从而达到复用的目的。同时,线程池内部管理确保了线程生命周期的合理控制,避免了频繁创建和销毁线程带来的开销。

    总结


    Java线程池原理的核心在于ThreadPoolExecutor类的实现,它通过合理管理线程生命周期、任务队列以及线程池状态来高效地执行并发任务。线程池利用核心线程和非核心线程复用机制,有效降低了系统资源消耗,控制了并发数量,并简化了线程的统一管理和异常处理。

    首先,线程池通过构造方法设置参数如corePoolSize(核心线程数)、maximumPoolSize(最大线程数)和keepAliveTime(空闲线程存活时间),以及选择合适的阻塞队列workQueue(例如LinkedBlockingQueue、ArrayBlockingQueue或SynchronousQueue等)。通过这些参数,开发者可以根据应用需求灵活调整线程池的行为特性。

    在处理任务时,execute()方法作为入口点,根据当前线程池状态和线程数决定如何调度任务:优先使用核心线程执行任务,当核心线程已满载时将任务放入阻塞队列;若阻塞队列也已满且线程总数未达到最大值,则创建非核心线程执行新任务;若超过最大线程数则采用预定义的拒绝策略(AbortPolicy、DiscardPolicy、DiscardOldestPolicy或CallerRunsPolicy)。

    线程复用的关键在于Worker类的设计。每个Worker对象封装了一个Thread实例并实现了Runnable接口,其run()方法会持续从工作队列中获取任务并执行,实现了线程在完成一个任务后能够立即投入下一个任务的执行,从而避免了频繁创建和销毁线程带来的开销。

    此外,线程池的状态机设计至关重要,包含RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED五个状态,分别对应不同的行为模式。例如,调用shutdown()方法后,线程池进入SHUTDOWN状态,不再接受新提交的任务但继续执行已在队列中的任务,直至所有任务执行完毕并通过terminated()方法转换为TERMINATED状态。

    总之,在多线程编程中,Java线程池为我们提供了一种强大而灵活的工具,通过合理配置和管理线程池,不仅能有效提升程序性能,还能确保系统的稳定性和资源的有效利用。实际开发中,应当根据业务需求定制化线程池参数,并充分理解线程池的工作原理与任务调度逻辑,以便于编写出高并发、低资源占用的健壮代码。

    示例代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 创建固定大小的线程池,核心线程数等于最大线程数,无界任务队列
    ExecutorService executor = Executors.newFixedThreadPool(5);
    
    for (int i = 0; i < 10; i++) {
        final int taskId = i;
        Runnable task = () -> System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName());
        executor.execute(task);
    }
    
    executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // 等待所有任务完成
    
    // 示例展示了线程池如何接收多个任务并分配给线程执行,最终关闭线程池并确保所有任务都已完成。
    
    

    这段代码展示了如何创建一个固定大小的线程池,并提交多个任务到线程池进行异步执行。通过调用awaitTermination()方法,主程序可以等待所有任务完成后再结束运行,确保了任务的正确完成和线程池的有序关闭。

    本文使用 markdown.com.cn 排版

  • 相关阅读:
    【云计算与数据中心规划】【期末复习题】
    unity学习笔记-有关打包到安卓开机黑屏时间过长的心得
    Zookeeper
    知识图谱从入门到应用——知识图谱的知识表示:符号表示方法
    java-net-php-python-springboot班级信息系统计算机毕业设计程序
    嵌入式系统程序架构的进化
    不同温度与工况的放电曲线与内阻曲线
    JZ11 旋转数组的最小数字
    移动端应用(APP)如何设计测试用例?
    Redis缓存数据库
  • 原文地址:https://www.cnblogs.com/CoderLvJie/p/18070458