• 异步任务-线程池配置


    🧑‍🎓 个人主页Silence Lamb
    📖 本章内容:【基于线程池执行异步任务


    一、线程池介绍

    1.1【线程池核心参数】

    • 👉🏽使用线程池可以带来以下好处
    - 降低资源消耗。降低频繁创建、销毁线程带来的额外开销,复用已创建线程
    - 降低使用复杂度。将任务的提交和执行进行解耦
    - 我们只需要创建一个线程池,然后往里面提交任务就行
    - 具体执行流程由线程池自己管理,降低使用复杂度
    - 提高线程可管理性。能安全有效的管理线程资源,避免不加限制无限申请造成资源耗尽风险
    - 提高响应速度。任务到达后,直接复用已创建好的线程执行
    
    • 👉🏽ThreadPoolExecutor 都有哪些核心参数
    核心线程数(corePoolSize)
    最大线程数(maximumPoolSize)
    空闲线程超时时间(keepAliveTime)
    时间单位(unit)
    阻塞队列(workQueue)
    拒绝策略(handler)
    线程工厂(ThreadFactory)
    

    1.2【任务拒绝策略】

    • 👉🏽任务拒绝策略
    AbortPolicy:丢弃任务,抛运行时异常
    CallerRunsPolicy:由调用者线程去执行
    DiscardPolicy:直接丢弃任务本身
    DiscardOldestPolicy:从队列中踢出最先进入队列(最后一个执行)的任务
    
    • 👉🏽线程池的执行流程
    1. 判断线程池的状态,如果不是RUNNING状态,直接执行拒绝策略
    2. 如果 当前线程数 < 核心线程池,则新建一个线程来处理提交的任务
    3. 如果 当前线程数 > 核心线程数且任务队列没满,则将任务放入阻塞队列等待执行
    4. 如果 核心线程池 < 当前线程池数 < 最大线程数且任务队列已满,则创建新的线程执行提交的任务
    5. 如果 当前线程数 > 最大线程数且队列已满,则执行拒绝策略拒绝该任务

    二、配置线程池

    2.1【引入依赖】

    
    <dependency>
        <groupId>org.apache.commonsgroupId>
        <artifactId>commons-pool2artifactId>
    dependency>
    <dependency>
    <groupId>org.apache.commonsgroupId>
    	<artifactId>commons-lang3artifactId>
    	<version>3.12.0version>
    dependency>
    
    # 线程池配置
    thread:
      corePoolSize: 50 # 核心线程池大小
      maxPoolSize: 200 # 最大可创建的线程数
      queueCapacity: 1000  # 队列最大长度
      keepAliveSeconds: 300  # 线程池维护线程所允许的空闲时间
    
    /**
     * @author  Silence Lamb
     * @apiNote  读取线程池配置信息
     */
    @Data
    @Component
    @ConfigurationProperties(prefix = "thread")
    public class ThreadProperties {
    
        @ApiModelProperty("核心线程池大小 ")
        private int corePoolSize = 50;
    
        @ApiModelProperty("最大可创建的线程数")
        private int maxPoolSize = 200;
    
        @ApiModelProperty(" 队列最大长度")
        private int queueCapacity = 1000;
    
        @ApiModelProperty("线程池维护线程所允许的空闲时间")
        private int keepAliveSeconds = 300;
    
    }
    

    2.2【配置线程池】

    /**
     * @author  Silence Lamb
     * @apiNote  线程池配置信息
     */
    @Configuration
    public class ThreadPoolConfig {
    
        @Resource
        private ThreadProperties threadProperties;
    }
    

    线程池参数说明

    • 👉corePoolSize: 核心线程数
    核心线程会一直存活,及时没有任务需要执行
    当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
    设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
    
    • 👉maximumPoolSize:最大线程数
    当线程数=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
    当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
    
    • 👉keepAliveTime:线程空闲时间
    当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
    如果allowCoreThreadTimeout=true,则会直到线程数量=0
    
    • 👉queueCapacity:任务队列容量(阻塞队列)
    当核心线程数达到最大时,新任务会放在队列中排队等待执行
    
    • 👉rejectedExecutionHandler:任务拒绝处理器
    AbortPolicy:丢弃任务,抛运行时异常
    CallerRunsPolicy:由调用者线程去执行
    DiscardPolicy:直接丢弃任务本身
    DiscardOldestPolicy:从队列中踢出最先进入队列(最后一个执行)的任务
    

    1【创建线程池】

    • 现在大多数公司都在遵循阿里巴巴 Java 开发规范
    • 该规范里明确说明不允许使用 Executors 创建线程池
    • 而是通过 ThreadPoolExecutor 显示指定参数去创建
    	/**
    	 * 创建线程池
    	 *
    	 * @param
     	* @return
     	*/
    	@Bean("threadPoolExecutor")
        public ThreadPoolTaskExecutor threadPoolExecutor(){
            ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            //核心线程池大小
            threadPoolTaskExecutor.setMaxPoolSize(threadProperties.getMaxPoolSize());
            //最大可创建的线程数
            threadPoolTaskExecutor.setCorePoolSize(threadProperties.getCorePoolSize());
            //队列最大长度
            threadPoolTaskExecutor.setQueueCapacity(threadProperties.getQueueCapacity());
            //线程池维护线程所允许的空闲时间
            threadPoolTaskExecutor.setKeepAliveSeconds(threadProperties.getKeepAliveSeconds());
            //拒绝策略
            threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            return threadPoolTaskExecutor;
       	}
    
    • ThreadPoolExecutor 来创建线程池,那核心参数设置多少合适呢
    Ncpu = CPU 核数
    
    Ucpu = 目标 CPU 利用率,0 <= Ucpu <= 1
    
    W / C = 等待时间 / 计算时间
    
    要程序跑到 CPU 的目标利用率,需要的线程数为:
    
    Nthreads = Ncpu * Ucpu * (1 + W / C)
    

    👉🏽 参考:线程池中各个参数如何合理设置

    2【执行周期性任务】

    	/**
    	 * 执行周期性或定时性任务
     	*
     	* @return
     	*/
        @Bean("scheduledExecutorService")
        public ScheduledExecutorService scheduledExecutorService() {
            //创建基本线程工厂-且设置命名格式-设置成守护线程
            BasicThreadFactory basicThreadFactory =
                new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build();
            ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
            return new ScheduledThreadPoolExecutor(threadProperties.getCorePoolSize(), basicThreadFactory, callerRunsPolicy);
        }
    

    2.3【获取bean】

    /**
     * @author Silence Lamb
     * @apiNote 在非spring管理环境中获取bean
     */
    @Component
    public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware {
    
        private static ConfigurableListableBeanFactory configurableListableBeanFactory;
        private static ApplicationContext applicationContext;
    
        /**
         * 在标准初始化后修改应用程序上下文的内部 Bean 工厂
         * 所有 Bean 定义都将被加载,但尚未实例化任何 bean。这允许覆盖或添加属性
         * 甚至可以覆盖或添加属性到急切初始化的 bean。
         */
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
            SpringUtils.configurableListableBeanFactory = beanFactory;
        }
    
        /**
         * 置运行此对象的应用程序上下文
         * 通常,此调用将用于初始化对象。
         * 在填充正常 bean 属性之后但在 init 回调之前调用
         */
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            SpringUtils.applicationContext = applicationContext;
        }
    
        /**
         * 获取对象
         *
         * @return Object 一个以所给名字注册的bean的实例
         */
        @SuppressWarnings("unchecked")
        public static <T> T getBean(String name) throws BeansException {
            return (T) configurableListableBeanFactory.getBean(name);
        }
    
        /**
         * 获取配置文件中的值
         *
         * @param key 配置文件的key
         * @return 当前的配置文件的值
         */
        public static String getRequiredProperty(String key) {
            return applicationContext.getEnvironment().getRequiredProperty(key);
        }
    }
    

    2.4【异步任务管理器】

    /**
     * @author SilenceLamb
     * @apiNote 异步任务管理器
     */
    @Scope(name = "Singleton", description = "单例对象")
    @Service("asyncManager")
    public class AsyncManager {
    
        private static final Logger log = LoggerFactory.getLogger("ShutDown-ThreadPool");
        @ApiModelProperty("操作延迟10毫秒")
        private final int OPERATE_DELAY_TIME = 10;
        @Resource(name = "scheduledExecutorService")
        private ScheduledExecutorService scheduledExecutorService;
    
        /**
         * 执行异步任务
         */
        public void start(TimerTask timerTask) {
            scheduledExecutorService.schedule(timerTask, OPERATE_DELAY_TIME, TimeUnit.MILLISECONDS);
        }
    
        /**
         * 停止任务线程池
         * 确保应用退出时能关闭后台线程(对象销毁时执行)
         *
         * @throws Exception
         */
        @PreDestroy
        public void stop() {
            try {
                ThreadUtils.shutdown(scheduledExecutorService);
                log.info("关闭后台任务任务线程池");
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
    }
    
    • 👉🏽execute() 提交任务和 submit() 提交任务有啥不同
    1. execute() 无返回值
    2. submit() 有返回值:会返回一个 FutureTask,然后可以调用 get() 方法阻塞获取返回值
    • 👉🏽自定义关闭线程方法
     * 停止线程池
     * 先使用shutdown, 停止接收新任务并尝试完成所有已存在任务.
     * 如果超时, 则调用shutdownNow, 取消在workQueue中Pending的任务,并中断所有阻塞函数.
     * 如果仍然超時,則強制退出.
     * 另对在shutdown时线程本身被调用中断做了处理.
    
         public static void shutdown(ExecutorService pool) {
            if (pool != null && !pool.isShutdown()) {
                pool.shutdown();
                try {
                    if (!pool.awaitTermination(120, TimeUnit.SECONDS)) {
                        pool.shutdownNow();
                        if (!pool.awaitTermination(120, TimeUnit.SECONDS)) {
                            log.info("Pool did not terminate");
                        }
                    }
                } catch (InterruptedException ie) {
                    pool.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }
        }
    
        /**
         * 打印线程异常信息
         */
        public static void printException(Runnable r, Throwable t) {
            if (t == null && r instanceof Future<?>) {
                try {
                    Future<?> future = (Future<?>) r;
                    if (future.isDone()) {
                        future.get();
                    }
                } catch (CancellationException ce) {
                    t = ce;
                } catch (ExecutionException ee) {
                    t = ee.getCause();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
            if (t != null) {
                log.error(t.getMessage(), t);
            }
        }
    
    	/**
         * sleep等待,单位为毫秒
         */
        public static void sleep(long milliseconds) {
            try {
                Thread.sleep(milliseconds);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    2.5【创建要执行的任务】

    /**
     * @author Silence Lamb
     * @apiNote 异步任务
     */
    public class AsyncTask {
    
        private static final Logger USER_LOGGER = LoggerFactory.getLogger("Login-AsyncTask");
        /**
         * 重写run方法执行任务
         */
        public TimerTask test() {
            return new TimerTask() {
                /**
                 * The action to be performed by this timer task.
                 */
                @Override
                public void run() {
                    USER_LOGGER.info("记录登录信息");
                }
            };
        }
    }
    
    • 测试异步任务:http://localhost/test/async
    /**
     * @author Silence Lamb
     * @apiNote 测试异步任务
     */
    @RestController
    @RequestMapping("/test")
    public class AsyncController {
    
        @RequestMapping("/async")
        public String testAsync() throws Exception {
            AsyncManager.asyncManager().start(new AsyncTask().test());
            return "登录成功";
        }
    }
    
    • 控制台就会打印信息
     Login-AsyncTask     : 记录登录信息
    

    🏅 项目地址:📢💨基于线程池异步执行任务
  • 相关阅读:
    8 迭代与列表生成式
    Python 文件写入操作
    RadSegNet: A Reliable Approach to Radar Camera Fusion 论文笔记
    43.【Java 实现验证码获取 C++实现密码加密和删除和QQ登入系统】
    ansible
    Win10配置IIS与 C#/.net项目的发布与IIS部署
    cesium 实战记录(六)地图通用工具方法的封装
    tomcat动静分离和负载均衡
    『C++ - STL』之优先级队列( priority_queue )
    衡师11月月赛web题目wp
  • 原文地址:https://blog.csdn.net/m0_46914264/article/details/127045786