• 多线程详细介绍


    一、分类
    创建线程的四种方法:

    (1)继承Thread
    (2)实现Runnable
    (3)实现Callable
    (4)线程池
    
    • 1
    • 2
    • 3
    • 4

    创建一个新的线程可以通过继承Thread类或者实现Runnable接口来实现,这两种方式创建的线程在运行结束后会被虚拟机销毁,进行垃圾回收,如果线程数量过多,频繁的创建和销毁线程会浪费资源,降低效率。而线程池的引入就很好解决了上述问题,线程池可以更好的创建、维护、管理线程的生命周期,做到复用,提高资源的使用效率,也避免了开发人员滥用new关键字创建线程的不规范行为。

    说明:阿里开发手册中明确指出,在实际生产中,线程资源必须通过线程池提供,不允许在应用中显式的创建线程。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
    
    • 1

    二、两个常用线程池的详细介绍
    ThreadPoolExecutor 是JDK线程池
    ThreadPoolTaskExecutor 是Spring线程池

    两者的区别:

    ThreadPoolExecutor是一个java类不提供spring生命周期和参数装配。
    ThreadPoolTaskExecutor实现了InitializingBean, DisposableBean ,xxaware等,具有spring特性
    AsyncListenableTaskExecutor提供了监听任务方法(相当于添加一个任务监听,提交任务完成都会回调该方法)
    简单理解:
    1、ThreadPoolTaskExecutor使用ThreadPoolExecutor并增强,扩展了更多特性
    2、ThreadPoolTaskExecutor只关注自己增强的部分,任务执行还是ThreadPoolExecutor处理。
    3、前者spring自己用着爽,后者离开spring我们用ThreadPoolExecutor爽。
    注意:ThreadPoolTaskExecutor 不会自动创建ThreadPoolExecutor需要手动调initialize才会创建
    如果@Bean 就不需手动,会自动InitializingBean的afterPropertiesSet来调initialize
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    1. ThreadPoolExecutor详细介绍
      在这里插入图片描述
      (1)线程池的执行顺序
      线程池按以下行为执行任务

    当线程数小于核心线程数时,创建线程。
    当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
    当线程数大于等于核心线程数,且任务队列已满,若线程数小于最大线程数,创建线程。
    若线程数等于最大线程数,则执行拒绝策略

    (2)线程池的参数详解
    corePoolSize
    核心线程数,默认为1。
    设置规则:
    CPU密集型(CPU密集型也叫计算密集型,指的是运算较多,cpu占用高,读/写I/O(硬盘/内存)较少):corePoolSize = CPU核数 + 1
    IO密集型(与cpu密集型相反,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。):corePoolSize = CPU核数 * 2

    maximumPoolSize
    最大线程数,默认为Integer.MAX_VALUE
    一般设置为和核心线程数一样

    keepAliveTime
    线程空闲时间,默认为60s,一般设置为默认60s

    unit
    时间单位,默认为秒

    workQueue
    队列,当线程数目超过核心线程数时用于保存任务的队列。(BlockingQueue workQueue)此队列仅保存实现Runnable接口的任务。(因为线程池的底层BlockingQueue的泛型为Runnable)
    无界队列
    队列大小无限制,常用的为无界的LinkedBlockingQueue,使用该队列作为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。阅读代码发现,Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而博主踩到的就是这个坑,当QPS很高,发送数据很大,大量的任务被添加到这个无界LinkedBlockingQueue 中,导致cpu和内存飙升服务器挂掉。
    当然这种队列,maximumPoolSize 的值也就无效了。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
    有界队列
    当使用有限的 maximumPoolSizes 时,有界队列有助于防止资源耗尽,但是可能较难调整和控制。常用的有两类,一类是遵循FIFO原则的队列如ArrayBlockingQueue,另一类是优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定。
    使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。

    同步移交队列
    如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。

    threadFactory
    线程工厂,用来创建线程。
    为了统一在创建线程时设置一些参数,如是否守护线程,线程一些特性等,如优先级。通过这个TreadFactory创建出来的线程能保证有相同的特性。
    它是一个接口类,而且方法只有一个,就是创建一个线程。
    如果没有另外说明,则在同一个ThreadGroup 中一律使用Executors.defaultThreadFactory() 创建线程,并且这些线程具有相同的NORM_PRIORITY 优先级和非守护进程状态。
    通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。
    如果从newThread 返回 null 时ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。

    handler
    拒绝策略,默认是AbortPolicy,会抛出异常。
    当线程数已经达到maxPoolSize,且队列已满,会拒绝新任务。
    当线程池被调用shutdown()后,会等待线程池里的任务执行完毕再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务。
    AbortPolicy 丢弃任务,抛运行时异常。
    CallerRunsPolicy 由当前调用的任务线程执行任务。
    DiscardPolicy 忽视,什么都不会发生。
    DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务。

    1. ThreadPoolTaskExecutor详细介绍

    异步线程配置

    配置核心线程数

    async.executor.thread.core_pool_size = 5

    配置最大线程数

    async.executor.thread.max_pool_size = 5

    配置队列大小

    async.executor.thread.queue_capacity = 99999

    配置线程池中的线程的名称前缀

    async.executor.thread.name.prefix = async-service-

    示例
    线程池配置类

    @Configuration
    @EnableAsync
    public class ExecutorConfig {
    
        private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
    
        @Value("${async.executor.thread.core_pool_size}")
        private int corePoolSize;
        @Value("${async.executor.thread.max_pool_size}")
        private int maxPoolSize;
        @Value("${async.executor.thread.queue_capacity}")
        private int queueCapacity;
        @Value("${async.executor.thread.name.prefix}")
        private String namePrefix;
    
        @Bean(name = "asyncServiceExecutor")
        public Executor asyncServiceExecutor() {
            logger.info("start asyncServiceExecutor");
            //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
            //配置核心线程数
            executor.setCorePoolSize(corePoolSize);
            //配置最大线程数
            executor.setMaxPoolSize(maxPoolSize);
            //配置队列大小
            executor.setQueueCapacity(queueCapacity);
            //配置线程池中的线程的名称前缀
            executor.setThreadNamePrefix(namePrefix);
    
            // rejection-policy:当pool已经达到max size的时候,如何处理新任务
            // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            //执行初始化
            executor.initialize();
            return executor;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    
        private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);
    
        private void showThreadPoolInfo(String prefix) {
            ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
    
            if (null == threadPoolExecutor) {
                return;
            }
    
            logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                    this.getThreadNamePrefix(),
                    prefix,
                    threadPoolExecutor.getTaskCount(),
                    threadPoolExecutor.getCompletedTaskCount(),
                    threadPoolExecutor.getActiveCount(),
                    threadPoolExecutor.getQueue().size());
        }
    
        @Override
        public void execute(Runnable task) {
            showThreadPoolInfo("1. do execute");
            super.execute(task);
        }
    
        @Override
        public void execute(Runnable task, long startTimeout) {
            showThreadPoolInfo("2. do execute");
            super.execute(task, startTimeout);
        }
    
        @Override
        public Future<?> submit(Runnable task) {
            showThreadPoolInfo("1. do submit");
            return super.submit(task);
        }
    
        @Override
        public <T> Future<T> submit(Callable<T> task) {
            showThreadPoolInfo("2. do submit");
            return super.submit(task);
        }
    
        @Override
        public ListenableFuture<?> submitListenable(Runnable task) {
            showThreadPoolInfo("1. do submitListenable");
            return super.submitListenable(task);
        }
    
        @Override
        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
            showThreadPoolInfo("2. do submitListenable");
            return super.submitListenable(task);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    业务逻辑处理类

    @Service
    public class DemoService {
    
        private static final Logger logger = LoggerFactory.getLogger(DemoService.class);
    
        @Override
        @Async("asyncServiceExecutor")
        public void executeAsync() {
            logger.info("start executeAsync");
    
            System.out.println("异步线程要做的事情");
            System.out.println("可以在这里执行批量插入等耗时的事情");
    
            logger.info("end executeAsync");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    @GetMapping("/test")
        public void test() {
            demoService.executeAsync();
        }
    
    • 1
    • 2
    • 3
    • 4

    使用默认@Async进行异步操作时,注意需要配置默认线程池,示例

    spring.task.execution.pool.core-size=2
    spring.task.execution.pool.max-size=5
    spring.task.execution.pool.queue-capacity=10
    spring.task.execution.pool.keep-alive=60s
    spring.task.execution.pool.allow-core-thread-timeout=true
    spring.task.execution.shutdown.await-termination=false
    spring.task.execution.shutdown.await-termination-period=
    spring.task.execution.thread-name-prefix=task-
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
  • 相关阅读:
    项目实战——项目上线
    在Python中如何对文件进行操作?
    设置ZIP文件打开密码的两种方法
    第17篇 2D绘图(七)涂鸦板
    BBS项目分布搭建二(个人站点相关)
    Kafka3.0.0版本——消费者(手动提交offset)
    小程序源码:恋爱小助手
    anime 动画引擎
    Unity编辑器扩展之CustomPropertyDrawer理解
    8月算法训练------第九天(搜索与回溯)解题报告
  • 原文地址:https://blog.csdn.net/u014365523/article/details/127881169