• 慧销平台ThreadPoolExecutor内存泄漏分析


    作者:京东零售 冯晓涛

    问题背景

    京东生旅平台慧销系统,作为平台系统对接了多条业务线,主要进行各个业务线广告,召回等活动相关内容与能力管理。

    最近根据告警发现内存持续升高,每隔2-3天会收到内存超过阈值告警,猜测可能存在内存泄漏的情况,然后进行排查。根据24小时时间段内存监控可以发现,容器的内存在持续上升:

    问题排查

    初步估计内存泄漏,查看24小时时间段jvm内存监控,排查jvm内存回收情况:

    YoungGC和FullGC情况:

    通过jvm内存分析和YoungGC与FullGC执行情况,可以判断可能原因如下:

    1、 存在YoungGC但是没有出现FullGC,可能是对象进入老年代但是没有到达FullGC阈值,所以没有触发FullGC,对象一直存在老年代无法回收

    2、 存在内存泄漏,虽然执行了YoungGC,但是这部分内存无法被回收

    通过线程数监控,观察当前线程情况,发现当前线程数7427个,并且还在不断上升,基本判断存在内存泄漏,并且和线程池的不当使用有关:

    通过JStack,获取线程堆栈文件并进行分析,排查为什么会有这么多线程:

    发现通过线程池创建的线程数达7000+:

    代码分析

    分析代码中ThreadPoolExecutor的使用场景,发现在一个worker公共类中定义了一个线程池,worker执行时会使用线程池进行异步执行。

     public class BackgroundWorker {
     
         private static ThreadPoolExecutor threadPoolExecutor;
     
         static {
             init(15);
         }
     
         public static void init() {
             init(15);
         }
     
         public static void init(int poolSize) {
             threadPoolExecutor =
                     new ThreadPoolExecutor(3, poolSize, 1000, TimeUnit.MINUTES, new LinkedBlockingDeque<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
         }
     
         public static void shutdown() {
             if (threadPoolExecutor != null && !threadPoolExecutor.isShutdown()) {
                 threadPoolExecutor.shutdownNow();
             }
         }
     
         public static void submit(final Runnable task) {
             if (task == null) {
                 return;
             }
             threadPoolExecutor.execute(() -> {
                 try {
                     task.run();
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
             });
         }
     
     }
    

    广告缓存刷新worker使用线程池的代码:

     public class AdActivitySyncJob {
    
        @Scheduled(cron = "0 0/5 * * * ?")
        public void execute() {
            log.info("AdActivitySyncJob start");
            List<DicDTO> locationList = locationService.selectLocation();
            if (CollectionUtils.isEmpty(locationList)) {
                return;
            }
    
            //中间省略部分无关代码
    
            BackgroundWorker.init(40);
            locationCodes.forEach(locationCode -> {
                showChannelMap.forEach((key,value)->{
                    BackgroundWorker.submit(new Runnable() {
                        @Override
                        public void run() {
                            log.info("AdActivitySyncJob,locationCode:{},showChannel:{}",locationCode,value);
                            Result<AdActivityDTO> result = notLoginAdActivityOuterService.getAdActivityByLocationInner(locationCode, ImmutableMap.of("showChannel", value));
                            LocalCache.AD_ACTIVITY_CACHE.put(locationCode.concat("_").concat(value), result);
                        }
                    });
                });
            });
            log.info("AdActivitySyncJob end");
        }
    
        @PostConstruct
        public void init() {
            execute();
        }
    }
    

    原因分析:猜测是worker每次执行,都会执行init方法,创建新的线程池,但是局部创建的线程池并没有被关闭,导致内存中的线程池越来越多,ThreadPoolExecutor在使用完成后,如果不手动关闭,无法被GC回收。

    分析验证

    验证局部线程池ThreadPoolExecutor创建后,如果不手动关闭,是否会被GC回收:

    public class Test {
        private static ThreadPoolExecutor threadPoolExecutor;
    
        public static void main(String[] args) {
            for (int i=1;i<100;i++){
                //每次均初始化线程池
                threadPoolExecutor =
                        new ThreadPoolExecutor(3, 15, 1000, TimeUnit.MINUTES, new LinkedBlockingDeque<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
    
                //使用线程池执行任务
                for(int j=0;j<10;j++){
                    submit(new Runnable() {
                        @Override
                        public void run() {
                        }
                    });
                }
    
            }
            //获取当前所有线程
            ThreadGroup group = Thread.currentThread().getThreadGroup();
            ThreadGroup topGroup = group;
            // 遍历线程组树,获取根线程组
            while (group != null) {
                topGroup = group;
                group = group.getParent();
            }
            int slackSize = topGroup.activeCount() * 2;
            Thread[] slackThreads = new Thread[slackSize];
            // 获取根线程组下的所有线程,返回的actualSize便是最终的线程数
            int actualSize = topGroup.enumerate(slackThreads);
            Thread[] atualThreads = new Thread[actualSize];
            System.arraycopy(slackThreads, 0, atualThreads, 0, actualSize);
            System.out.println("Threads size is " + atualThreads.length);
            for (Thread thread : atualThreads) {
                System.out.println("Thread name : " + thread.getName());
            }
        }
    
        public static void submit(final Runnable task) {
            if (task == null) {
                return;
            }
            threadPoolExecutor.execute(() -> {
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
    

    输出:

    Threads size is 302

    Thread name : Reference Handler

    Thread name : Finalizer

    Thread name : Signal Dispatcher

    Thread name : main

    Thread name : Monitor Ctrl-Break

    Thread name : pool-1-thread-1

    Thread name : pool-1-thread-2

    Thread name : pool-1-thread-3

    Thread name : pool-2-thread-1

    Thread name : pool-2-thread-2

    Thread name : pool-2-thread-3

    Thread name : pool-3-thread-1

    Thread name : pool-3-thread-2

    Thread name : pool-3-thread-3

    Thread name : pool-4-thread-1

    Thread name : pool-4-thread-2

    Thread name : pool-4-thread-3

    Thread name : pool-5-thread-1

    Thread name : pool-5-thread-2

    Thread name : pool-5-thread-3

    Thread name : pool-6-thread-1

    Thread name : pool-6-thread-2

    Thread name : pool-6-thread-3

    …………

    执行结果分析,线程数量302个,局部线程池创建的核心线程没有被回收。

    修改初始化线程池部分:

    //初始化一次线程池
    threadPoolExecutor =
            new ThreadPoolExecutor(3, 15, 1000, TimeUnit.MINUTES, new LinkedBlockingDeque<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
    
    for (int i=1;i<100;i++){
        //使用线程池执行任务
        for(int j=0;j<10;j++){
            submit(new Runnable() {
                @Override
                public void run() {
                }
            });
        }
    
    }
    

    输出:

    Threads size is 8

    Thread name : Reference Handler

    Thread name : Finalizer

    Thread name : Signal Dispatcher

    Thread name : main

    Thread name : Monitor Ctrl-Break

    Thread name : pool-1-thread-1

    Thread name : pool-1-thread-2

    Thread name : pool-1-thread-3

    解决方案

    1、只初始化一次,每次执行worker复用线程池

    2、每次执行完成后,关闭线程池

    BackgroundWorker的定位是后台执行worker均进行线程池的复用,所以采用方案1,每次在static静态代码块中初始化,使用时无需重新初始化。

    解决后监控:

    jvm内存监控,内存不再持续上升:

    线程池恢复正常且平稳:

    Jstack文件,观察线程池数量恢复正常:

    Dump文件分析线程池对象数量:

    拓展

    1、 如何关闭线程池

    线程池提供了两个关闭方法,shutdownNow 和 shutdown 方法。

    shutdownNow方法的解释是:线程池拒接收新提交的任务,同时立马关闭线程池,线程池里的任务不再执行。

    shutdown方法的解释是:线程池拒接收新提交的任务,同时等待线程池里的任务执行完毕后关闭线程池。

    2、 为什么threadPoolExecutor不会被GC回收

    threadPoolExecutor =
             new ThreadPoolExecutor(3, 15, 1000, TimeUnit.MINUTES, new LinkedBlockingDeque<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
    

    局部使用后未手动关闭的线程池对象,会被GC回收吗?获取线上jump文件进行分析:

    发现线程池对象没有被回收,为什么不会被回收?查看ThreadPoolExecutor.execute()方法:

    如果当前线程数小于核心线程数,就会进入addWorker方法创建线程:

    分析runWorker方法,如果存在任务则执行,否则调用getTask()获取任务:

    发现workQueue.take()会一直阻塞,等待队列中的任务,因为Thread线程一直没有结束, 存在引用关系:ThreadPoolExecutor->Worker->Thread,因为存在GC ROOT的引用,所以无法被回收 。

  • 相关阅读:
    隐语 Meetup 北京站|精彩时刻大盘点!新品发布、行业案例、专家解读......欢迎围观
    维也纳国际酒店+丽柏酒店首个同物业双品牌项目成功加盟,中高端酒店品牌联动浮现新模式
    构建一个WIFI室内定位系统
    cs224w(图机器学习)2021冬季课程学习笔记6
    SpringBoot 日志
    MISRA 2012学习笔记(5)-Rules 8.10
    长尾效应和肥尾效应
    Pod入门与实战
    Jetpack架构组件学习(3)——Activity Results API使用
    【实验十二】决策树判断你是否可学python
  • 原文地址:https://www.cnblogs.com/Jcloud/p/17162600.html