• Executors-四种创建线程的手段


    1 Executors.newCachedThreadPool()

    从构造方法可以看出,它创建了一个可缓存的线程池。当有新的任务提交时,有空闲线程则直接处理任务,没有空闲线程则创建新的线程处理任务,队列中不储存任务。线程池不对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。如果线程空闲时间超过了60秒就会被回收。在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统OOM

    package com.zs.thread;
    
    public class TestVolatile {
        public static void main(String[] args) {
            ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    
            for (int i = 0; i < 5; i++) {
                final int index = i;
                cachedThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            SimpleDateFormat sdf = new SimpleDateFormat(
                                    "HH:mm:ss");
                            System.out.println("运行时间: " +
                                    sdf.format(new Date()) + " " + index);
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
    
            cachedThreadPool.shutdown();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    因为这种线程有新的任务提交,就会创建新的线程(线程池中没有空闲线程时),不需要等待,所以提交的5个任务的运行时间是一样的。
    在这里插入图片描述

    package com.thread.excutor;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.IntStream;
    
    public class FourThreadPoolTest {
        public static void main(String[] args) {
            newCachedThreadPool();
        }
    
        /**
         *     public static ExecutorService newCachedThreadPool() {
         *         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
         *                                       60L, TimeUnit.SECONDS,
         *                                       new SynchronousQueue());
         *     }
         *
         *  这些池通常会提高执行许多短期异步任务的程序的性能。
         *
         *  SynchronousQueue():
         *     1、SynchronousQueue是BlockingQueue的一种,所以SynchronousQueue是线程安全的。
         *     2、SynchronousQueue 是一个没有数据缓冲的BlockingQueue,容量为0,它不会为队列中元素维护存储空间,它只是多个线程之间数据交换的媒介。
         *     生产者线程对其的插入操作put必须等待消费者的移除操作take。
         *     3、SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
         *       SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,
         *       那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。
         *       Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,
         *       如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
         *     4、SynchronousQueue 最大的特点在于,它的容量为0,没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;
         *       同理,每次放数据的时候也会阻塞,直到有消费者来取。
         *     5、SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。
         *       由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。
         *       使用的数据结构是链表。使用CAS+自旋(无锁),自旋了一定次数后调用 LockSupport.park()进行阻塞。
         *
         *
         */
        private static void newCachedThreadPool() {
            ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
            System.out.println(executorService.getActiveCount());
            // sout -> 0
            IntStream.range(0, 5).boxed().forEach(item ->
                    executorService.execute(() -> {
                        try {
                            TimeUnit.SECONDS.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName() + " [" + item + "]");
                        /**
                         * pool-1-thread-3 [2]
                         * pool-1-thread-4 [3]
                         * pool-1-thread-1 [0]
                         * pool-1-thread-5 [4]
                         * pool-1-thread-2 [1]
                         */
                    }));
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(executorService.getActiveCount()); // sout -> 5
        }
        /**
         * 60s之后线程池停止
         * 0
         * 5
         * pool-1-thread-3 [2]
         * pool-1-thread-4 [3]
         * pool-1-thread-1 [0]
         * pool-1-thread-5 [4]
         * pool-1-thread-2 [1]
         *
         * Process finished with exit code 0
         */
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    2 Executors.newFixedThreadPool(10)

    从构造方法可以看出,它创建了一个固定大小的线程池,每次提交一个任务就创建一个线程,直到线程数达到线程池的最大值nThreads。线程池的大小一旦达到最大值后,再有新的任务提交时则放入阻塞队列中,等到有线程空闲时,再从队列中取出任务继续执行。FixedThreadPool提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。

    package com.zs.thread;
    public class TestVolatile {
        public static void main(String[] args) {
            ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
    
            for (int i = 0; i < 5; i++) {
                final int index = i;
                fixedThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
                            System.out.println("运行时间: " + sdf.format(new Date()) + " " + index);
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            fixedThreadPool.shutdown();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    例中创建了一个固定大小为3的线程池,然后在线程池提交了5个任务。在提交第4个任务时,因为线程池的大小已经达到了3并且前3个任务在运行中,所以第4个任务被放入了队列,等待有空闲的线程时再被运行。运行结果如下(注意前3个任务和后2个任务的运行时间):
    在这里插入图片描述

    /**
         *     public static ExecutorService newFixedThreadPool(int nThreads) {
         *         return new ThreadPoolExecutor(nThreads, nThreads,
         *                                       0L, TimeUnit.MILLISECONDS,
         *                                       // Creates a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE.
         *                                       new LinkedBlockingQueue());
         *     }
         */
        private static void newFixedThreadPool() {
            ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
            IntStream.range(0, 20).boxed().forEach(item ->
                    executorService.execute(() -> {
                        try {
                            TimeUnit.SECONDS.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName() + " [" + item + "]");
                    }));
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(executorService.getActiveCount());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    10
    pool-1-thread-10 [9]
    pool-1-thread-1 [0]
    pool-1-thread-4 [3]
    pool-1-thread-5 [4]
    pool-1-thread-9 [8]
    pool-1-thread-6 [5]
    pool-1-thread-3 [2]
    pool-1-thread-8 [7]
    pool-1-thread-2 [1]
    pool-1-thread-7 [6]
    10s之后。。。
    pool-1-thread-9 [12]
    pool-1-thread-4 [11]
    pool-1-thread-6 [13]
    pool-1-thread-7 [17]
    pool-1-thread-5 [18]
    pool-1-thread-2 [16]
    pool-1-thread-8 [15]
    pool-1-thread-1 [10]
    pool-1-thread-3 [14]
    pool-1-thread-10 [19]
    
    程序不会停止。。。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    3 Executors.newSingleThreadExecutor()

    从构造方法可以看出,它创建了一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。

    package com.zs.thread;
    
    public class TestVolatile {
        public static void main(String[] args) {
            ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    
            for (int i = 0; i < 5; i++) {
                final int index = i;
                singleThreadExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            SimpleDateFormat sdf = new SimpleDateFormat(
                                    "HH:mm:ss");
                            System.out.println("运行时间: " +
                                    sdf.format(new Date()) + " " + index);
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
    
            singleThreadExecutor.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    因为该线程池类似于单线程执行,所以先执行完前一个任务后,再顺序执行下一个任务:
    在这里插入图片描述
    既然类似于单线程执行,那么这种线程池还有存在的必要吗?这里的单线程执行指的是线程池内部,从线程池外的角度看,主线程在提交任务到线程池时并没有阻塞,仍然是异步的。

    /**
         *     public static ExecutorService newSingleThreadExecutor() {
         *         return new FinalizableDelegatedExecutorService
         *             (new ThreadPoolExecutor(1, 1,
         *                                     0L, TimeUnit.MILLISECONDS,
         *                                     new LinkedBlockingQueue()));
         *     }
         *
         *
         */
        private static void newSingleThreadExecutor() {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            IntStream.range(0, 8).boxed().forEach(item ->
                    executorService.execute(() -> {
                        try {
                            TimeUnit.SECONDS.sleep(2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName() + " [" + item + "]" + System.currentTimeMillis());
                    }));
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    pool-1-thread-1 [0]1669445965474
    pool-1-thread-1 [1]1669445967480
    pool-1-thread-1 [2]1669445969481
    pool-1-thread-1 [3]1669445971487
    pool-1-thread-1 [4]1669445973492
    pool-1-thread-1 [5]1669445975497
    pool-1-thread-1 [6]1669445977500
    pool-1-thread-1 [7]1669445979505
    
    Process finished with exit code 130 (interrupted by signal 2: SIGINT)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    不会停止运行,始终有一个线程在运行。

    4 Executors.newScheduledThreadPool()

    这个方法创建了一个固定大小的线程池,支持定时及周期性任务执行。
    首先看一下定时执行的例子:

    package com.zs.thread;
    
    public class TestVolatile {
        public static void main(String[] args) {
            final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
            ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
            System.out.println("提交时间: " + sdf.format(new Date()));
            scheduledThreadPool.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("运行时间: " + sdf.format(new Date()));
                }
            }, 3, TimeUnit.SECONDS);
            scheduledThreadPool.shutdown();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    使用该线程池的schedule方法,延迟3秒钟后执行任务,运行结果如下:
    在这里插入图片描述

    package com.zs.thread;
    
    public class TestVolatile {
        public static void main(String[] args) throws InterruptedException {
            final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
            ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
            System.out.println("提交时间: " + sdf.format(new Date()));
            scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println("运行时间: " + sdf.format(new Date()));
                }
            }, 1, 3, TimeUnit.SECONDS);
            Thread.sleep(10000);
            scheduledThreadPool.shutdown();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    使用该线程池的scheduleAtFixedRate方法,延迟1秒钟后每隔3秒执行一次任务,运行结果如下:
    在这里插入图片描述

    5 Executors.newWorkStealingPool()

     /**
         *     public static ExecutorService newWorkStealingPool() {
         *         return new ForkJoinPool
         *             (Runtime.getRuntime().availableProcessors(),
         *              ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         *              null, true);
         *     }
         *
         *     可以传入线程的数量,不传入则默认使用当前计算机中可用的cpu数量,能够合理的使用CPU进行对任务操作(并行操作)。
         *     适合使用在很耗时的任务中,底层用的ForkJoinPool 来实现的:
         *     ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”分发到不同的cpu核心上执行,执行完后再把结果收集到一起返回。
         */
        private static void newWorkStealingPool() {
            /*Optional.of(Runtime.getRuntime().availableProcessors())
                    .ifPresent(System.out::println);*/ // 8核CPU
            SimpleDateFormat sdf = new SimpleDateFormat(
                    "HH:mm:ss");
            ExecutorService executorService = Executors.newWorkStealingPool();
            List<Callable<String>> callableList = IntStream.range(0, 20).boxed().map(item ->
                    (Callable<String>) () -> {
                        System.out.println("Thread - " + Thread.currentThread().getName() + " - " + sdf.format(new Date()));
                        sleep(2L);
                        return "task-" + item;
                    }).collect(Collectors.toList());
            try {
              // invokeAll的作用是:等待所有的任务执行完成后统一返回。
              // 这里与大家分享的是:如果executorService是公共线程池慎用,如果这时候有另外一个请求也不断地往线程池里扔任务,这时候这个请求是不是就一直不停的阻塞了。
                executorService.invokeAll(callableList).stream().map(item -> {
                    try {
                        return item.get();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }).forEach(System.out::println);
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    线程会执行结束!

    Thread - ForkJoinPool-1-worker-3 - 15:29:15
    Thread - ForkJoinPool-1-worker-1 - 15:29:15
    Thread - ForkJoinPool-1-worker-6 - 15:29:15
    Thread - ForkJoinPool-1-worker-2 - 15:29:15
    Thread - ForkJoinPool-1-worker-4 - 15:29:15
    Thread - ForkJoinPool-1-worker-5 - 15:29:15
    Thread - ForkJoinPool-1-worker-0 - 15:29:15
    Thread - ForkJoinPool-1-worker-7 - 15:29:15
    Thread - ForkJoinPool-1-worker-3 - 15:29:17
    Thread - ForkJoinPool-1-worker-4 - 15:29:17
    Thread - ForkJoinPool-1-worker-0 - 15:29:17
    Thread - ForkJoinPool-1-worker-5 - 15:29:17
    Thread - ForkJoinPool-1-worker-6 - 15:29:17
    Thread - ForkJoinPool-1-worker-7 - 15:29:17
    Thread - ForkJoinPool-1-worker-2 - 15:29:17
    Thread - ForkJoinPool-1-worker-1 - 15:29:17
    Thread - ForkJoinPool-1-worker-0 - 15:29:19
    Thread - ForkJoinPool-1-worker-4 - 15:29:19
    Thread - ForkJoinPool-1-worker-5 - 15:29:19
    Thread - ForkJoinPool-1-worker-1 - 15:29:19
    task-0
    task-1
    task-2
    task-3
    task-4
    task-5
    task-6
    task-7
    task-8
    task-9
    task-10
    task-11
    task-12
    task-13
    task-14
    task-15
    task-16
    task-17
    task-18
    task-19
    
    Process finished with exit code 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
  • 相关阅读:
    【Unity3D】UGUI回调函数
    【Linux系统编程:基础IO 下】dup2 实现输出重定向、输入重定向、追加重定向 | 理解磁盘 | 理解文件系统中inode的概念 | 软硬链接
    使用OpenTelemetry进行监控
    【SLAM】基于rrt_explore的移动机器人自主建图
    Cega在Solana上推出主网,为加密货币散户投资者提供更加安全的高收益机遇
    day 10.4
    HTML5 Canvas 数据持久化存储之属性列表
    跨境电商shopify站外引流渠道都有哪些?
    openstack——4、开启虚拟机
    IO流知识点学习
  • 原文地址:https://blog.csdn.net/zs18753479279/article/details/128050350