• Java线程池的知识


    1.为什么要用线程池

    一些任务适合使用单独的线程去执行,而线程作为一项比较重的资源如果频繁创建对系统资源消耗较大。使用线程池,将线程重复使用,节省频繁创建线程的开销。
    创建一个线程需要调用操作系统的API,然后操作系统要为线程分配一系列的资源,这个成本就很高了。所以应该避免线程的频繁创建和销毁。

    2.线程池的定义

    在Java中表示线程池的类为ThreadPoolExecutor。定义时最多需要设置7个参数。
    1.核心线程数,当线程池最少维护的线程个数

    2.最大线程数,当提交任务时所有核心线程都在工作,切阻塞队列已经满了,则继续创建线程,这个参数定义了创建线程数的上限

    3和4 线程活跃时间,当线程超过该时间空闲时销毁线程,当然要保留核心线程数个线程

    5 阻塞队列,任务提交时,当无空闲线程且线程个数到达核心线程数,则先将任务保存到该队列中。为了避免OOM阻塞队列需要设置一个上限,不建议使用Executors定义好的几种线程池就是因为它的阻塞队列是无界的。

    6 线程工厂,定义创建线程的方式,比如指定线程名字

    7 拒绝策略,当阻塞队列满了,这时再提交任务则触发决绝策略。默认提供了4种:抛出异常、静默处理、调用者线程自身处理、删除最早提交的任务。也可以自定义处理方式,比如使用另一个补偿线程池执行、放入MQ让其他消费者执行、放入ySQL启动一个线程遍历执行等。

        public static void func() {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    2, //核心线程数
                    5, //最大线程数
                    10, //线程活跃时间,超过这个时间没有执行任务释放该线程
                    TimeUnit.SECONDS, //活跃时间单位
                    new ArrayBlockingQueue<>(1024), //阻塞队列,设置一个上限避免OOM
    
                    new ThreadFactory() {
                        private final AtomicInteger threadId = new AtomicInteger(0);
    
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "MyThreadPool-" + threadId.getAndIncrement());
                        }
                    }, //线程工厂,指定如何生成线程。这里给线程指定一个有意义的名字。(线程可以重名吗?)
                    new ThreadPoolExecutor.CallerRunsPolicy() //拒绝策略,当所有线程都在忙碌,而阻塞队列到达上限时执行的操作。
                    // 有4种已定义的:抛出异常、不执行静默处理、提交任务的线程自己执行、抛弃最早的提交的任务;
                    // 也可以自定义处理方式,如放入消息队列进行补偿执行、放入数据库进行补偿(有个线程进行定时遍历)、使用专门的补偿线程池进行执行
            );
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    3.提交任务

    1.提交Runnable,不关心返回值、执行情况

    ThreadPoolExecutor.execute(Runnable r)
    
    • 1

    此方法没有返回值,适用于任务执行结束不需要有处理结果的情况。且不能进行任务的取消。

    2.提交Runnable,返回future,可以取消任务、等待任务结果

    Future<?> future = ThreadPoolExecutor.submit(Runnable r)
    
    • 1

    返回一个future,提供了isDone()判断任务是否执行完;cancel(true)取消任务,参数表示如果任务已经在执行了是否发出Innterrupted中断;future.get()阻塞获取执行结果,当然这里提交的Runnable总是返回null,但可以作为等待任务结束的方法。

    3.提交Callable,返回future,通过future拿到执行结果

    Future<T> ThreadPoolExecutor.submit(Callable<T> c)
    
    • 1

    这个future的get()方法就可以获取到Callable的call方法的返回值。当然isDone、cancel等方法还是可以正常使用的

    4.线程池的思想

    1.池化资源思想
    线程池是一种池,将线程创建后重复使用,是一种池化思想。池化思想在编程领域有广泛应用,比如对象池、连接池等。

    2.生产者、消费者模型
    和一般的池化资源不同,线程池并不是通过获取资源、使用资源、释放资源的步骤来使用的。而是一种生产者、消费者模型。提交任务是一种生产行为,线程池中的线程执行任务是消费行为。

    5.线程池中线程个数应该如何设置

    1.CPU密集型任务
    一般设置为CPU核心数+1,因为任务大部分时间都在利用CPU,每个线程利用一个CPU。设置+1是为了偶尔的内存缺页等导致线程中断时有线程可以顶上。

    2.IO密集型任务
    任务大部分时间都在执行IO操作,比如调用外部接口等。线程数就可以设置的大于CPU核心数。一个理想公式为:(1 + I/O时间 / CPU时间) * 核心数。比如IO和Cpu之比为9:1,那么设置10个线程。当9个线程使用完CPU后,剩下一个线程正好开始使用CPU。每时每刻都有1个线程使用CPU,9个线程使用IO,两者的利用率都达到了100%。

    6.线程池异常处理

    对于使用execute(Runnable r)提交的任务,抛出异常时提交任务的线程不会得到通知。对于另外两种有future返回的submit()方法提交的任务,如果调用了get方法,发生异常时get会抛出异常。如果不调用get也感知不到异常。

    推荐的方式:在任务中按需处理所有异常!

    7.线程池监控指标

    可以定时获取以下指标进行监控埋点:

    //核心线程数,这个应该不会动态变化吧?
    int corePoolSize = executor.getCorePoolSize(); 
    //最大线程数
    int maximumPoolSize = executor.getMaximumPoolSize(); 
    //历史最大线程数
    int largestPoolSize = executor.getLargestPoolSize(); 
    //活跃线程数,正在执行任务的线程数?
    int activeCount = executor.getActiveCount(); 
    //当前线程池中线程总数
    int poolSize = executor.getPoolSize();  
    //队列中任务个数
    int size = executor.getQueue().size();  
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    8.Executors

    Java通过Executors类提供了4种默认的线程池,但是在实际情况中要谨慎使用,因为他们都使用了无界的任务队列,存在OOM的风险。

    1.newFixedThreadPool
    固定线程数的线程池。核心线程数=最大线程数,无界任务队列

    ExecutorService executorService = Executors.newFixedThreadPool(10);
    
    # 实际定义
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2.newCachedThreadPool
    核心线程数=0
    最大线程数=最大整数值
    空闲时间=60秒
    无界任务队列

    即线程最多空闲时间是60秒,有任务提交时如果还有线程则直接执行,否则直接创建新线程执行。

    ExecutorService executorService1 = Executors.newCachedThreadPool();
    
    # 构造方法定义
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    疑问:既然优先将任务放入阻塞队列。对于newCachedThreadPool线程池设置核心线程数为0,最大线程数为最大整数,阻塞队列貌似也无界,它是怎么运行的?
    解答:关键在于他使用的阻塞队列SynchronousQueue,它并不是无界的,而是容量为1的同步队列。生产者线程必须等待队列中的任务被消费后才能继续放入任务。所以放入第二个任务时就会开始创建线程执行第一个任务。那么如果只放入一个线程,那是不是就永远不会被执行?实测发现并不会出现这种情况,而是立即执行了。
    关于这个问题找到了下面这个解答:

    由于ThreadPoolExecutor内部实现任务提交的时候调用的是工作队列(BlockingQueue接口的实现类)的非阻塞式入队列方法(offer方法),因此,在使用SynchronousQueue作为工作队列的前提下,客户端代码向线程池提交任务时,而线程池中又没有空闲的线程能够从SynchronousQueue队列实例中取一个任务,那么相应的offer方法调用就会失败(即任务没有被存入工作队列)。此时,ThreadPoolExecutor会新建一个新的工作者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小)。
    作者:go4it
    链接:https://www.jianshu.com/p/b7f7eb2bc778
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

    3.newSingleThreadExecutor
    单线程线程池
    核心线程数=最大线程数=1

    ExecutorService executorService2 = Executors.newSingleThreadExecutor();
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    4.newScheduledThreadPool
    可以执行定时、延迟任务的线程池
    可以执行两种任务,一种是定时周期性执行的;另一种是在固定延迟后执行一次。

    这四种线程池知道是怎么回事就行。使用时要仔细判断是否适用自己的场景,特别是注意OOM问题。第四种定时执行在需要执行一些简单定时任务时还是可以使用的。

    9.线程池执行流程

    线程池执行流程
    上图来自:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

    10.线程池终止

    提供了shutdown()和shutdownNow()两个方法来终止线程池。
    其中shutdown()方法调用后,线程池不再接收新提交的任务,会等待已提交的任务执行完成后终止线程池,包括正在执行的和阻塞队列中的任务。但shutdown()方法是立即返回的,不会等到线程池终止才返回。

    shutdownNow()会立即终止线程池,正在执行的线程会收到Interrupted中断,调用了线程的interrupt()方法。队列中未执行的任务会返回一个Runnables列表,对于其中的Future可以进行cancel等处理。

    11.线程池参数动态配置

  • 相关阅读:
    在微信小程序中怎么做投票活动
    基于flowable的upp(统一流程平台)运行性能优化(2)
    Git使用教程:入门到精通
    <Python>PyQt5自己编写一个音乐播放器(优化版)
    DBeaver Ultimate Edtion 22.1 Multilingual (macOS, Linux, Windows) - 通用数据库工具
    酷柚易汛ERP - 盘点操作指南
    Flink SQL -- 命令行的使用
    品质为先,服务不停,广州流辰信息公司恪守初心,匠心为民!
    基于SpringBoot+Mybatis+Layui实现的简单就业管理系统
    深度长文探讨JOIN运算的简化和提速
  • 原文地址:https://blog.csdn.net/vxzhg/article/details/126146326