• 实验探究-ExecutorServiceAPI----未完待续!!!!


    1 isShutDown & isTerminated & executorRunnableError & executorRunnableTask

    isShutDown:当调用shutdown()或shutdownNow()方法后返回为true。
    isTerminated:当调用shutdown()方法后,并且所有提交的任务完成后返回为true;
    isTerminated:当调用shutdownNow()方法后,成功停止后返回为true;
    如果没有调用shutdown()或shutdownNow(),即使线程池中的任务已全部正常完成,isShutdown和isTerminated也都返回false。

    /**
     * Entry point for the ExecutorService experiments of this section.
     * One experiment call is enabled at a time; the others are kept as
     * commented-out toggles so they can be switched on by hand.
     */
    public class ExecutorServiceTest {
        public static void main(String[] args) throws InterruptedException {
    // Disabled experiments (uncomment one to run it instead):
    //        isShutdown();
    //        isTerminated_1();
    //        executorRunnableError();
            executorRunnableTask();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    1 isShutdown

    private static void isShutdown() {
            // Experiment: isShutdown() flips to true as soon as shutdown() is called,
            // and any later execute() is rejected.
            final ExecutorService pool = Executors.newSingleThreadExecutor();

            // Keep the only worker busy for five seconds so a task is still
            // running when the pool is shut down below.
            final Runnable fiveSecondNap = () -> {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
            };
            pool.execute(fiveSecondNap);

            // Before shutdown() the executor reports "not shut down".
            final boolean shutDownBefore = pool.isShutdown();
            System.out.println(shutDownBefore);

            pool.shutdown();

            // After shutdown() it reports "shut down", even though the task is still running.
            final boolean shutDownAfter = pool.isShutdown();
            System.out.println(shutDownAfter);

            // Can another Runnable be submitted after shutdown()? No: execute() throws
            // RejectedExecutionException (deliberately left uncaught in this experiment).
            pool.execute(() -> System.out.println("i will executor after shutdown ..."));

            /*
             * Output recorded by the author:
             *
             * false
             * true
             * Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task ... rejected from
             * java.util.concurrent.ThreadPoolExecutor@5e265ba4[Shutting down, pool size = 1, active threads = 1,
             * queued tasks = 0, completed tasks = 0]
             * 	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
             * 	...
             */
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    2 isTerminated

    private static void isTerminated_1() {
            // The downcast is needed because isTerminating() is declared on ThreadPoolExecutor,
            // not on ExecutorService.
            final ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);

            // One task that keeps the single worker busy for three seconds.
            final Runnable threeSecondNap = () -> {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
            };
            pool.execute(threeSecondNap);
            pool.shutdown();

            final boolean shutDown = pool.isShutdown();
            System.out.println("executorService.isShutdown() " + shutDown);

            // Notes on isTerminated():
            // 1. It can only become true after shutdown() or shutdownNow() has been called.
            //    Code that polls isTerminated() on a pool that was never shut down waits forever.
            // 2. After shutdown(), it returns true once every submitted task has completed.
            final boolean terminated = pool.isTerminated();
            System.out.println("1-executorService.isTerminated() " + terminated);

            final boolean terminating = pool.isTerminating();
            System.out.println("executorService.isTerminating() " + terminating);

            /*
             * Values recorded by the author (the three-second task is still running
             * when the checks are made):
             *
             * executorService.isShutdown() true
             * 1-executorService.isTerminated() false
             * executorService.isTerminating() true
             */
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    3 executorRunnableError - IntStream.range(0, 10).boxed().forEach(i -> executorService.execute(() -> System.out.println(1 / 0)));

    private static void executorRunnableError() {
            // Experiment: what happens when a Runnable passed to execute() throws?
            // Only ExecutorService methods are used here, so the pool is held through the
            // interface instead of being downcast to ThreadPoolExecutor.
            ExecutorService executorService = Executors.newFixedThreadPool(10, new MyThreadFactory());

            // Every task fails with ArithmeticException (1 / 0). The exception escapes the
            // worker thread and reaches the UncaughtExceptionHandler installed by MyThreadFactory.
            for (int i = 0; i < 10; i++) {
                executorService.execute(() -> System.out.println(1 / 0));
            }

            executorService.shutdown();
            try {
                executorService.awaitTermination(10, TimeUnit.MINUTES);
                System.out.println("============over============");
            } catch (InterruptedException e) {
                // Restore the interrupt status so the caller can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
        /**
         * Thread factory that names its threads {@code My-Thread-<n>} and installs an
         * uncaught-exception handler which reports the failing thread and the cause.
         */
        private static class MyThreadFactory implements ThreadFactory {
            /** Sequence number shared by every instance of this factory; starts at 0. */
            private static final AtomicInteger SEQ = new AtomicInteger();

            /**
             * Creates a named thread for {@code r}.
             *
             * @param r the task the new thread will run
             * @return a new, not yet started thread
             */
            @Override
            public Thread newThread(@NotNull Runnable r) {
                Thread thread = new Thread(r, "My-Thread-" + SEQ.getAndIncrement());
                // Signature of the handler: void uncaughtException(Thread t, Throwable e).
                // The whole report is written with ONE println call: the previous two separate
                // calls let reports from different threads interleave, and the exception itself
                // was dropped, hiding why the task failed.
                thread.setUncaughtExceptionHandler((t, e) -> {
                    String separator = System.lineSeparator();
                    System.out.println("The thread " + t.getName() + " executor failed." + separator
                            + "Cause: " + e + separator
                            + "=====================");
                });
                return thread;
            }
        }
    /**
    The thread My-Thread-6 executor failed.
    The thread My-Thread-1 executor failed.
    =====================
    The thread My-Thread-14 executor failed.
    =====================
    The thread My-Thread-5 executor failed.
    =====================
    The thread My-Thread-4 executor failed.
    =====================
    The thread My-Thread-2 executor failed.
    =====================
    The thread My-Thread-0 executor failed.
    =====================
    The thread My-Thread-3 executor failed.
    =====================
    =====================
    The thread My-Thread-7 executor failed.
    =====================
    The thread My-Thread-12 executor failed.
    =====================
    ============over============
    
    Process finished with exit code 0
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    4 executorRunnableTask-自定义task任务

    /**
         *                             | ---->
         * send  ---> store db ---> 10 | ---->
         *                             | ---->
         */
        private static void executorRunnableTask() throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10, new MyThreadFactory());
            for (int i = 0; i < 10; i++) {
                // MyTask already implements Runnable, so it is handed to the pool directly
                // instead of being wrapped in a lambda that only calls run().
                executorService.execute(new MyTask(i) {
                    @Override
                    public void error(Throwable e) {
                        System.out.println("The no : " + no + " failed, update status to ERROR.");
                    }

                    @Override
                    public void done() {
                        System.out.println("The no : " + no + " successfully, update status to DONE.");
                    }

                    @Override
                    public void doExecutor() {
                        // Simulated failure for every third task (0, 3, 6, 9); same exception
                        // type and message as the integer division by zero it replaces.
                        if (no % 3 == 0) {
                            throw new ArithmeticException("/ by zero");
                        }
                    }

                    @Override
                    public void doInit() {
                        // nothing to initialise in this experiment
                    }
                });
            }

            executorService.shutdown();
            executorService.awaitTermination(10, TimeUnit.MINUTES);
            System.out.println("=======================");
            /*
             * Output recorded by the author (line order varies between runs):
             *
             * The no : 0 failed, update status to ERROR.
             * The no : 3 failed, update status to ERROR.
             * The no : 2 successfully, update status to DONE.
             * The no : 1 successfully, update status to DONE.
             * The no : 4 successfully, update status to DONE.
             * The no : 5 successfully, update status to DONE.
             * The no : 7 successfully, update status to DONE.
             * The no : 6 failed, update status to ERROR.
             * The no : 8 successfully, update status to DONE.
             * The no : 9 failed, update status to ERROR.
             * =======================
             */
        }
        /**
         * Template for a numbered task: {@link #run()} drives the fixed sequence
         * init, execute, done and routes anything thrown along the way to
         * {@link #error(Throwable)}.
         */
        private abstract static class MyTask implements Runnable {
            /** Number identifying this task. */
            public final Integer no;

            public MyTask(Integer no) {
                this.no = no;
            }

            /** Hook called first, before the actual work. */
            public abstract void doInit();

            /** Hook that performs the actual work. */
            public abstract void doExecutor();

            /** Hook called after the work finished without throwing. */
            public abstract void done();

            /**
             * Hook called with whatever was thrown by one of the other hooks.
             *
             * @param e the throwable raised by doInit, doExecutor or done
             */
            public abstract void error(Throwable e);

            @Override
            public void run() {
                Throwable failure = null;
                try {
                    doInit();
                    doExecutor();
                    done();
                } catch (Throwable thrown) {
                    failure = thrown;
                }
                if (failure != null) {
                    error(failure);
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    2 任务拒绝 testAbortPolicy & testDiscardPolicy & testDiscardOldestPolicy & testCallerRunsPolicy

    任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。

    package com.thread.excutor;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.IntStream;
    
    /**
     * Experiments with the four built-in {@link ThreadPoolExecutor} rejection policies.
     *
     * <p>Every experiment uses the same tiny pool (core 1, max 2, queue capacity 1), fills it
     * with three sleeping tasks so that it is saturated, and then submits a fourth task to
     * see what the policy does with it. Each pool is shut down in a {@code finally} block so
     * the JVM exits by itself once the remaining tasks have run.
     */
    public class RejectedExecutionHandlerTest {
        public static void main(String[] args) {
    // Disabled experiments (uncomment one to run it instead):
    //        testAbortPolicy();
    //        testDiscardPolicy();
            testDiscardOldestPolicy();
    //        testCallerRunsPolicy();
        }

        /**
         * CallerRunsPolicy: the rejected task is run by the thread that submitted it.
         *
         * <p>Output recorded by the author:
         * <pre>
         * xxxx == main
         * yyyy Thread-1
         * yyyy Thread-0
         * yyyy Thread-1
         * </pre>
         */
        private static void testCallerRunsPolicy() {
            ExecutorService executorService = newSmallPool(new ThreadPoolExecutor.CallerRunsPolicy());
            try {
                saturate(executorService, 3, true);
                pauseOneSecond();
                executorService.execute(() -> {
                    System.out.println("xxxx == " + Thread.currentThread().getName());
                });
            } finally {
                executorService.shutdown();
            }
        }

        /**
         * DiscardOldestPolicy: the task at the head of the queue (the one that was queued
         * first) is dropped to make room, then the rejected task is submitted again.
         *
         * <p>Output recorded by the author; only two "yyyy" lines appear because the queued
         * sleeping task was the one discarded:
         * <pre>
         * yyyy Thread-0
         * yyyy Thread-1
         * xxxxThread-0
         * </pre>
         */
        private static void testDiscardOldestPolicy() {
            ExecutorService executorService = newSmallPool(new ThreadPoolExecutor.DiscardOldestPolicy());
            try {
                saturate(executorService, 3, true);
                pauseOneSecond();
                executorService.execute(() -> System.out.println("xxxx" + Thread.currentThread().getName()));
            } finally {
                executorService.shutdown();
            }
        }

        /**
         * DiscardPolicy: the rejected task is silently dropped.
         *
         * <p>Output recorded by the author ("xxxx" never appears):
         * <pre>
         * yyyy
         * yyyy
         * yyyy
         * </pre>
         */
        private static void testDiscardPolicy() {
            ExecutorService executorService = newSmallPool(new ThreadPoolExecutor.DiscardPolicy());
            try {
                saturate(executorService, 3, false);
                pauseOneSecond();
                executorService.execute(() -> System.out.println("xxxx"));
            } finally {
                executorService.shutdown();
            }
        }

        /**
         * AbortPolicy (the default): the rejected task is dropped and execute() throws
         * RejectedExecutionException. The pool can hold at most
         * maximumPoolSize + queue capacity tasks at once.
         *
         * <p>Output recorded by the author: the exception is reported for the main thread
         * ("... rejected from java.util.concurrent.ThreadPoolExecutor[Running, pool size = 2,
         * active threads = 2, queued tasks = 1, completed tasks = 0]"), followed by three
         * "yyyy" lines from the tasks that were accepted.
         */
        private static void testAbortPolicy() {
            ExecutorService executorService = newSmallPool(new ThreadPoolExecutor.AbortPolicy());
            try {
                saturate(executorService, 10, false);
                pauseOneSecond();
                // Deliberately not caught: the exception is what this experiment shows.
                executorService.execute(() -> System.out.println("xxxx"));
            } finally {
                executorService.shutdown();
            }
        }

        /**
         * Builds the pool shared by all experiments: 1 core thread, at most 2 threads,
         * 30 seconds keep-alive, a queue holding a single task.
         *
         * @param handler the rejection policy under test
         * @return the new pool
         */
        private static ExecutorService newSmallPool(RejectedExecutionHandler handler) {
            return new ThreadPoolExecutor(1,
                    2,
                    30,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1),
                    Thread::new,
                    handler);
        }

        /**
         * Submits three sleeping tasks, which is exactly what the small pool can hold
         * (two running, one queued), so that the next submission is rejected.
         *
         * @param pool            the pool to fill
         * @param sleepSeconds    how long each task sleeps before printing
         * @param printThreadName whether the "yyyy" line also carries the worker's name
         */
        private static void saturate(ExecutorService pool, int sleepSeconds, boolean printThreadName) {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(sleepSeconds);
                        System.out.println(printThreadName
                                ? "yyyy " + Thread.currentThread().getName()
                                : "yyyy");
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to whoever runs this thread.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                });
            }
        }

        /** Gives the workers one second to pick up their tasks before the fourth submission. */
        private static void pauseOneSecond() {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173

    3 AllowCoreThreadTimeOut

    allowCoreThreadTimeOut 支持回收空闲的核心线程,适合定时任务这类间歇执行、空闲时希望释放线程的场景。

    package com.thread.excutor;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.IntStream;
    
    /**
     * Experiments with {@link ThreadPoolExecutor#getActiveCount()} and
     * {@link ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)}.
     *
     * <p>The pools are intentionally NOT shut down: whether the JVM exits on its own is
     * part of what the experiments observe.
     *
     * <p>For reference, the author quotes {@code Executors.newFixedThreadPool(n)} as building
     * {@code new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())},
     * i.e. a pool whose keep-alive time is zero.
     */
    public class TestAllowCoreThreadTimeOut {
        public static void main(String[] args) {
    // Disabled experiments (uncomment one to run it instead):
    //        getActiveCount();
    //        testAllowCoreThreadTimeOut_1();
    //        testAllowCoreThreadTimeOut_2();
    //        testAllowCoreThreadTimeOut_3();
            testAllowCoreThreadTimeOut_4();
        }

        /**
         * Prints the active count before any task is submitted and again shortly after one
         * ten-second task has been submitted.
         *
         * <p>Output recorded by the author:
         * <pre>
         * 1 - 0
         * 2 - 1
         * </pre>
         */
        private static void getActiveCount() {
            ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
            System.out.println("1 - " + executorService.getActiveCount());
            submitSleepers(executorService, 1, 10);
            // Short pause so the worker has picked the task up before the second reading.
            sleepQuietly(20, TimeUnit.MILLISECONDS);
            System.out.println("2 - " + executorService.getActiveCount());
        }

        /**
         * Default behaviour: core threads are not reclaimed.
         *
         * <p>Output recorded by the author; the process had to be stopped by hand
         * (exit code 130, SIGINT):
         * <pre>
         * 1===0
         * 2===0
         * </pre>
         */
        private static void testAllowCoreThreadTimeOut_1() {
            ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
            System.out.println("1===" + executorService.getActiveCount());
            submitSleepers(executorService, 5, 3);
            sleepQuietly(5, TimeUnit.SECONDS);
            System.out.println("2===" + executorService.getActiveCount());
        }

        /**
         * Enabling core-thread time-out on a pool with a zero keep-alive time.
         *
         * <p>Result recorded by the author: allowCoreThreadTimeOut(true) throws
         * {@code IllegalArgumentException: Core threads must have nonzero keep alive times},
         * so the statements after it never run. The exception is deliberately not caught.
         */
        private static void testAllowCoreThreadTimeOut_2() {
            ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
            executorService.allowCoreThreadTimeOut(true);
            System.out.println("1===" + executorService.getActiveCount());
            submitSleepers(executorService, 5, 3);
            sleepQuietly(5, TimeUnit.SECONDS);
            System.out.println("2===" + executorService.getActiveCount());
        }

        /**
         * Same as experiment 2, but with a ten-second keep-alive time set first.
         *
         * <p>Output recorded by the author; this time the program exits by itself
         * (exit code 0):
         * <pre>
         * 1===0
         * 2===0
         * </pre>
         */
        private static void testAllowCoreThreadTimeOut_3() {
            ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
            executorService.setKeepAliveTime(10, TimeUnit.SECONDS);
            executorService.allowCoreThreadTimeOut(true);
            System.out.println("1===" + executorService.getActiveCount());
            submitSleepers(executorService, 5, 3);
            sleepQuietly(5, TimeUnit.SECONDS);
            System.out.println("2===" + executorService.getActiveCount());
        }

        /**
         * Like experiment 3, with a sixth task submitted while the five workers are busy.
         *
         * <p>Output recorded by the author; the program exits by itself (exit code 0):
         * <pre>
         * 1===0
         * 2===5
         * xxxxxx
         * 3===1
         * </pre>
         */
        private static void testAllowCoreThreadTimeOut_4() {
            ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
            executorService.setKeepAliveTime(10, TimeUnit.SECONDS);
            executorService.allowCoreThreadTimeOut(true);
            System.out.println("1===" + executorService.getActiveCount());
            submitSleepers(executorService, 5, 3);
            System.out.println("2===" + executorService.getActiveCount());
            // Sixth task: submitted while all five workers are occupied by the sleepers above.
            executorService.execute(() -> {
                System.out.println("xxxxxx");
                sleepQuietly(3, TimeUnit.SECONDS);
            });
            sleepQuietly(5, TimeUnit.SECONDS);
            System.out.println("3===" + executorService.getActiveCount());
        }

        /**
         * Submits {@code count} tasks that each just sleep.
         *
         * @param pool    the pool receiving the tasks
         * @param count   how many tasks to submit
         * @param seconds how long each task sleeps
         */
        private static void submitSleepers(ThreadPoolExecutor pool, int count, long seconds) {
            for (int i = 0; i < count; i++) {
                pool.execute(() -> sleepQuietly(seconds, TimeUnit.SECONDS));
            }
        }

        /**
         * Sleeps for the given time. If interrupted, prints the stack trace and restores
         * the thread's interrupt status instead of losing it.
         *
         * @param duration how long to sleep
         * @param unit     unit of {@code duration}
         */
        private static void sleepQuietly(long duration, TimeUnit unit) {
            try {
                unit.sleep(duration);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160

    4 InvokeAny & InvokeAll & submit

    /**
     * Entry point for the invokeAny / invokeAll / submit experiments of this section.
     * One experiment call is enabled at a time; the others are kept as
     * commented-out toggles so they can be switched on by hand.
     */
    public class TestInvokeAllAnySubmit {
        public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
    // Disabled experiments (uncomment one to run it instead):
    //        testInvokeAny();
    //        testInvokeAnyTimeOut();
    //        testInvokeAll_1();
    //        testInvokeAll_2();
    //        testInvokeAllTimeOut();
    //        testSubmitRunnable();
            testSubmitRunnable_2();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    1 testInvokeAny

    	/**
         * 当一个结果返回的时候,其他的线程会接着执行吗?
         * 1、invokeAny取得第一个方法的返回值,当第一个任务结束后,会调用interrupt方法中断其它任务。
         * 2、invokeAll具有阻塞性。
         */
        private static void testInvokeAny() throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            try {
                List<Callable<Integer>> callableList = IntStream.range(0, 5)
                        .mapToObj(item -> (Callable<Integer>) () -> {
                            // Sleep 1 to 5 seconds. The previous nextInt(1) always returned 0, so no
                            // task ever slept, all five finished at once, and the cancellation of
                            // the slower tasks could never be observed.
                            TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 6));
                            System.out.println(Thread.currentThread().getName() + " : " + item);
                            return item;
                        })
                        .collect(Collectors.toList());
                // Overload used here:
                //   <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                //       throws InterruptedException, ExecutionException;
                // It blocks until one task has completed successfully and returns that task's
                // result; tasks that have not completed by then are cancelled.
                Integer vale = executorService.invokeAny(callableList);
                System.out.println("===========finished============");
                System.out.println(vale);
            } finally {
                // Without this the idle worker threads keep the JVM alive; the author's
                // original run had to be stopped with SIGINT (exit code 130).
                executorService.shutdown();
            }
            /*
             * Expected shape of the output: one "pool-1-thread-N : i" line for the task that
             * finished first (possibly more if several tasks drew the same sleep time),
             * then the banner, then the returned value, e.g.
             *
             * pool-1-thread-2 : 1
             * ===========finished============
             * 1
             */
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    private static void testInvokeAnyTimeOut() throws ExecutionException, InterruptedException, TimeoutException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            try {
                // Five tasks, each sleeping a random 0 to 19 seconds before returning its index.
                List<Callable<Integer>> callableList = IntStream.range(0, 5)
                        .mapToObj(item -> (Callable<Integer>) () -> {
                            TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(20));
                            System.out.println(Thread.currentThread().getName() + " : " + item);
                            return item;
                        })
                        .collect(Collectors.toList());
                // Overload used here:
                //   <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                //       throws InterruptedException, ExecutionException, TimeoutException;
                Integer vale = executorService.invokeAny(callableList, 3, TimeUnit.SECONDS);
                System.out.println("===========finished============");
                System.out.println(vale);
            } finally {
                // Without this the idle worker threads keep the JVM alive; the author's
                // original run had to be stopped with SIGINT (exit code 130).
                executorService.shutdown();
            }

            /*
             * Case 1 recorded by the author - a task finished within the three seconds:
             *
             * pool-1-thread-4 : 3
             * ===========finished============
             * 3
             */

            /*
             * Case 2 recorded by the author - no task finished in time: invokeAny throws
             * TimeoutException and the tasks still running are interrupted.
             *
             * Exception in thread "main" java.util.concurrent.TimeoutException
             * 	at java.util.concurrent.AbstractExecutorService.doInvokeAny(AbstractExecutorService.java:184)
             * 	at java.util.concurrent.AbstractExecutorService.invokeAny(AbstractExecutorService.java:225)
             * 	...
             */
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    2 testInvokeAll

    private static void testInvokeAll_1() throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            try {
                // Five tasks, each sleeping five seconds before returning its index.
                List<Callable<Integer>> callableList = IntStream.range(0, 5)
                        .mapToObj(item -> (Callable<Integer>) () -> {
                            TimeUnit.SECONDS.sleep(5);
                            System.out.println(Thread.currentThread().getName() + " : " + item);
                            return item;
                        })
                        .collect(Collectors.toList());
                // Signature:
                //   <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                //       throws InterruptedException;
                // The call returns only after every task has completed, and the futures come
                // back in the same order as the tasks, so the values print as 0..4.
                List<Future<Integer>> futures = executorService.invokeAll(callableList);
                futures.stream().map(item -> {
                    try {
                        return item.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }).forEach(System.out::println);
                System.out.println("==========finished============");
            } finally {
                // Without this the idle worker threads keep the JVM alive after the experiment.
                executorService.shutdown();
            }
            /*
             * Output recorded by the author (the order of the "pool-1-thread" lines varies):
             *
             * pool-1-thread-5 : 4
             * pool-1-thread-2 : 1
             * pool-1-thread-1 : 0
             * pool-1-thread-4 : 3
             * pool-1-thread-3 : 2
             * 0
             * 1
             * 2
             * 3
             * 4
             * ==========finished============
             */
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    /**
         * Same experiment as the sequential {@code invokeAll} demo, but the returned futures are
         * consumed through {@code parallelStream()}, so the results are printed in no particular
         * order.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting in
         *                              {@code invokeAll}
         */
        private static void testInvokeAll_2() throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            try {
                List<Callable<Integer>> callableList = IntStream.range(0, 5).boxed().map(item ->
                        (Callable<Integer>) () -> {
                            TimeUnit.SECONDS.sleep(5);
                            System.out.println(Thread.currentThread().getName() + " : " + item);
                            return item;
                        }).collect(Collectors.toList());
                List<Future<Integer>> futures = executorService.invokeAll(callableList);
                // parallelStream is deliberate here: it is what makes the printed order vary.
                futures.parallelStream().map(item -> {
                    try {
                        return item.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }).forEach(System.out::println);
                System.out.println("==========finished============");
            } finally {
                // Release the worker threads; otherwise the idle pool keeps the JVM running
                // after the experiment is over.
                executorService.shutdown();
            }
            /**
             * Sample output (both the thread lines and the result lines vary between runs):
             *
             * pool-1-thread-2 : 1
             * pool-1-thread-4 : 3
             * pool-1-thread-1 : 0
             * pool-1-thread-5 : 4
             * pool-1-thread-3 : 2
             * 2
             * 0
             * 3
             * 4
             * 1
             * ==========finished============
             */
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    /**
         * Demonstrates the timed {@code invokeAll(tasks, timeout, unit)}: every task sleeps for
         * 5 seconds but the call only waits 1 second, so the unfinished tasks are cancelled and
         * {@code Future.get()} on them throws {@link java.util.concurrent.CancellationException}.
         * The exception is intentionally left to propagate to the caller.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting in
         *                              {@code invokeAll}
         */
        private static void testInvokeAllTimeOut() throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            try {
                List<Callable<Integer>> callableList = IntStream.range(0, 5).boxed().map(item ->
                        (Callable<Integer>) () -> {
                            TimeUnit.SECONDS.sleep(5);
                            System.out.println(Thread.currentThread().getName() + " : " + item);
                            return item;
                        }).collect(Collectors.toList());
                List<Future<Integer>> futures = executorService.invokeAll(callableList, 1, TimeUnit.SECONDS);
                futures.parallelStream().map(item -> {
                    try {
                        return item.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }).forEach(System.out::println);
                System.out.println("==========finished============");
            } finally {
                // Runs even when get() throws CancellationException, so the pool threads are
                // always released and cannot keep the JVM alive.
                executorService.shutdown();
            }
            /**
             * Stack trace recorded from a run of the original experiment:
             *
             * Exception in thread "main" java.util.concurrent.CancellationException
             * 	at java.util.concurrent.FutureTask.report(FutureTask.java:121)
             * 	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
             * 	at com.thread.excutor.TestInvokeAllAnySubmit.lambda$testInvokeAllTimeOut$2(TestInvokeAllAnySubmit.java:35)
             * 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
             * 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
             * 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
             * 	at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
             * 	at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
             * 	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
             * 	at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
             * 	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
             * 	at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
             * 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
             * 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
             * 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
             * 	at com.thread.excutor.TestInvokeAllAnySubmit.testInvokeAllTimeOut(TestInvokeAllAnySubmit.java:39)
             * 	at com.thread.excutor.TestInvokeAllAnySubmit.main(TestInvokeAllAnySubmit.java:21)
             */
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    3 testSubmitRunnable

    /**
         * Demonstrates {@code submit(Runnable task, T result)}: the future completes with the
         * supplied {@code result} object once the runnable has finished.
         *
         * <pre>{@code
         * <T> Future<T> submit(Runnable task, T result);
         * Future<?> submit(Runnable task);
         * <T> Future<T> submit(Callable<T> task);
         * }</pre>
         *
         * Besides {@code execute()}, a thread pool can also run tasks through {@code submit()}.
         * {@code submit()} accepts either a {@code Runnable} or a {@code Callable} and is the
         * choice when a return value is needed. An exception thrown by the task surfaces when
         * {@code Future.get()} is called; if {@code get()} is never called it cannot be observed.
         *
         * @throws ExecutionException   if the submitted task threw an exception
         * @throws InterruptedException if the calling thread is interrupted while waiting in
         *                              {@code get()}
         */
        private static void testSubmitRunnable_2() throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            try {
                String result = "done";
                Future<String> future = executorService.submit(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the worker thread's owner can see it.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                }, result);
                String s = future.get();
                System.out.println(s);
            } finally {
                // The original run had to be stopped with SIGINT (exit code 130) because the
                // pool was never shut down; releasing it here lets the JVM exit on its own.
                executorService.shutdown();
            }
            /**
             * Output:
             *
             * done
             */
        }
    
        /**
         * Demonstrates {@code submit(Runnable)}: the returned future yields {@code null} from
         * {@code get()} once the runnable has finished.
         *
         * @throws ExecutionException   if the submitted task threw an exception
         * @throws InterruptedException if the calling thread is interrupted while waiting in
         *                              {@code get()}
         */
        private static void testSubmitRunnable() throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            try {
                Future<?> future = executorService.submit(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the worker thread's owner can see it.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                });
                Object o = future.get();
                System.out.println("result = " + o);
            } finally {
                // The original run never terminated by itself (it was stopped with SIGINT,
                // exit code 130) because the pool was left running; shut it down here.
                executorService.shutdown();
            }
            /**
             * Output:
             *
             * result = null
             */
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    5 小实验-getQueue().add()

    package com.thread.excutor;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * Small experiment: what happens when a task is placed straight into the pool's work queue
     * with {@code getQueue().add(...)} instead of being handed over through {@code execute(...)}.
     * Uncomment one of the calls in {@code main} to run the corresponding case.
     */
    public class TestU {
        public static void main(String[] args) {
    //        getQueueAdd_1();
    //        getQueueAdd_2();
        }


        /**
         * Case 2: a task is first passed to {@code execute()}, then a second task is added
         * directly to the queue. Both tasks run.
         */
        private static void getQueueAdd_2() {
            ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
            try {
                executorService.execute(() -> System.out.println("I will be process because of triggered the executor."));
                executorService.getQueue().add(() -> System.out.println("I am added directly into the queue"));
            } finally {
                // shutdown() still lets already-queued tasks run, but releases the worker
                // afterwards. The original run had to be stopped with SIGINT (exit code 130).
                executorService.shutdown();
            }
            /**
             * Output:
             *
             * I will be process because of triggered the executor.
             * I am added directly into the queue
             */

        }

        /**
         * Case 1: the task is only added to the queue and {@code execute()} is never called.
         * In the recorded run the task was not executed and the process ended with exit code 0.
         */
        private static void getQueueAdd_1() {
            ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
            try {
                executorService.getQueue().add(() -> System.out.println("I am added directly into the queue"));
            } finally {
                // Kept for symmetry with getQueueAdd_2 so every pool created here is released.
                executorService.shutdown();
            }
            /**
             * Not executed ...
             * Process finished with exit code 0
             */
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
  • 相关阅读:
    目前工业界常用的推荐系统模型有哪些?
    MySQL关于查询条件中:字符串类型的值忽略英文字母的大小写以及字符串尾部包含空格的问题
    区块链技术与应用 【全国职业院校技能大赛国赛题目解析】第五套智能合约安全漏洞测试
    SpringCloud Feign 远程调用(史上最详细讲解)
    人血清白蛋白修饰生物素HSA-Biotin;仅供科研实验用
    SpringBoot整合RabbitMQ实现消息的发送与接收,确认消息,延时消息
    【面试必问】HTTP与HTTPS的区别以及HTTPS的工作流程
    重定向与转发的几种方式
    解析外贸开发信的结构?营销邮件书写技巧?
    C++中常见的头文件
  • 原文地址:https://blog.csdn.net/zs18753479279/article/details/128054914