• 【面试:并发篇39:多线程:线程池】ThreadPoolExecutor类-提交、停止


    【面试:并发篇39:多线程:线程池】ThreadPoolExecutor类-提交、停止

    00.前言

    如果有任何问题请指出,感谢。

    01.提交任务

    // 执行任务
    	void execute(Runnable command);
    	
    	// 提交任务 task,用返回值 Future 获得任务执行结果
    	<T> Future<T> submit(Callable<T> task);
    	
    	// 提交 tasks 中所有任务
    	<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    	 throws InterruptedException;
    	 
    	// 提交 tasks 中所有任务,带超时时间
    	<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    	 long timeout, TimeUnit unit)
    	 throws InterruptedException;
    	 
    	// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
    	<T> T invokeAny(Collection<? extends Callable<T>> tasks)
    	 throws InterruptedException, ExecutionException;
    	 
    	// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
    	<T> T invokeAny(Collection<? extends Callable<T>> tasks,
    	 long timeout, TimeUnit unit)
    	 throws InterruptedException, ExecutionException, TimeoutException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    submit方法

    submit方法与execute方法的区别在于,execute方法接收的参数是Runnable类型的参数 没有返回值,而submit方法接收的是Callable类型的参数有返回值 且 返回值用Future<>接收,Future<>获取线程的返回值 实现原理 就是之前所讲的保护性暂停模式。
    例子

    @Slf4j(topic = "c.TestPool1")
    public class TestPool1 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                    20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                    new ThreadPoolExecutor.DiscardOldestPolicy());
    
            Future<String> future = threadpool.submit(new Callable<String>() {
    
                @Override
                public String call() throws Exception {
                    log.debug("running");
                    Thread.sleep(1000);
                    return "ok";
                }
    
            });
            log.debug("{}",future.get());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    结果

    17:12:30.718 c.TestPool1 [pool-1-thread-1] - running
    17:12:31.731 c.TestPool1 [main] - ok

    解释
    可以看出我们实现了Callable接口 并返回了字符串"ok" 给Future< String>,之后通过get方法获取到返回数据。

    invokeAll方法

    invokeAll方法接收的是一个任务集合 且有返回值,线程池中的线程执行这个任务集合。
    例子

    @Slf4j(topic = "c.TestPool2")
    public class TestPool2 {
        public static void main(String[] args) throws InterruptedException {
            ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                    20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                    new ThreadPoolExecutor.DiscardOldestPolicy());
    
            List<Future<String>> futures = threadpool.invokeAll(Arrays.asList(
                    () -> {
                        log.debug("begin1");
                        Thread.sleep(1000);
                        return "1";
                    },
                    () -> {
                        log.debug("begin2");
                        Thread.sleep(500);
                        return "2";
                    },
                    () -> {
                        log.debug("begin3");
                        Thread.sleep(2000);
                        return "3";
                    }
            ));
    
            futures.forEach(f -> {
                try {
                    log.debug("{}",f.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    结果

    17:48:49.429 c.TestPool2 [pool-1-thread-2] - begin2
    17:48:49.429 c.TestPool2 [pool-1-thread-1] - begin1
    17:48:49.930 c.TestPool2 [pool-1-thread-2] - begin3
    17:48:51.937 c.TestPool2 [main] - 1
    17:48:51.937 c.TestPool2 [main] - 2
    17:48:51.937 c.TestPool2 [main] - 3

    解释
    我们可以看到 任务1与任务2同时被执行,但是因为我们的线程池的核心线程数为2 所以任务3就先放入了任务队列,之后等待任务2执行完后 线程2执行任务3,中间差了0.5s 是因为 任务2执行了0.5s,最终我们遍历返回结果 并打印。

    invokeAny方法

    invokeAny方法的作用是得到一个最先返回的结果,之后的线程就不再运行。
    例子

    @Slf4j(topic = "c.TestPool3")
    public class TestPool3 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ThreadPoolExecutor threadpool=new ThreadPoolExecutor(3, 10,
                    20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                    new ThreadPoolExecutor.DiscardOldestPolicy());
    
           String future = threadpool.invokeAny(Arrays.asList(
                    () -> {
                        log.debug("begin1");
                        Thread.sleep(1000);
                        log.debug("end1");
                        return "1";
                    },
                    () -> {
                        log.debug("begin2");
                        Thread.sleep(500);
                        log.debug("end2");
                        return "2";
                    },
                    () -> {
                        log.debug("begin3");
                        Thread.sleep(2000);
                        log.debug("end3");
                        return "3";
                    }
            ));
    
            log.debug("{}",future);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    结果

    18:03:18.892 c.TestPool3 [pool-1-thread-3] - begin3
    18:03:18.892 c.TestPool3 [pool-1-thread-2] - begin2
    18:03:18.892 c.TestPool3 [pool-1-thread-1] - begin1
    18:03:19.412 c.TestPool3 [pool-1-thread-2] - end2
    18:03:19.412 c.TestPool3 [main] - 2

    解释
    注意我们此时把核心线程数 设置为3,此时三个任务同时运行,且任务2先运行完 可以看到最后的结果只有任务2的返回值,但是注意一个场景,假如我们现在的核心线程数只有一个 那么很明显我们的线程会先运行任务1 剩下两个任务加入任务队列,等待任务1运行完才会 有空闲线程运行剩下的任务,也就是说任务1是先运行完的 所以返回的结果只有任务1的返回值。

    02.关闭线程池

    /*
    线程池状态变为 SHUTDOWN
    	- 不会接收新任务
    	- 但已提交任务会执行完
    	- 此方法不会阻塞调用线程的执行
    */
    void shutdown();
    // 源码
    public void shutdown() {
    	final ReentrantLock mainLock = this.mainLock;
    	mainLock.lock();
    	try {
    		checkShutdownAccess();
    		// 修改线程池状态
    		advanceRunState(SHUTDOWN);
    		// 仅会打断空闲线程
    		interruptIdleWorkers();
    		onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
    	} finally {
    		mainLock.unlock();
    	}
    	// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
    	tryTerminate();
    }
    
    /*
    线程池状态变为 STOP
    	- 不会接收新任务
    	- 会将队列中的任务返回
    	- 并用 interrupt 的方式中断正在执行的任务
    */
    List<Runnable> shutdownNow();
    // 源码
    public List<Runnable> shutdownNow() {
    	List<Runnable> tasks;
    	final ReentrantLock mainLock = this.mainLock;
    	mainLock.lock();
    	try {
    		checkShutdownAccess();
    		// 修改线程池状态
    		advanceRunState(STOP);
    		// 打断所有线程
    		interruptWorkers();
    		// 获取队列中剩余任务
    		tasks = drainQueue();
    	} finally {
    		mainLock.unlock();
    	}
    	// 尝试终结
    	tryTerminate();
    	return tasks;
    }
    
    
    // 其他方法
    // 不在 RUNNING 状态的线程池,此方法就返回 true
    boolean isShutdown();
    
    // 线程池状态是否是 TERMINATED
    boolean isTerminated();
    
    // 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    shutdown与shutdownNow例子

    @Slf4j(topic = "c.TestShutDown")
    public class TestShutDown {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                    20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                    new ThreadPoolExecutor.DiscardOldestPolicy());
    
            Future<Integer> result1 = threadpool.submit(() -> {
                log.debug("task 1 running...");
                Thread.sleep(1000);
                log.debug("task 1 finish...");
                return 1;
            });
    
            Future<Integer> result2 = threadpool.submit(() -> {
                log.debug("task 2 running...");
                Thread.sleep(1000);
                log.debug("task 2 finish...");
                return 2;
            });
    
            Future<Integer> result3 = threadpool.submit(() -> {
                log.debug("task 3 running...");
                Thread.sleep(1000);
                log.debug("task 3 finish...");
                return 3;
            });
    
            // shutdown() 部分的代码
            log.debug("shutdown");
            threadpool.shutdown();
            threadpool.submit(()->{
                log.debug("task 4 running...");
                Thread.sleep(1000);
                log.debug("task 4 finish");
                return "4";
            });
            threadpool.awaitTermination(3, TimeUnit.SECONDS);
            log.debug("other...");
    
            // shutdownNow部分代码
            // log.debug("shutdownNow");
    //        List runnables = threadpool.shutdownNow();
    //        log.debug("other.... {}" , runnables);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    shutdown代码结果

    18:35:02.128 c.TestShutDown [main] - shutdown
    18:35:02.128 c.TestShutDown [pool-1-thread-1] - task 1 running…
    18:35:02.128 c.TestShutDown [pool-1-thread-2] - task 2 running…
    18:35:03.134 c.TestShutDown [pool-1-thread-1] - task 1 finish…
    18:35:03.134 c.TestShutDown [pool-1-thread-2] - task 2 finish…
    18:35:03.134 c.TestShutDown [pool-1-thread-1] - task 3 running…
    18:35:04.139 c.TestShutDown [pool-1-thread-1] - task 3 finish…
    18:35:04.139 c.TestShutDown [main] - other…

    解释
    可以看出我们在shutdown后依然可以把 shutdown之前的 任务运行完毕,但是shutdown之后的任务就没有再运行了。另外我们关注一下awaitTermination方法 它的作用是等待shutdown部分的任务运行完后 主线程再运行awaitTermination方法之后的代码。

    shutdownNow代码结果

    18:47:52.344 c.TestShutDown [main] - shutdownNow
    18:47:52.344 c.TestShutDown [pool-1-thread-2] - task 2 running…
    18:47:52.344 c.TestShutDown [pool-1-thread-1] - task 1 running…
    18:47:52.354 c.TestShutDown [main] - other… [java.util.concurrent.FutureTask@e580929]

    解释
    可以看出我们在shutdownNow之后 只有一个任务运行成功了,也就是别的任务都已经被打断了。

  • 相关阅读:
    C语言学习系列:初识C语言
    【力扣刷题】重新格式化电话号码
    P1843 奶牛晒衣服 【贪心】
    Java(七)——集合框架---ArrayList集合、LinkedList集合
    极速系列03—python进行数据合并(concat/merge)
    【AI视野·今日Sound 声学论文速览 第三十六期】Mon, 30 Oct 2023
    配置elasticsearch用windows account(AD)登录
    死磕 36 个 JS 手写题(搞懂后,提升真的大)
    Mysql高级——索引优化和查询优化(2)
    五、业务数据分析
  • 原文地址:https://blog.csdn.net/m0_71229547/article/details/126089064