• 编码踩坑——多线程可能带来意想不到的OOM


    多线程技术数据Java基础中比较重要的知识点;实际使用时,我们往往会用线程池来管理我们的异步线程,本篇介绍一个由于使用ExecutorCompletionService多线程可能导致的OOM问题,开发时要千万小心,即使用一套JDK API时一定要熟悉其原理和可能踩的坑,不要只看到其好处拿来就用,搞不好就要出线上事故;

    先看下线程池最基本的用法示例;

    1. ThreadPoolExecutor

    定义一个线程池;

    1. /**
    2. * 线程池
    3. */
    4. private static final ExecutorService executorService = new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
    5. new ArrayBlockingQueue<>(100),
    6. r -> {
    7. Thread thread = new Thread(r);
    8. thread.setName("myThreadPoolExecutor");
    9. //设置异常捕获器
    10. thread.setUncaughtExceptionHandler((t, e) -> log.error("[message]async exec task error! e:{}", e.getMessage()));
    11. return thread;
    12. }, new ThreadPoolExecutor.AbortPolicy());

    接下来分别依次提交3个任务A/B/C,任务类型为有返回值的Callable task,每个任务的执行时间不同,分别为10s/3s/7s;异步任务提交的结果为Future类型,提交后接下来按序对每个Future对象调用future.get方法获取其结果;代码如下:

    1. /**
    2. * 测试ExecutorService获取异步结果的顺序及实际执行时间
    3. */
    4. private static void testThreadPool() {
    5. List> futureList = Lists.newArrayList();
    6. // 记录A/B/C的任务完成时间
    7. List taskFinshTimeList = Lists.newArrayList();
    8. AtomicLong finishA = new AtomicLong();
    9. AtomicLong finishB = new AtomicLong();
    10. AtomicLong finishC = new AtomicLong();
    11. // A cost 10s
    12. final Future futureA = executorService.submit(() -> {
    13. log.warn("exec A start");
    14. final long start = System.currentTimeMillis();
    15. try {
    16. TimeUnit.SECONDS.sleep(10);
    17. } catch (InterruptedException e) {
    18. log.error("A InterruptedException occur!");
    19. }
    20. finishA.set(System.currentTimeMillis());
    21. taskFinshTimeList.add(finishA);
    22. log.warn("exec A finish cost=[{}]ms", finishA.get() - start);
    23. return "A";
    24. });
    25. futureList.add(futureA);
    26. // B cost 3s
    27. final Future futureB = executorService.submit(() -> {
    28. log.warn("exec B start");
    29. final long start = System.currentTimeMillis();
    30. try {
    31. TimeUnit.SECONDS.sleep(3);
    32. } catch (InterruptedException e) {
    33. log.error("B InterruptedException occur!");
    34. }
    35. finishB.set(System.currentTimeMillis());
    36. taskFinshTimeList.add(finishB);
    37. log.warn("exec B finish cost=[{}]ms", finishB.get() - start);
    38. return "B";
    39. });
    40. futureList.add(futureB);
    41. // C cost 7s
    42. final Future futureC = executorService.submit(() -> {
    43. log.warn("exec C start");
    44. final long start = System.currentTimeMillis();
    45. try {
    46. TimeUnit.SECONDS.sleep(7);
    47. } catch (InterruptedException e) {
    48. log.error("C InterruptedException occur!");
    49. }
    50. finishC.set(System.currentTimeMillis());
    51. taskFinshTimeList.add(finishC);
    52. log.warn("exec C finish cost=[{}]ms", finishC.get() - start);
    53. return "C";
    54. });
    55. futureList.add(futureC);
    56. // 同步获取结果
    57. AtomicInteger taskIndex = new AtomicInteger();
    58. futureList.forEach(future -> {
    59. try {
    60. final String result = future.get();
    61. // 哪怕使用Future#get(long, java.util.concurrent.TimeUnit)方法,也不能使得当前异步任务执行完后立即就能拿出结果
    62. // final String result = future.get(5, TimeUnit.SECONDS);
    63. log.warn("sync get result, then do next task using [result={}], waiting [{}]ms after task finish.",
    64. result, System.currentTimeMillis() - taskFinshTimeList.get(taskIndex.getAndIncrement()).get());
    65. } catch (Exception e) {
    66. log.error("future#get error occur!");
    67. }
    68. });
    69. executorService.shutdown();
    70. }

    代码中,我打印了每个任务的执行开始时间、任务执行结束时间、从Future对象获取到异步结果的时间,以及从任务执行结束到获取异步结果这之间等待的时间

    根据我们的尝试,Future.get方法会阻塞主线程,因此我们预期的结果就是:尽管异步任务的执行结束的顺序依次为:B/C/A,但是由于是按照A/B/C的顺序从Future对象获取结果,因此实际获取到异步结果的顺序依次为A/B/C,下面是执行结果:

    1. 17:53:25.733 [myThreadPoolExecutor] exec B start
    2. 17:53:25.733 [myThreadPoolExecutor] exec A start
    3. 17:53:25.733 [myThreadPoolExecutor] exec C start
    4. 17:53:28.737 [myThreadPoolExecutor] exec B finish cost=[3001]ms
    5. 17:53:32.737 [myThreadPoolExecutor] exec C finish cost=[7001]ms
    6. 17:53:35.737 [myThreadPoolExecutor] exec A finish cost=[10001]ms
    7. 17:53:35.737 [main] WARN sync get result, then do next task using [result=A], waiting [7000]ms after task finish.
    8. 17:53:35.737 [main] WARN sync get result, then do next task using [result=B], waiting [3000]ms after task finish.
    9. 17:53:35.737 [main] WARN sync get result, then do next task using [result=C], waiting [0]ms after task finish.

    可以看到与我们的预期一致:先完成的2个任务B和C由于比A后调用Future#get(),尽管任务已经执行完了,但也要等到执行时间最长的任务A执行完并且Future#get()拿到结果后,才能通过Future#get()拿到B和C各自的结果,中间分别等了7s和3s;

    看到这里,尤其是做C端业务的同学可能会有点想法,一般来说,C端业务在处理一次请求时会调用下游N个接口(很多下游,如今日头条,一次用户刷新请求可能调用用户画像、视频推荐、帖子推荐、游戏推荐、广告推荐等多个业务方接口,对结果做聚合);

    如果都按照ExecutorService的线程池同步获取异步任务结果的这种方式,并且恰巧前几个异步任务调用的接口耗时比较久,那么获取异步结果的时候就比较悲催了,因为后面的执行更快的异步结果获取会阻塞等待

    这个时候,你可能百度了一下,能否有一种线程执行器,get的时候能根据异步任务完成的顺序get出来,让获取异步结果这一行为不阻塞呢?万能的Google告诉你,还只有这么一个东西,叫ExecutorCompletionService,号称"任务规划大师"(时间管理大师?[doge]),下面来试试;

    2. ExecutorCompletionService

    使用时直接将线程池作为其构造函数的入参即可,因为API与ExecutorService基本一致,因此代码基本不需要改动;

    1. /**
    2. * ExecutorCompletionService
    3. */
    4. private static final CompletionService completionService = new ExecutorCompletionService(executorService);

    测试代码如下:

    1. @SneakyThrows
    2. private static void testCompletionService() {
    3. List> futureList = Lists.newArrayList();
    4. // 记录A/B/C的任务完成时间
    5. List taskFinshTimeList = Lists.newArrayList();
    6. AtomicLong finishA = new AtomicLong();
    7. AtomicLong finishB = new AtomicLong();
    8. AtomicLong finishC = new AtomicLong();
    9. // A cost 10s
    10. final Future futureA = completionService.submit(() -> {
    11. log.warn("exec A start");
    12. final long start = System.currentTimeMillis();
    13. try {
    14. TimeUnit.SECONDS.sleep(10);
    15. } catch (InterruptedException e) {
    16. log.error("A InterruptedException occur!");
    17. }
    18. finishA.set(System.currentTimeMillis());
    19. taskFinshTimeList.add(finishA);
    20. log.warn("exec A finish cost=[{}]ms", finishA.get() - start);
    21. return "A";
    22. });
    23. futureList.add(futureA);
    24. // B cost 3s
    25. final Future futureB = completionService.submit(() -> {
    26. log.warn("exec B start");
    27. final long start = System.currentTimeMillis();
    28. try {
    29. TimeUnit.SECONDS.sleep(3);
    30. } catch (InterruptedException e) {
    31. log.error("B InterruptedException occur!");
    32. }
    33. finishB.set(System.currentTimeMillis());
    34. taskFinshTimeList.add(finishB);
    35. log.warn("exec B finish cost=[{}]ms", finishB.get() - start);
    36. return "B";
    37. });
    38. futureList.add(futureB);
    39. // C cost 7s
    40. final Future futureC = completionService.submit(() -> {
    41. log.warn("exec C start");
    42. final long start = System.currentTimeMillis();
    43. try {
    44. TimeUnit.SECONDS.sleep(7);
    45. } catch (InterruptedException e) {
    46. log.error("C InterruptedException occur!");
    47. }
    48. finishC.set(System.currentTimeMillis());
    49. taskFinshTimeList.add(finishC);
    50. log.warn("exec C finish cost=[{}]ms", finishC.get() - start);
    51. return "C";
    52. });
    53. futureList.add(futureC);
    54. // // 同步获取结果依旧会阻塞
    55. // AtomicInteger taskIndex = new AtomicInteger();
    56. // futureList.forEach(future -> {
    57. // try {
    58. // final String result = future.get();
    59. // log.warn("sync get result, then do next task using [result={}], waiting [{}]ms after task finish.",
    60. // result, System.currentTimeMillis() - taskFinshTimeList.get(taskIndex.getAndIncrement()).get());
    61. // } catch (Exception e) {
    62. // log.error("future#get error occur!");
    63. // }
    64. // });
    65. // 调用completionService.take方法获取异步结果
    66. for (int i = 0; i < futureList.size(); i++) {
    67. final String result = completionService.take().get();
    68. log.warn("completionService.take() [result={}]", result);
    69. }
    70. executorService.shutdown();
    71. }

    与上面的例子一样,3个任务,任务的执行时间这些条件都一样,区别在于通过CompletionService.take方法获取异步结果,主要关注下同步获取异步任务结果的测试结果;

    1. 20:37:23.857 [myThreadPoolExecutor] exec C start
    2. 20:37:23.857 [myThreadPoolExecutor] exec B start
    3. 20:37:23.857 [myThreadPoolExecutor] exec A start
    4. 20:37:26.860 [myThreadPoolExecutor] exec B finish cost=[3001]ms
    5. 20:37:26.861 [main] completionService.take() [result=B]
    6. 20:37:30.860 [myThreadPoolExecutor] exec C finish cost=[7001]ms
    7. 20:37:30.860 [main] completionService.take() [result=C]
    8. 20:37:33.860 [myThreadPoolExecutor] exec A finish cost=[10001]ms
    9. 20:37:33.860 [main] completionService.take() [result=A]

    从结果可知,每个异步任务执行完成后马上就能拿到异步结果,不会发生阻塞,这样的好处就是,前序异步任务执行完成后,马上就能拿到结果,紧接着只能执行后续的流程处理

    但是——

    如果我们直接拿CompletionService做ExecutorService的替换,并且恰恰我们不需要用到异步线程的执行结果(如Runnable类型的异步任务)时,就会出问题,可能引发系统OOM

    来看下CompletionService的源码,他怎么能做到通过CompletionService.take方法,就能按照异步任务的执行完成顺序获取异步结果呢?

    CompletionService的源码分析

    (1)CompletionService接口

    (2)ExecutorCompletionService是CompletionService接口唯一的的实现类;

    这里关注2个属性:线程池executor、阻塞队列completionQueue;

    (3)对比ExecutorService和ExecutorCompletionService的submit方法可以看出区别

    这个是ExecutorService的submit方法:

    这个是ExecutorCompletionService的submit方法:

    主要区别就在于上图标记的QueueingFuture,继续跟进去看:

    QueueingFuture继承自FutureTask,它重写了done()方法;重写后的逻辑为:当任务执行完成后,task就会被放到completionQueue队列里;也就是说,completionQueue队列里面的task都是已经done()完成了的task,这个task就是我们拿到的一个个的future结果;队列取出元素的顺序就是任务的完成顺序;

    如果调用队列completionQueue的task方法,会阻塞等待任务完成,直到某个任务完成后被插入队列;一旦从队列取出一个元素,这个元素一定是完成了的future,我们调用Future#get方法当然就能直接获得结果;

    总结下来,对比ExecutorService和ExecutorCompletionService二者,可知:我们在使用 ExecutorService#submit提交任务后需要关注每个任务返回的future何时才能完成;然而 CompletionService重写了done方法,对这些future进行了追踪,保证completionQueue队列里面一定是完成了的task,可以立即从future中get任务执行结果;

    上面说了"CompletionService执行无返回值的Runnable类型的异步任务时,可能引发系统OOM!" 为什么呢?

    原因很简单,因为我们使用了阻塞队列,这个队列会跟随完成的异步任务而加入,当这个队列没有将元素poll出来时,队列就会不断增长,占用内从就会越来越大,最终可能引发OOM;

    什么情况会将completionQueue队列的元素取出呢?以下三个方法:

    注意,直接调用Future#get并不能从completionQueue队列移除元素;正因为无返回值的Runnable任务我们很难会尝试去task一下,因此更容易导致队列一直未移除元素,最终内存不断增长;

    因此,不能直接拿ExecutorCompletionService作为ExecutorService的替换!!!

    3.  推荐使用CompletableFuture

    上面我们使用ExecutorCompletionService是因为我们并行的执行了多个异步任务,并且希望各个任务执行完后,能立即拿着结果去做下一件事;这里我推荐使用JDK8引入的组合式异步编程,下面是代码示例,它可以通过ofAll将多个异步任务一起同步尝试获取执行结果、可以使用JDK自带的forkJoinPool也可以自己制定线程池,并且可以通过提供的thenApply/thenRun/thenAccept等API灵活的实现流式异步编程,结合lambda表达式,代码清晰简洁明了;

    1. /**
    2. * 测试CompletableFuture获取异步结果的顺序及异步任务执行时间
    3. */
    4. @SneakyThrows
    5. private static void testCompletableFuture() {
    6. List> futureList = Lists.newArrayList();
    7. // 记录A/B/C的任务完成时间
    8. List taskFinshTimeList = Lists.newArrayList();
    9. AtomicLong finishA = new AtomicLong();
    10. AtomicLong finishB = new AtomicLong();
    11. AtomicLong finishC = new AtomicLong();
    12. // A cost 10s
    13. final CompletableFuture completableFutureA = CompletableFuture.supplyAsync(() -> {
    14. log.warn("exec A start");
    15. final long start = System.currentTimeMillis();
    16. try {
    17. TimeUnit.SECONDS.sleep(10);
    18. } catch (InterruptedException e) {
    19. log.error("A InterruptedException occur!");
    20. }
    21. finishA.set(System.currentTimeMillis());
    22. taskFinshTimeList.add(finishA);
    23. log.warn("exec A finish cost=[{}]ms", finishA.get() - start);
    24. return "A";
    25. }, executorService);
    26. futureList.add(completableFutureA);
    27. // B cost 3s
    28. final CompletableFuture completableFutureB = CompletableFuture.supplyAsync(() -> {
    29. log.warn("exec B start");
    30. final long start = System.currentTimeMillis();
    31. try {
    32. TimeUnit.SECONDS.sleep(3);
    33. } catch (InterruptedException e) {
    34. log.error("B InterruptedException occur!");
    35. }
    36. finishB.set(System.currentTimeMillis());
    37. taskFinshTimeList.add(finishB);
    38. log.warn("exec B finish cost=[{}]ms", finishB.get() - start);
    39. return "B";
    40. }, executorService);
    41. futureList.add(completableFutureB);
    42. // C cost 7s
    43. final CompletableFuture completableFutureC = CompletableFuture.supplyAsync(() -> {
    44. log.warn("exec C start");
    45. final long start = System.currentTimeMillis();
    46. try {
    47. TimeUnit.SECONDS.sleep(7);
    48. } catch (InterruptedException e) {
    49. log.error("C InterruptedException occur!");
    50. }
    51. finishC.set(System.currentTimeMillis());
    52. taskFinshTimeList.add(finishC);
    53. log.warn("exec C finish cost=[{}]ms", finishC.get() - start);
    54. return "C";
    55. }, executorService);
    56. futureList.add(completableFutureC);
    57. // 同步获取结果
    58. // AtomicInteger taskIndex = new AtomicInteger();
    59. // futureList.forEach(future -> {
    60. // try {
    61. // // 执行完取异步任务的结果不用阻塞 执行完立即去做下一件事 并且可以组合多个异步任务
    62. // future
    63. // // 第二件事 有返回值
    64. // .thenApply(result -> {
    65. // log.warn("sync get result, then do next task using [result={}], waiting [{}]ms after step.1 task finish.",
    66. // result, System.currentTimeMillis() - taskFinshTimeList.get(taskIndex.getAndIncrement()).get());
    67. // return "secondary_" + result;
    68. // })
    69. // // 第三件事 无返回值
    70. // .thenAccept(secondaryResult -> {
    71. // log.warn("sync get secondary result, then do next task using [secondary result={}]", secondaryResult);
    72. // });
    73. // } catch (Exception e) {
    74. // log.error("future#get error occur!");
    75. // }
    76. // });
    77. AtomicInteger index = new AtomicInteger();
    78. CompletableFuture[] array = new CompletableFuture[futureList.size()];
    79. futureList.forEach(completableFuture -> array[index.getAndIncrement()] = completableFuture);
    80. // 获取最早得到结果的那个异步任务的返回值 其他的异步任务还在执行不会中断
    81. final Object anyResult = CompletableFuture.anyOf(array).get();
    82. log.warn("anyResult={}", anyResult);

    更多的示例可以参考之前写的这篇文章《Java8 实战》笔记——4.CompletableFuture-组合式异步编程

    参考:

    多线程使用不当导致的 OOM

  • 相关阅读:
    【路由器】小米 WR30U 解锁并刷机
    抖音矩阵系统,抖音矩阵源码定制。
    这些ChatGPT旗下的AI工具你都认识吗?
    计算机网络——物理层-信道的极限容量(奈奎斯特公式、香农公式)
    浅谈绿色创新型校园的节约能耗与能耗管理的应用
    对象.属性与对象[属性]的区别
    基于ssm教学评价管理系统获取(java毕业设计)
    FPGA 芯片点亮标准?
    serveless 思想 Midway.js 框架使用教程(七)
    linux系统Jenkins工具配置webhook自动部署
  • 原文地址:https://blog.csdn.net/minghao0508/article/details/126323684