• 学习线程池原理从手写一个线程池开始


    概述

    线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和细节吗?如果直接去看jdk源码的话,可能有一定的难度,那么我们可以先通过手写一个简单的线程池框架,去掌握线程池的基本原理后,再去看jdk的线程池源码就会相对容易,而且不容易忘记。

    线程池框架设计

    我们都知道,线程资源的创建和销毁并不是没有代价的,甚至开销是非常高的。同时,线程也不是任意多创建的,因为活跃的线程会消耗系统资源,特别是内存,在一定的范围内,增加线程可以提高系统的吞吐率,如果超过了这个范围,反而会降低程序的执行速度。

    因此,设计一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作, 达到下面的目标:

    • 降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
    • 提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
    • 提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

    线程池的核心思想: 线程复用,同一个线程可以被重复使用,来处理多个任务。

    为了实现线程池功能,需要考虑下面几个设计要点:

    1. 线程池可以接口外部提交的任务执行
    2. 线程池有工作线程的数量,有任务执行,没有任务也空闲在那,等待任务过来,这样既避免线程频繁创建销毁带来的开销,同时也可以避免线程池无限制的创建线程
    3. 如果线程池接受提交的任务超过工作线程的数量了,该怎么办?可以用一个队列把任务存下来,等工作线程完成任务后去队列中获取任务,执行
    4. 那如果任务实在是太多太多了,达到了我们认为的队列最大值,怎么办,我们可以设计一种任务太多的策略,可以进行切换,比如直接丢弃任务、报错等等

    看了上面的设计目标和要点,是不是能立刻想到一个非常经典的设计模型——生产者消费者模型

    • 阻塞队列存储执行任务,比如外部main函数作为生产者向队列生产任务。
    • 线程池中的工作线程作为消费者获取任务执行。

    现在我们将我们的设计思路转换为代码。

    代码实现

    阻塞队列的实现

    • 阻塞队列主要存放任务,有容量限制
    • 阻塞队列提供添加和删除任务的API, 如果超过容量,阻塞不能添加任务,如果没有任务,阻塞无法获取任务。
    /**
     * 

    自定义任务队列, 用来存放任务

    * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 10:15 * @version: 1.0.0 */ @Slf4j(topic = "c.BlockingQueue") public class BlockingQueue { // 容量 private int capcity; // 双端任务队列容器 private Deque deque = new ArrayDeque<>(); // 重入锁 private ReentrantLock lock = new ReentrantLock(); // 生产者条件变量 private Condition fullWaitSet = lock.newCondition(); // 生产者条件变量 private Condition emptyWaitSet = lock.newCondition(); public BlockingQueue(int capcity) { this.capcity = capcity; } // 阻塞的方式添加任务 public void put(T task) { lock.lock(); try { // 通过while的方式 while (deque.size() >= capcity) { log.debug("wait to add queue"); try { fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } deque.offer(task); log.debug("task add successfully"); emptyWaitSet.signal(); } finally { lock.unlock(); } } // 阻塞获取任务 public T take() { lock.lock(); try { // 通过while的方式 while (deque.isEmpty()) { try { log.debug("wait to take task"); emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } fullWaitSet.signal(); T task = deque.poll(); log.debug("take task successfully"); // 从队列中获取元素 return task; } finally { lock.unlock(); } } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • put()方法是向阻塞队列中添加任务
    • take()方法是向阻塞队列中获取任务

    线程池消费端实现

    1. 定义执行器接口
    /**
     * 

    定义一个执行器的接口:

    * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 12:31 * @version: 1.0.0 */ public interface Executor { /** * 提交任务执行 * @param task 任务 */ void execute(Runnable task); }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    1. 定义线程池类实现该接口
    @Slf4j(topic = "c.ThreadPool")
    public class ThreadPool implements Executor {
    
        /**
         * 任务队列
         */
        private BlockingQueue taskQueue;
    
        /**
         * 核心工作线程数
         */
        private int coreSize;
    
        /**
         * 工作线程集合
         */
        private Set workers = new HashSet<>();
    
        /**
         *  创建线程池
         * @param coreSize 工作线程数量
         * @param capcity 阻塞队列容量
         */
        public ThreadPool(int coreSize, int capcity) {
            this.coreSize = coreSize;
            this.taskQueue = new BlockingQueue<>(capcity);
        }
    
        /**
         * 提交任务执行
         */
        @Override
        public void execute(Runnable task) {
            synchronized (workers) {
                // 如果工作线程数小于阈值,直接开始任务执行
                if(workers.size() < coreSize) {
                    Worker worker = new Worker(task);
                    workers.add(worker);
                    worker.start();
                } else {
                    // 如果超过了阈值,加入到队列中
                    taskQueue.put(task);
                }
            }
        }
    
        /**
         * 工作线程,对执行的任务做了一层包装处理
         */
        class Worker extends Thread {
            private Runnable task;
    
            public Worker(Runnable task) {
                this.task = task;
            }
    
            @Override
            public void run() {
                // 如果任务不为空,或者可以从队列中获取任务
                while (task != null || (task = taskQueue.take()) != null) {
                    try {
                        task.run();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        // 执行完后,设置任务为空
                        task = null;
                    }
                }
    
                  // 移除工作线程
                synchronized (workers){
                    log.debug("remove worker successfully");
                    workers.remove(this);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • Worker类是工作线程类,包装了执行任务,里面实现了从队列获取任务,然后执行任务。
    • execute方法的实现中,如果工作线程数量小于阈值的话,直接创建新的工作线程,否则将任务添加到队列中。
    1. 演示
    @Test
        public void testThreadPool1() throws InterruptedException {
            Executor executor = new ThreadPool(2, 4);
            // 提交任务
            for (int i = 0; i < 6; i++) {
                final  int j = i;
                executor.execute(() -> {
                    try {
                        Thread.sleep(10);
                        log.info("run task {}", j);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
                Thread.sleep(10);
            }
    
            Thread.sleep(10000);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    运行结果:

    获取任务超时设计

    目前从队列中获取任务是永久阻塞等待的,可以改成阻塞一段时间没有获取任务,丢弃的策略。

    @Slf4j(topic = "c.TimeoutBlockingQueue")
    public class TimeoutBlockingQueue {
        // 容量
        private int capcity;
        // 双端任务队列容器
        private Deque deque = new ArrayDeque<>();
        // 重入锁
        private ReentrantLock lock = new ReentrantLock();
        // 生产者条件变量
        private Condition fullWaitSet = lock.newCondition();
        // 生产者条件变量
        private Condition emptyWaitSet = lock.newCondition();
    
        public TimeoutBlockingQueue(int capcity) {
            this.capcity = capcity;
        }
    
        // 带超时时间的获取
        public T poll(long timeout, TimeUnit unit){
            lock.lock();
            try{
                // 将 timeout 统一转换为 纳秒
                long nanos = unit.toNanos(timeout);
                while (deque.isEmpty()){
                    try {
                        if (nanos<=0){
                            return null;
                        }
                        // 返回的是剩余的等待时间,更改navos的值,使虚假唤醒的时候可以继续等待
                        nanos = emptyWaitSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                fullWaitSet.signal();
                return deque.getFirst();
            }finally {
                lock.unlock();
            }
        }
    
        // 带超时时间的增加
        public boolean offer(T task , long timeout , TimeUnit unit){
            lock.lock();
            try{
                // 将 timeout 统一转换为 纳秒
                long nanos = unit.toNanos(timeout);
                while (deque.size() == capcity){
                    try {
                        if (nanos<=0){
                            return false;
                        }
                        // 更新剩余需要等待的时间
                        nanos = fullWaitSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("加入任务队列 {}", task);
                deque.addLast(task);
                emptyWaitSet.signal();
                return true;
            }finally {
                lock.unlock();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 新加TimeoutBlockingQueue类,添加offer和poll待超时的添加和获取任务的方法。

    拒绝策略设计

    目前的实现还是有个漏洞,无法自定义任务超出阈值的一个拒绝策略,我们可以通过利用函数式编程+策略模式去实现。

    1. 定义策略模式的函数式接口
    /**
     * 

    拒绝策略的函数式接口:

    * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 13:15 * @version: 1.0.0 */ @FunctionalInterface public interface RejectPolicy { /** * 拒绝策略的接口 * @param queue * @param task */ void reject(BlockingQueue queue, T task); }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    1. 添加函数式接口的调用入口

    我们可以在阻塞队列添加任务新加一个api, 添加任务如果超过容量,调用函数式接口。

    @Slf4j(topic = "c.BlockingQueue")
    public class BlockingQueue {
        ........
    
        /**
         * 尝试添加任务
         * @param rejectPolicy
         * @param task
         */
        public void tryPut(RejectPolicy rejectPolicy, T task) {
            lock.lock();
            try{
                // 如果队列超过容量
                if (deque.size()> capcity){
                    log.debug("task too much, do reject");
                    rejectPolicy.reject(this, task);
                }else {
                    deque.offer(task);
                    emptyWaitSet.signal();
                }
            }finally {
                lock.unlock();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    1. 修改ThreadPool类
    @Slf4j(topic = "c.ThreadPool")
    public class ThreadPool implements Executor {
        .....
    
        /**
         * 拒绝策略
         */
        private RejectPolicy rejectPolicy;
    
        // 通过构造方法传入执行的拒绝策略
        public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) {
            this.coreSize = coreSize;
            this.taskQueue = new BlockingQueue<>(capcity);
            this.rejectPolicy = rejectPolicy;
        }
    
        /**
         * 提交任务执行
         */
        @Override
        public void execute(Runnable task) {
            synchronized (workers) {
                // 如果工作线程数小于阈值,直接开始任务执行
                if(workers.size() < coreSize) {
                    Worker worker = new Worker(task);
                    workers.add(worker);
                    worker.start();
                } else {
                    // 如果超过了阈值,加入到队列中
                    //taskQueue.put(task);
    
                    // 调用tryPut的方式
                    taskQueue.tryPut(rejectPolicy, task);
                }
            }
        }
    
       ....
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 通过构造方法的方式传入要执行的拒绝策略
    • 调用tryPut方法添加任务
    1. 演示

    总结

    jdk中的线程池实现远比这里手写的要复杂,里面还涉及救急线程、各种内置的拒绝策略以及不同的队列容器等等,但是他们的思想基本一致的,通过这个练习,在后面阅读线程池源码的时候会有很大的帮助。

  • 相关阅读:
    技术引领未来, IDC TechScape中国数据安全发展路线图首发
    【数据结构-队列】阻塞队列
    一文1800字从0到1使用Python Flask实战构建Web应用
    CondeseNetV2:清华与华为出品,保持特征的新鲜是特征复用的关键 | CVPR 2021
    Python程序化生成三维场景【PyPRT】
    计算机网络期末复习(1)计算机网络在信息时代对的作用 计算机网络的定义和分类 三种交换方法
    力扣(LeetCode)算法_C++——两个列表的最小索引总和
    Ketlle数据采集和预处理工具的认知和基本应用
    Golang 依赖注入设计哲学|12.6K 的依赖注入库 wire
    异行星低代码平台--第三方插件对接:企业微信平台对接(二)
  • 原文地址:https://blog.csdn.net/Huangjiazhen711/article/details/127405770