• 掌握Java并发编程线程池的实现原理


    前沿

    Java中的线程池在实际项目中使用场景很多,几乎索引需要实现异步或者并发执行任务的程序都会使用到线程池,合理的使用线程池能够带来以下几点好处。

    • 降低资源的消耗: 通过出重复利用已创建的线程降低线程创建和销毁带来的性能消耗。
    • 提高响应速度: 当任务到达时,任务可以不需要等待线程的创建就能立即执行。
    • 提高线程的可管理性: 线程是稀缺资源,如果无限的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和控制。

    一、线程池的实现原理

    二、线程池的核心对象参数说明

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    线程池的核心对象说明
    corePoolSize线程池核心线程数量,corePoolSize 是线程池中的一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut。
    maximumPoolSize线程池最大线程数量,线程池允许创建的最大线程数,如果队列满了,并且一创建的线程数小于最大线程数,则线程池会继续创建新的线程来执行任务。
    keepAliveTime线程保持活动的时间,如果任务很多,并且每个任务的执行时间比较短,可以调大时间,提高线程的利用率
    workQueue任务队列(阻塞队列),有多种任务队列,比如:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue等等
    ThreadFactory用于创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字
    RejectedExecutionHandler饱和拒绝策略,当队列和线程池都满了,说明线程池处于饱和状态,那么必须采用一种策略处理提交的新任务,默认策略是AbortPolicy,表示无法处理新任务时抛出异常,在JDK 1.5中线程池框架提供4中拒绝策略:
    1.AbortPolicy:直接抛异常。
    2.CallerRunsPolicy:只用调用者所在线程来运行任务。
    3.DiscardOldestPolicy:丢弃队列里最佳的一个任务,并执行当前任务。
    4.DiscardPocily:不处理,直接丢弃掉。

    三、Java中常用的四种线程池

    在Java中使用线程池,可以用ThreadPoolExecutor的构造函数直接创建出线程池实例,在Executors类中,为我们提供了常用线程池的创建方法。

    接下来我们就来了解常用的四种:

    newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue(),
                                      threadFactory);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    从构造方法可以看出,它创建了一个固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大值nThreads。线程池的大小一旦达到最大值后,再有新的任务提交时则放入无界阻塞队列中,等到有线程空闲时,再从队列中取出任务继续执行。 那么,如何使用newFixedThreadPool呢?我们来举个例子:

    public class ThreadPoolExecutorsChallenge {
    
        static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
    
        public static void main(String[] args) {
            ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 5; i++) {
                final int index = i;
                fixedThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
                            System.out.println("运行时间: " + sdf.format(new Date()) + " " + index);
                            Thread.sleep(2000);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    运行结果:
    运行时间: 23:56:12 1
    运行时间: 23:56:12 0
    运行时间: 23:56:12 2
    运行时间: 23:56:14 3
    运行时间: 23:56:14 4
    
    • 1
    • 2
    • 3
    • 4
    • 5

    上面我创建的一个固定大小为3的线程池,然后在线程池提交个5个任务,从结果可以看到,一开始三个任务进来都是立即执行,在提交第4个任务时,由于线程池大小已经到达3并且前3个任务在运行中,所以第4个任务被放进了队列,等待有空闲的线程时再被执行(前3个任务运行时间是一致的,后两个延迟了2秒才被执行)。

    这里仅仅是为了演示效果,正常的话手动创建线程池效果会更好,生产环境线程池不允许Executors创建,使用建议使用ThreadPoolExecutor方式创建,避免资源耗尽的风险。

    说明:

    Executors返回的线程池对象弊端如下有:

    1. FixedThreadPool和SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能对堆积大量的请求,从而导致OOM。

    2. CacheThreadPool和ScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能创建大量的线程,从而导致OOM。

    newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    使用newCachedThreadPool线程池:

    public class ThreadPoolExecutorsChallenge {
    
        public static void main(String[] args) {
            ExecutorService fixedThreadPool = Executors.newCachedThreadPool();
            for (int i = 0; i < 5; i++) {
                final int index = i;
                fixedThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
                            System.out.println("运行时间: " + sdf.format(new Date()) + " " + index);
                            Thread.sleep(2000);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    运行结果:
    运行时间: 23:39:04 3
    运行时间: 23:39:04 1
    运行时间: 23:39:04 2
    运行时间: 23:39:04 0
    运行时间: 23:39:04 4
    
    • 1
    • 2
    • 3
    • 4
    • 5

    因为这种线程有新的任务提交,就会创建新的线程(线程池中没有空闲线程时),不需要等待,所以提交的5个任务的运行时间是一样的,通过Executors.newCachedThreadPool()创建线程池可能会创建大量的线程,导致OOM。

    newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue()));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    从构造方法可以看出,它创建了一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行。 那么,如何使用newSingleThreadExecutor呢?我们来举个例子:

    public class ThreadPoolExecutorsChallenge {
    
        public static void main(String[] args) {
            ExecutorService fixedThreadPool = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 5; i++) {
                final int index = i;
                fixedThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
                            System.out.println("运行时间: " + sdf.format(new Date()) + " " + index);
                            Thread.sleep(2000);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    因为该线程池类似于单线程执行,所以先执行完前一个任务后,每隔2秒,再顺序执行下一个任务, 运行结果如下:

    运行时间: 23:47:05 0
    运行时间: 23:47:07 1
    运行时间: 23:47:09 2
    运行时间: 23:47:11 3
    运行时间: 23:47:13 4
    
    • 1
    • 2
    • 3
    • 4
    • 5

    newScheduledThreadPool

    这个方法创建了一个固定大小的线程池,支持定时及周期性任务执行。 首先看一下定时执行的例子:

    public class ThreadPoolExecutorsChallenge {
    
        public static void main(String[] args) {
            final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
    
            ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
            System.out.println("任务提交时间:" + sdf.format(new Date()));
            scheduledThreadPool.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("任务运行时间:" + sdf.format(new Date()));
                }
            }, 3, TimeUnit.SECONDS);
            scheduledThreadPool.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    使用该线程池的schedule方法,延迟3秒钟后执行任务,运行结果如下:

    任务提交时间:23:54:22
    任务运行时间:23:54:25
    
    • 1
    • 2

    同时使用newScheduledThreadPool可以实现周期执行的例子,实现延迟1秒后每个三秒执行一次任务:

    public class ThreadPoolExecutorsChallenge {
    
        public static void main(String[] args) {
            final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
            ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
            System.out.println("提交时间: " + sdf.format(new Date()));
            scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println("运行时间: " + sdf.format(new Date()));
                }
            }, 1, 3, TimeUnit.SECONDS);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    运行结果:
    提交时间: 00:03:15
    运行时间: 00:03:16
    运行时间: 00:03:19
    运行时间: 00:03:22
    运行时间: 00:03:25
    运行时间: 00:03:28
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    四、推荐

    避免耗尽的风险,推荐创建线程池方式:

    //获取系统处理器个数,作为线程池数量
    int nThreads = Runtime.getRuntime().availableProcessors();
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("demo-pool-%d").build();
    System.out.println("系统处理器个数:" + nThreads);
    
    ExecutorService pool = new ThreadPoolExecutor(nThreads , 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 相关阅读:
    LeetCode:第302场周赛【总结】
    ECMAScript
    steam搬砖项目,小白也能月入过万的副业项目
    【k8s】1、基础概念和架构及组件
    基于Xlinx的时序分析与约束(2)----基础概念(上)
    大珩PPT助手一键颜色设置
    MyEclipse技术深度剖析——企业级的Java EE & Jakarta EE IDE
    3D高斯泼溅(Splatting)简明教程
    static关键字的三种用法
    移动端高级开发
  • 原文地址:https://blog.csdn.net/m0_70748381/article/details/126948633