• 1 线程池-手写线程池


    线程池系列文章

    1 手写线程池
    2 ThreadPoolExecutor分析
    3 线程池扩展

    线程池原理

    线程池的思想依据就是常见的:池化思想。即:线程的创建和销毁是重操作(内存分配和销毁),因此考虑通过池化技术,实现线程池的复用。

    线程池运行流程图

    在这里插入图片描述

    手写线程池

    创建线程

    创建线程的常用方法有两种,第一种:继承Thread,重写run方法;第二种:创建线程时,指定Runnable接口作为构造参数。常用第二种方法。

    new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程启动");
                }
            }).start();
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    如果Thread可以setRunable,并且多次start,这样从runnable队列取runnable,set,start,就可以实现一个线程执行多个runnable,这可以看做从Thread外部解决问题。但是不支持,那就要从Thread内部解决。

    简单线程池

    外部处理Thread无法实现一个线程执行多个Runnable,从线程内部考虑,因为线程最终执行的是run方法,那么可以考虑run方法中死循环,循环内部从队列中获取Runnable,这样也实现一个Thread,执行多个Runnable。示例:
    Run方法中循环从任务列表中获取任务,并执行,就是一个非常简单的线程池,只不过比较特殊。

    List<Runnable> TASKS = new ArrayList();
    new Thread(new Runnable() {
                @Override
                public void run() {
                	//run方法循环调用任务列表,直到任务列表为空,这样也实现了线程的复用。
                    while (true) {
                        //获取任务
                        Runnable runnable = TASKS.get(0);
                        if (runnable != null) {                  
                            //获取任务
                            Runnable runnable = TASKS.get(0);
                            runnable.run();                           
                        } else {
                        	System.out.printf("超出等待时间,线程退出:%s", System.currentTimeMillis());
                            return; 
                        	
                        }
                    }
                }
            }).start();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    上述就是简单的线程池,只有一个线程,并且从队列获取任务没有考虑先关的并发性等,后面一步步的优化。

    添加阻塞队列

    前面的简单线程池,未考虑多线程问题,主要场景有两种:

    1. 业务线程提交任务到队列。
    2. 多线程从任务队列中获取任务。
      为了解决多线程的问题,添加一个带锁的队列,例如:ArrayBloockingQueue,LinkedBloockingQueue等。
    Queue<Runnable> TASKS=new ArrayBlockingQueue(100);
    public void run() {
                    while (true) {
                       	Runnable runnable = TASKS.take();
                        if (runnable != null) {                  
                            //获取任务
                            Runnable runnable = TASKS.get(0);
                            runnable.run();                           
                        } else {
                        	System.out.printf("超出等待时间,线程退出:%s", System.currentTimeMillis());
                            return; 
                        	
                        }
                    }
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    上述代码可以进行优化一下:
    优化后:

    public static void main(String[] args) {
    
            new Thread(new Runnable() {
                @Override
                public void run() {
               		Runnable task;
                    while ((task =getTask())!=null) {
                        task.run();                    
                    }
                    //task==null时,线程就会退出。
                }
            }).start();
        }
    
    	//获取任务,一直阻塞直到获取任务,暂时没有解决线程退出问题
        private static Runnable getTask() {
            try {
                return TASKS.take();
            } catch (InterruptedException e) {
                return null;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    getTask有个缺陷take会一直阻塞到获取任务,那么当线程池空闲时,线程没有机会退出。

    多个线程

    通过Set集合,存储创建的线程。此处就不展示代码。

    线程退出

    getTask是一直阻塞,直到获取任务,因此可以考虑等待一段时间,没有获取任务后,自动唤醒,这样解决了线程退出的问题,但是却导致了其他问题。具体见:线程约定退出。

    private static Runnable getTask() {
            try {
                //设置了等待超时时间,这样线程就可以退出了
                return TASKS.poll(THREAD_WAIT_TIME_SECONDS, TimeUnit.MILLISECONDS);            
            } catch (InterruptedException e) {
                //被打断,也未获取到任务
                return null;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    线程条件退出

    上述的线程退出有个致命的缺陷,当任务队列为空,也没有新的任务提交,那么getTask超时未获取任务,返回null,线程就会退出;当没有任务时,所有的线程都退出了,与我们使用线程池的初衷又相违背了。因此即使没有获取任务,也不能全部退出,只有符合约定条件后,线程才允许退出。主要的条件如下:
    1 线程池关闭。2 当前线程数大于最大线程数。3 当前线程数大于核心线程数,并且队列为空。

    private Runnable getTask() {
            //防止未获取任务,线程全部都退出,通过for循环,未获取任务后,判断是否符合退出的约定条件
            for (; ; ) {
                int curState = state.get();
                //线程池已经暂停
                if (curState >= STOP) {
                    //线程池已停止,任务退出,尝试扣除线程总数,利用for循环,实现cas操作
                    if (tryDecrThread()) {
                        return null;
                    }
                    continue;
                }
                //当前线程数,超过最大线程数,可以退出
                //当前线程数,超过核心线程数,并且队列为空,可以退出。
                int curTotal = total.get();
                if (curTotal > maxThreadSize
                        || (curTotal > coreThreadSize && TASKS.isEmpty())) {
                	//尝试扣减线程数
                    if (tryDecrThread()) {
                        return null;
                    }
                    continue;
                }
                try {
                    Runnable task = TASKS.poll(THREAD_WAIT_TIME_SECONDS, TimeUnit.MILLISECONDS);      
                    if (task == null)
                        //未获取任务,for循环判断退出条件是否满足
                        continue;
                    return task;
                } catch (InterruptedException e) {
                    //被打断,不返回null,for循环判断退出条件是否满足
                }
    
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    主要逻辑:

    1. 通过for循环,防止所有线程全部退出。
    2. 通过条件判断,如果允许退出,扣减线程数量成功后,返回null,让线程退出。
    3. 上述逻辑还可以优化,主要是队列为空的判断,TASKS.empty()涉及加锁等操作,task为null时continue,其实task为null就已经表明队列为空了。因此可以优化为:
    private Runnable getTask() {
    		//超时未获取任务
    		boolean timeout=false;
            //防止未获取任务,线程全部都退出,通过for循环,未获取任务后,判断是否符合退出的约定条
            for (; ; ) {
                int curState = state.get();
                //线程池已经暂停
                if (curState >= STOP) {
                    //线程池已停止,任务退出,尝试扣除线程总数,利用for循环,实现cas操作
                    if (tryDecrThread()) {
                        return null;
                    }
                    continue;
                }
                //当前线程数,超过最大线程数,可以退出
                //当前线程数,超过核心线程数,并且队列为空,可以退出。
                int curTotal = total.get();
                if (curTotal > maxThreadSize
                        || (curTotal > coreThreadSize && timeout)) {
                	//尝试扣减线程数
                    if (tryDecrThread()) {
                        return null;
                    }
                    continue;
                }
                try {
                    Runnable task = TASKS.poll(THREAD_WAIT_TIME_SECONDS, TimeUnit.MILLISECONDS);      
                    if (task != null)              
                    	return task;
                    //未获取任务,for循环判断退出条件是否满足
                    timeout=true;   
                } catch (InterruptedException e) {
                    //被打断,不返回null,for循环判断退出条件是否满足
                    timeout=false;
                }
    
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    1. 上述getTask还是由一定的问题,TASKS.poll(THREAD_WAIT_TIME_SECONDS, TimeUnit.MILLISECONDS); 当线程池空闲时,线程数量=核心线程数后,线程就会处于:阻塞(队列获取任务)->唤醒(等待超时)->判断退出条件(不成立)->阻塞(队列获取任务)->唤醒(等待超时)的循环中。因此当空闲时间,希望线程直接阻塞,而BlockingQueue中的take方法:阻塞直到获取任务。优化之后的代码
    private Runnable getTask() {
           
            boolean timeout=false;
             //防止未获取任务,线程全部都退出,通过for循环,未获取任务后,判断是否符合退出的约定条件
            for (; ; ) {
                int curState = state.get();
                //线程池已经暂停
                if (curState >= STOP) {
                    //线程池已停止,任务退出,尝试扣除线程总数,利用for循环,实现cas操作
                    if (tryDecrThread()) {
                        return null;
                    }
                    continue;
                }
                //当前线程大于核心线程数,那么线程需要一个退出机会
                boolean needExitChance = curTotal > coreThreadSize;
    
                //当前线程数,超过最大线程数,可以退出
                //当前线程数,超过核心线程数,并且队列为空,可以退出。
                int curTotal = total.get();
                if (curTotal > maxThreadSize
                        || (needExitChance && timeout)) {
                	//尝试扣减线程数
                    if (tryDecrThread()) {
                        return null;
                    }
                    continue;
                }
                try {
                	//needExit线程需要退出,那么就等待超时获取任务,超时未获取任务,给线程一个退出的机会。如果不需要退出,直接阻塞在take方法中。
                    Runnable task = needExitChance ? 
                    	TASKS.poll(THREAD_WAIT_TIME_SECONDS, TimeUnit.MILLISECONDS):
                    	TASKS.take();
                    if (task != null)              
                    	return task;
                    //未获取任务,for循环判断退出条件是否满足
                    timeout=true;   
                } catch (InterruptedException e) {
                    //被打断,不返回null,for循环判断退出条件是否满足
                    timeout=false;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    核心线程退出

    上述线程池的退出流程:
    超过最大线程数->最大线程数->空闲了->核心线程数,最终只能存活核心线程,而核心线程是不能退出的,如果需要核心线程退出,那么需要修改getTask。

    private Runnable getTask() {
           
            boolean timeout=false;
             //防止未获取任务,线程全部都退出,通过for循环,未获取任务后,判断是否符合退出的约定条件
            for (; ; ) {
                int curState = state.get();
                //线程池已经暂停
                if (curState >= STOP) {
                    //线程池已停止,任务退出,尝试扣除线程总数,利用for循环,实现cas操作
                    if (tryDecrThread()) {
                        return null;
                    }
                    continue;
                }
                //当前线程数,超过最大线程数,可以退出
                //当前线程数,超过核心线程数,并且队列为空,可以退出。
                //当前允许核心线程退出,并且队列为空,可以退出。因为核心线程可以退出,因此需要严格校验队列是否为空,防止timeout失效。
                //核心线程也可以退出,因此needExit添加多个选择。
                boolean needExitChance  = coreThreadCanExit || curTotal > coreThreadSize;
                int curTotal = total.get();
                if (curTotal > maxThreadSize
                        || (needExitChance  && timeout) 
                        || (coreThreadCanExit && TASKS.isEmpty())) {
                	//尝试扣减线程数
                    if (tryDecrThread()) {
                        return null;
                    }
                    continue;
                }
                try {
                	//needExit线程需要退出,那么就等待超时获取任务,超时未获取任务,给线程一个退出的机会。如果不需要退出,直接阻塞在take方法中。
                    Runnable task = needExitChance ? 
                    	TASKS.poll(THREAD_WAIT_TIME_SECONDS, TimeUnit.MILLISECONDS):
                    	TASKS.take();
                    if (task != null)              
                    	return task;
                    //未获取任务,for循环判断退出条件是否满足
                    timeout=true;   
                } catch (InterruptedException e) {
                    //被打断,不返回null,for循环判断退出条件是否满足
                    timeout=false;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    添加线程

    无需分辨线程是否是核心线程,因为无论核心线程,还是普通线程,都没有进行特殊处理。

    /**
         * 添加线程 区分核心线程还是普通线程
         *
         * @param runnable 优先执行的任务
         * @param isCore   是否是核心线程
         */
        private boolean addThread(Runnable runnable, boolean isCore) {
        	mainLock.lock();
        	try{
    			//获取当前线程数量
    	        if (isCore && (THREADS.size() > coreThreadSize)) {
    	            //超过核心线程数
    	            return false;
    	        } else if (THREADS.size() > maxThreadSize) {
    	            return false;
    	        }
    	        Thread t = new Thread(new Runnable() {
    	            @Override
    	            public void run() {
    	           		Runnable task;
    	                while ((task =getTask())!=null) {
    	                    task.run();                    
    	                }
    	                //task==null时,线程就会退出。
    	            }
    	        });
    	        t.start();
    	        THREADS.add(t);
    	        return true;
    		}finally{
    			mainLock.unlock();
    		}        
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    任务入队列

    /**
         * 任务进入队列
         */
        private boolean offerQueue(Runnable task) {
            return TASKS.offer(task);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    拒绝策略

    /**
         * 拒绝
         */
        private void reject(Runnable task) {
            System.out.println("任务被拒绝");
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    添加任务

    public void addTask(Runnable runnable) {
            if (!addThread(runnable, true)) {
                //添加核心线程未成功
                if (!offerQueue(runnable)) {
                    //入队列未成功
                    if (!addThread(runnable, false)) {
                        //添加普通线程未成功
                        reject(runnable);
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    经过上述步骤,一个简单的线程池已经初见雏形,但是还有一些问题,例如:线程池状态、线程是否工作中等。

    线程池状态

    可以通过AtomicInteger表明线程的状态。
    添加了如下状态:

    /**线程池状态:运行中,可以接收新的任务*/
        private final Integer running = 1;
        /**线程池状态:关闭,不再接收新的任务,已经入队列的任务,会继续执行*/
        private final Integer shutDown = 2;
        /**线程池状态:暂停,不再接收新的任务,已经入队列的任务,直接返回。*/
        private final Integer stop = 3;
        /**线程池状态:终止,不再接收新的任务,所有的任务都已经完成,所有的线程都已经销毁。*/
        private final Integer terminated=4;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    添加状态对线程池的可能影响场景:

    1. 添加任务时,判断状态:必须要判断。
    2. 添加线程时,判断状态:可以不进行判断,因为添加线程发生在添加任务后,添加任务已经校验,添加线程可以忽略。
    3. 获取任务时,判断状态。因为线程池如果stop,那么所有等待的任务都要取消执行。

    相关代码修改

    1. 添加任务:
    public void addTask(Runnable runnable) {
            if (!isRunning()) {
                throw new IllegalStateException("线程池已经关闭");
            }
            //省略
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. 获取任务
    private Runnable getTask() {
            if (!notStopped()) {
                //线程池已经stop,暂停后续任务的执行
                return null;
            }
            //忽略
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    shutdown线程池

    关闭线程池:修改线程池状态,拒绝新的任务,唤醒所有的阻塞线程。

    /**
         * 关闭线程池
         */
        public void shutDown() {
            //添加锁
            mainLock.lock();
            try {
            	//循环cas操作,忽略
                changeState(SHUTDOWN);
                //唤醒所有因为getTask导致阻塞的线程
                for (Thread thread : THREADS) {
                    thread.interrupt();
                }
            } finally {
                mainLock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    stop线程池

    /**
         * 关闭线程池:修改线程池状态,并返回所有待执行的任务
         *
         * @return
         */
        public List<Runnable> stop() {
            mainLock.lock();
            try {
            	//循环cas操作,忽略
                changeState(SHUTDOWN);
                //唤醒所有因为getTask导致阻塞的线程
                for (Thread thread : THREADS) {
                    thread.interrupt();
                }
                return Lists.newArrayList(TASKS);
            } finally {
                mainLock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    区分线程运行中和等待中

    通过shutdown和stop方法,核心还是唤醒所有的等待线程,通过interrupt打断线程的阻塞,但是有个问题,有可能任务(Runnable的run方法)存在阻塞情况,如果线程在run方法中,因为网络请求阻塞,那么shutdown和stop中的interrupt会打断上述业务阻塞,因需要唤醒等待任务的线程,但是缺误伤业务自身的阻塞。因此shutdown和stop当前线程时,需要判断线程状态:运行中(执行任务的run方法)、等待任务中(getTask阻塞)。
    因此需要对Thread进行包装,需要带有状态识别,通过状态识别,区分任务状态。有两种方案:1. 状态标识符:通过AtomictBoolean判断。2. 锁:通过tryLock进行判断。
    采用状态标识符进行处理,因此需要对Thread进行包装,包装后进行如下修改。

    public class MyThreadPool {
        //线程池的队列
        private BlockingQueue<Runnable> TASKS = new ArrayBlockingQueue<>(100);
        //线程等待超时退出的时间
        private Long WAIT_TIMES_SECONDS = 1000L;
        //worker合集
        private Set<Worker> WORKERS = new HashSet<>(10);
        /**
         * 核心线程数
         */
        private Integer coreThreadSize;
    
        /**
         * 最大线程数
         */
        private Integer maxThreadSize;
    
        /**
         * 核心线程允许退出
         */
        private boolean coreThreadCanExit;
    
    
        /**
         * 当前线程数
         */
        private AtomicInteger total;
    
        /**
         * 线程等待时间
         */
        private Integer threadWaitingTime;
        /**
         * 线程池状态
         */
        private AtomicInteger state;
        /**
         * 线程池状态:运行中,可以接收新的任务
         */
        private final Integer RUNNING = 1;
        /**
         * 线程池状态:关闭,不再接收新的任务,已经入队列的任务,会继续执行
         */
        private final Integer SHUTDOWN = 2;
        /**
         * 线程池状态:暂停,不再接收新的任务,已经入队列的任务,直接返回。
         */
        private final Integer STOP = 3;
        /**
         * 线程池状态:终止,不再接收新的任务,所有的任务都已经完成,所有的线程都已经销毁。
         */
        private final Integer TERMINATED = 4;
    
        /**
         * 全局锁
         */
        private final ReentrantLock mainLock = new ReentrantLock();
    
        private final Integer THREAD_WAITING_TIME_DEFAULT = 1000 * 60;
    
        public MyThreadPool(Integer coreThreadSize, Integer maxThreadSize, Integer threadWaitingTime) {
            this(coreThreadSize, maxThreadSize, threadWaitingTime, false, new LinkedBlockingDeque());
        }
    
        public MyThreadPool(Integer coreThreadSize, Integer maxThreadSize, Integer threadWaitingTime, Boolean coreThreadCanExit, BlockingQueue queue) {
            this.coreThreadSize = coreThreadSize;
            this.maxThreadSize = maxThreadSize;
            this.threadWaitingTime = threadWaitingTime;
            this.coreThreadCanExit = coreThreadCanExit;
            this.TASKS = queue;
        }
    
        /**
         * 修改最大线程数
         */
        public boolean setMaxThreadSize(Integer maxThreadSize) {
            Integer temp = this.maxThreadSize;
            this.maxThreadSize = maxThreadSize;
            if (temp > this.maxThreadSize) {
                //降低线程数,需要处理等待中的线程
                interrupt();
            }
            return true;
        }
    
        public boolean setCoreThreadSize(Integer coreThreadSize) {
            Integer temp = this.coreThreadSize;
            this.coreThreadSize = coreThreadSize;
            if (temp > this.coreThreadSize) {
                //降低线程数,需要处理等待中的线程
                interrupt();
            }
            return true;
        }
    
        public void addTask(Runnable runnable) {
            if (!isRunning()) {
                throw new IllegalStateException("线程池已经关闭");
            }
            if (!addWorker(runnable, true)) {
                //添加核心线程未成功
                if (!offerQueue(runnable)) {
                    //入队列未成功
                    if (!addWorker(runnable, false)) {
                        //添加普通线程未成功
                        reject(runnable);
                    }
                }
            }
        }
    
    
        /**
         * 关闭线程池:修改线程池状态,并返回所有待执行的任务
         *
         * @return
         */
        public List<Runnable> stop() {
            mainLock.lock();
            try {
                changeState(SHUTDOWN);
                //唤醒所有因为getTask导致阻塞的线程
                interrupt();
                return Lists.newArrayList(TASKS);
            } finally {
                mainLock.unlock();
            }
        }
    
    
        /**
         * 关闭线程池
         */
        public void shutDown() {
            //添加锁
            mainLock.lock();
            try {
                changeState(SHUTDOWN);
                //唤醒所有因为获取任务处于等待的线程
                interrupt();
            } finally {
                mainLock.unlock();
            }
        }
    
        private Runnable getTask() {
    
            boolean timeout=false;
            //防止未获取任务,线程全部都退出,通过for循环,未获取任务后,判断是否符合退出的约定条件
            for (; ; ) {
                int curState = state.get();
                //线程池已经暂停
                if (curState >= STOP) {
                    //线程池已停止,任务退出,尝试扣除线程总数,利用for循环,实现cas操作
                    if (tryDecrThread()) {
                        return null;
                    }
                    continue;
                }
                int curTotal = total.get();
                //当前线程数,超过最大线程数,可以退出
                //当前线程数,超过核心线程数,并且队列为空,可以退出。
                //当前允许核心线程退出,并且队列为空,可以退出。因为核心线程可以退出,因此需要严格校验队列是否为空,防止timeout失效。
                //核心线程也可以退出,因此needExit添加多个选择。
                boolean needExitChance = coreThreadCanExit || curTotal > coreThreadSize;
                if (curTotal > maxThreadSize
                        || (needExitChance && timeout)
                        || (coreThreadCanExit && TASKS.isEmpty())) {
                    //尝试扣减线程数
                    if (tryDecrThread()) {
                        return null;
                    }
                    continue;
                }
                try {
                    //needExit线程需要退出,那么就等待超时获取任务,超时未获取任务,给线程一个退出的机会。如果不需要退出,直接阻塞在take方法中。
                    Runnable task = needExitChance ?
                            TASKS.poll(threadWaitingTime, TimeUnit.MILLISECONDS):
                            TASKS.take();
                    if (task != null)
                        return task;
                    //未获取任务,for循环判断退出条件是否满足
                    timeout=true;
                } catch (InterruptedException e) {
                    //被打断,不返回null,for循环判断退出条件是否满足
                    timeout=false;
                }
            }
        }
    
    
        private void changeState(Integer target) {
            //循环修改状态
            for (; ; ) {
                int currentState = state.get();
                if (currentState < target) {
                    state.compareAndSet(currentState, SHUTDOWN);
                    break;
                } else if (currentState == target) {
                    break;
                }
            }
        }
    
    
        /**
         * 唤醒所有等待任务的worker
         */
        private void interrupt() {
            for (Worker work : WORKERS) {
                if (work.tryWait()) {
                    work.interrupt();
                }
            }
        }
    
        /**
         * 添加线程 区分核心线程还是普通线程
         *
         * @param runnable 优先执行的任务
         * @param isCore   是否是核心线程
         */
        private boolean addWorker(Runnable runnable, boolean isCore) {
            mainLock.lock();
            try {
                //获取当前线程数量
                if (isCore && (WORKERS.size() > coreThreadSize)) {
                    //超过核心线程数
                    return false;
                } else if (WORKERS.size() > maxThreadSize) {
                    return false;
                }
                Worker worker = new Worker(runnable);
                int totalNum;
                do {
                    totalNum = total.get();
                } while (total.compareAndSet(totalNum, totalNum + 1));
                WORKERS.add(worker);
                try {
                    worker.start();
                    return true;
                } catch (Exception e) {
                    removeWork(worker);
                    return false;
                }
            } finally {
                mainLock.unlock();
            }
        }
    
        private void removeWork(Worker worker) {
            int curTotal;
            do {
                curTotal = total.get();
            } while (total.compareAndSet(curTotal, curTotal - 1));
            WORKERS.remove(worker);
        }
    
    
        /**
         * 任务进入队列
         */
        private boolean offerQueue(Runnable task) {
            return TASKS.offer(task);
        }
    
        /**
         * 拒绝
         */
        private void reject(Runnable task) {
            System.out.println("任务被拒绝");
        }
    
    
        private boolean isRunning() {
            return RUNNING.equals(state.get());
        }
    
        /**
         *
         */
        private boolean lessThanShutdown() {
            return RUNNING.equals(state.get());
        }
    
        /**
         * 未暂停
         */
        private boolean notStopped() {
            return state.get() > SHUTDOWN;
        }
    
        public boolean tryDecrThread() {
            int curTotal = total.get();
            return total.compareAndSet(curTotal, curTotal - 1);
        }
    
        private class Worker implements Runnable {
            //对线程进行包装
            private Thread thread;
            //第一个任务,如果不为空,先执行该任务
            private Runnable firstTask;
            //线程运行状态
            private AtomicBoolean workerState;
            //线程运行中
            private final Boolean WORKER_RUNNING = true;
            private final Boolean WORKER_WAITING = false;
    
            public Worker(Runnable firstTask) {
                workerState.set(WORKER_WAITING);
                firstTask = firstTask;
                Thread t = new Thread(this);
            }
    
            public Worker() {
                new Worker(null);
            }
    
    
            @Override
            public void run() {
                Runnable task = firstTask;
                while (task != null || (task = getTask()) != null) {
                    //工作中
                    working();
                    try {
                        task.run();
                    } catch (RuntimeException runtimeException) {
                        throw runtimeException;
                    } catch (Exception exception) {
                        throw exception;
                    } catch (Throwable throwable) {
                        throw throwable;
                    } finally {
                        //完成工作
                        waiting();
                    }
                }
            }
    
            public boolean isWorking() {
                return Boolean.TRUE.equals(workerState.get());
            }
    
            public boolean working() {
                for (; ; ) {
                    if (WORKER_WAITING.equals(workerState.get())) {
                        workerState.compareAndSet(WORKER_WAITING, WORKER_RUNNING);
                    } else {
                        return true;
                    }
                }
            }
    
    
            public boolean waiting() {
                for (; ; ) {
                    if (WORKER_RUNNING.equals(workerState.get())) {
                        workerState.compareAndSet(WORKER_RUNNING, WORKER_WAITING);
                    } else {
                        return true;
                    }
                }
            }
    
            public boolean tryWait() {
                return workerState.compareAndSet(WORKER_RUNNING, WORKER_WAITING);
            }
    
            public void interrupt() {
                thread.interrupt();
            }
    
            public void start() {
                thread.start();
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341
    • 342
    • 343
    • 344
    • 345
    • 346
    • 347
    • 348
    • 349
    • 350
    • 351
    • 352
    • 353
    • 354
    • 355
    • 356
    • 357
    • 358
    • 359
    • 360
    • 361
    • 362
    • 363
    • 364
    • 365
    • 366
    • 367
    • 368
    • 369
    • 370
    • 371
    • 372
    • 373
    • 374
    • 375
    • 376
    • 377
    • 378
    • 379

    上述代码主要修改:

    1. Thread包装为Worker。
    2. Worker添加的FirstTas为构造函数。
    3. Worker执行Runnable的run时,添加了状态的变动。
    4. 线程池关闭、暂停时,唤醒线程时,添加了worker状态的判断。

    修改coreThreadSize和maxThreadSize

    因为coreThreadSize和maxThreadSize都是参数,因此可以提供set方法进行修改,这样可以调整运行时的线程池。

    /**
         * 修改最大线程数
         */
        public boolean setMaxThreadSize(Integer maxThreadSize) {
            Integer temp = this.maxThreadSize;
            this.maxThreadSize = maxThreadSize;
            if (temp > this.maxThreadSize) {
                //降低线程数,需要处理等待中的线程
                interrupt();
            }
            return true;
        }
    
        public boolean setCoreThreadSize(Integer coreThreadSize) {
            Integer temp = this.coreThreadSize;
            this.coreThreadSize = coreThreadSize;
            if (temp > this.coreThreadSize) {
                //降低线程数,需要处理等待中的线程
                interrupt();
            }
            return true;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    需要注意:当降低线程数时,有可能存在部分线程阻塞在getTask方法中,需要打断,这样才能唤醒线程,判断是否需要退出。

    总结自定义线程池

    1. 首先Thread包装成Worker,带有是否工作中的标识,这样在interrupt时,防止打断业务内容的阻塞情况。
    2. 通过线程内部的while循环,实现一个线程执行多个Runnable信息,当Runnable==null时,线程退出。
    3. getTask内容的for循环,防止线程全部退出,同时如果获取的任务为null,然后条件判断,只有符合退出条件,线程才允许退出。
    4. getTask设置线程退出的场景,主要有:1 线程数超过最大线程。2 线程数超过核心线程数,并且队列为空;3 配置核心线程允许退出,并且队列为空。
  • 相关阅读:
    【Try to Hack】vulhub靶场搭建
    docker数据卷命令演示
    如果文件已经存在与git本地库中,配置gitignore能否将其从git库中删除
    GCC编译器include包含规则
    Hadoop完全分布式搭建
    荣耀携手Blue Yonder,加快企业战略增长
    【Python爬虫必备技术栈】urllib库&&urllib3库
    【Linux】进程状态|僵尸进程 |孤儿进程
    如何用蓝牙实现无线定位(五)--双定位显示
    如何实现单病种上报的多院区/集团化/平台联动管理
  • 原文地址:https://blog.csdn.net/u010652576/article/details/126609970