多线程技术数据Java基础中比较重要的知识点;实际使用时,我们往往会用线程池来管理我们的异步线程,本篇介绍一个由于使用ExecutorCompletionService多线程可能导致的OOM问题,开发时要千万小心,即使用一套JDK API时一定要熟悉其原理和可能踩的坑,不要只看到其好处拿来就用,搞不好就要出线上事故;
先看下线程池最基本的用法示例;
定义一个线程池;
- /**
- * 线程池
- */
- private static final ExecutorService executorService = new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(100),
- r -> {
- Thread thread = new Thread(r);
- thread.setName("myThreadPoolExecutor");
- //设置异常捕获器
- thread.setUncaughtExceptionHandler((t, e) -> log.error("[message]async exec task error! e:{}", e.getMessage()));
- return thread;
- }, new ThreadPoolExecutor.AbortPolicy());
接下来分别依次提交3个任务A/B/C,任务类型为有返回值的Callable
- /**
- * 测试ExecutorService获取异步结果的顺序及实际执行时间
- */
- private static void testThreadPool() {
- List
> futureList = Lists.newArrayList(); - // 记录A/B/C的任务完成时间
- List
taskFinshTimeList = Lists.newArrayList(); - AtomicLong finishA = new AtomicLong();
- AtomicLong finishB = new AtomicLong();
- AtomicLong finishC = new AtomicLong();
- // A cost 10s
- final Future
futureA = executorService.submit(() -> { - log.warn("exec A start");
- final long start = System.currentTimeMillis();
- try {
- TimeUnit.SECONDS.sleep(10);
- } catch (InterruptedException e) {
- log.error("A InterruptedException occur!");
- }
- finishA.set(System.currentTimeMillis());
- taskFinshTimeList.add(finishA);
- log.warn("exec A finish cost=[{}]ms", finishA.get() - start);
- return "A";
- });
- futureList.add(futureA);
-
- // B cost 3s
- final Future
futureB = executorService.submit(() -> { - log.warn("exec B start");
- final long start = System.currentTimeMillis();
- try {
- TimeUnit.SECONDS.sleep(3);
- } catch (InterruptedException e) {
- log.error("B InterruptedException occur!");
- }
- finishB.set(System.currentTimeMillis());
- taskFinshTimeList.add(finishB);
- log.warn("exec B finish cost=[{}]ms", finishB.get() - start);
- return "B";
- });
- futureList.add(futureB);
-
- // C cost 7s
- final Future
futureC = executorService.submit(() -> { - log.warn("exec C start");
- final long start = System.currentTimeMillis();
- try {
- TimeUnit.SECONDS.sleep(7);
- } catch (InterruptedException e) {
- log.error("C InterruptedException occur!");
- }
- finishC.set(System.currentTimeMillis());
- taskFinshTimeList.add(finishC);
- log.warn("exec C finish cost=[{}]ms", finishC.get() - start);
- return "C";
- });
- futureList.add(futureC);
-
- // 同步获取结果
- AtomicInteger taskIndex = new AtomicInteger();
- futureList.forEach(future -> {
- try {
- final String result = future.get();
- // 哪怕使用Future#get(long, java.util.concurrent.TimeUnit)方法,也不能使得当前异步任务执行完后立即就能拿出结果
- // final String result = future.get(5, TimeUnit.SECONDS);
- log.warn("sync get result, then do next task using [result={}], waiting [{}]ms after task finish.",
- result, System.currentTimeMillis() - taskFinshTimeList.get(taskIndex.getAndIncrement()).get());
- } catch (Exception e) {
- log.error("future#get error occur!");
- }
- });
-
- executorService.shutdown();
- }
代码中,我打印了每个任务的执行开始时间、任务执行结束时间、从Future对象获取到异步结果的时间,以及从任务执行结束到获取异步结果这之间等待的时间;
根据我们的尝试,Future.get方法会阻塞主线程,因此我们预期的结果就是:尽管异步任务的执行结束的顺序依次为:B/C/A,但是由于是按照A/B/C的顺序从Future对象获取结果,因此实际获取到异步结果的顺序依次为A/B/C,下面是执行结果:
- 17:53:25.733 [myThreadPoolExecutor] exec B start
- 17:53:25.733 [myThreadPoolExecutor] exec A start
- 17:53:25.733 [myThreadPoolExecutor] exec C start
- 17:53:28.737 [myThreadPoolExecutor] exec B finish cost=[3001]ms
- 17:53:32.737 [myThreadPoolExecutor] exec C finish cost=[7001]ms
- 17:53:35.737 [myThreadPoolExecutor] exec A finish cost=[10001]ms
- 17:53:35.737 [main] WARN sync get result, then do next task using [result=A], waiting [7000]ms after task finish.
- 17:53:35.737 [main] WARN sync get result, then do next task using [result=B], waiting [3000]ms after task finish.
- 17:53:35.737 [main] WARN sync get result, then do next task using [result=C], waiting [0]ms after task finish.
可以看到与我们的预期一致:先完成的2个任务B和C由于比A后调用Future#get(),尽管任务已经执行完了,但也要等到执行时间最长的任务A执行完并且Future#get()拿到结果后,才能通过Future#get()拿到B和C各自的结果,中间分别等了7s和3s;
看到这里,尤其是做C端业务的同学可能会有点想法,一般来说,C端业务在处理一次请求时会调用下游N个接口(很多下游,如今日头条,一次用户刷新请求可能调用用户画像、视频推荐、帖子推荐、游戏推荐、广告推荐等多个业务方接口,对结果做聚合);
如果都按照ExecutorService的线程池同步获取异步任务结果的这种方式,并且恰巧前几个异步任务调用的接口耗时比较久,那么获取异步结果的时候就比较悲催了,因为后面的执行更快的异步结果获取会阻塞等待;
这个时候,你可能百度了一下,能否有一种线程执行器,get的时候能根据异步任务完成的顺序get出来,让获取异步结果这一行为不阻塞呢?万能的Google告诉你,还只有这么一个东西,叫ExecutorCompletionService,号称"任务规划大师"(时间管理大师?[doge]),下面来试试;
使用时直接将线程池作为其构造函数的入参即可,因为API与ExecutorService基本一致,因此代码基本不需要改动;
- /**
- * ExecutorCompletionService
- */
- private static final CompletionService
completionService = new ExecutorCompletionService(executorService); -
测试代码如下:
- @SneakyThrows
- private static void testCompletionService() {
- List
> futureList = Lists.newArrayList(); - // 记录A/B/C的任务完成时间
- List
taskFinshTimeList = Lists.newArrayList(); - AtomicLong finishA = new AtomicLong();
- AtomicLong finishB = new AtomicLong();
- AtomicLong finishC = new AtomicLong();
- // A cost 10s
- final Future
futureA = completionService.submit(() -> { - log.warn("exec A start");
- final long start = System.currentTimeMillis();
- try {
- TimeUnit.SECONDS.sleep(10);
- } catch (InterruptedException e) {
- log.error("A InterruptedException occur!");
- }
- finishA.set(System.currentTimeMillis());
- taskFinshTimeList.add(finishA);
- log.warn("exec A finish cost=[{}]ms", finishA.get() - start);
- return "A";
- });
- futureList.add(futureA);
-
- // B cost 3s
- final Future
futureB = completionService.submit(() -> { - log.warn("exec B start");
- final long start = System.currentTimeMillis();
- try {
- TimeUnit.SECONDS.sleep(3);
- } catch (InterruptedException e) {
- log.error("B InterruptedException occur!");
- }
- finishB.set(System.currentTimeMillis());
- taskFinshTimeList.add(finishB);
- log.warn("exec B finish cost=[{}]ms", finishB.get() - start);
- return "B";
- });
- futureList.add(futureB);
-
- // C cost 7s
- final Future
futureC = completionService.submit(() -> { - log.warn("exec C start");
- final long start = System.currentTimeMillis();
- try {
- TimeUnit.SECONDS.sleep(7);
- } catch (InterruptedException e) {
- log.error("C InterruptedException occur!");
- }
- finishC.set(System.currentTimeMillis());
- taskFinshTimeList.add(finishC);
- log.warn("exec C finish cost=[{}]ms", finishC.get() - start);
- return "C";
- });
- futureList.add(futureC);
-
- // // 同步获取结果依旧会阻塞
- // AtomicInteger taskIndex = new AtomicInteger();
- // futureList.forEach(future -> {
- // try {
- // final String result = future.get();
- // log.warn("sync get result, then do next task using [result={}], waiting [{}]ms after task finish.",
- // result, System.currentTimeMillis() - taskFinshTimeList.get(taskIndex.getAndIncrement()).get());
- // } catch (Exception e) {
- // log.error("future#get error occur!");
- // }
- // });
-
- // 调用completionService.take方法获取异步结果
- for (int i = 0; i < futureList.size(); i++) {
- final String result = completionService.take().get();
- log.warn("completionService.take() [result={}]", result);
- }
-
- executorService.shutdown();
- }
与上面的例子一样,3个任务,任务的执行时间这些条件都一样,区别在于通过CompletionService.take方法获取异步结果,主要关注下同步获取异步任务结果的测试结果;
- 20:37:23.857 [myThreadPoolExecutor] exec C start
- 20:37:23.857 [myThreadPoolExecutor] exec B start
- 20:37:23.857 [myThreadPoolExecutor] exec A start
- 20:37:26.860 [myThreadPoolExecutor] exec B finish cost=[3001]ms
- 20:37:26.861 [main] completionService.take() [result=B]
- 20:37:30.860 [myThreadPoolExecutor] exec C finish cost=[7001]ms
- 20:37:30.860 [main] completionService.take() [result=C]
- 20:37:33.860 [myThreadPoolExecutor] exec A finish cost=[10001]ms
- 20:37:33.860 [main] completionService.take() [result=A]
从结果可知,每个异步任务执行完成后马上就能拿到异步结果,不会发生阻塞,这样的好处就是,前序异步任务执行完成后,马上就能拿到结果,紧接着只能执行后续的流程处理;
但是——
如果我们直接拿CompletionService做ExecutorService的替换,并且恰恰我们不需要用到异步线程的执行结果(如Runnable类型的异步任务)时,就会出问题,可能引发系统OOM!
来看下CompletionService的源码,他怎么能做到通过CompletionService.take方法,就能按照异步任务的执行完成顺序获取异步结果呢?
CompletionService的源码分析
(1)CompletionService接口
(2)ExecutorCompletionService是CompletionService接口唯一的的实现类;
这里关注2个属性:线程池executor、阻塞队列completionQueue;
(3)对比ExecutorService和ExecutorCompletionService的submit方法可以看出区别
这个是ExecutorService的submit方法:
这个是ExecutorCompletionService的submit方法:
主要区别就在于上图标记的QueueingFuture,继续跟进去看:
QueueingFuture继承自FutureTask,它重写了done()方法;重写后的逻辑为:当任务执行完成后,task就会被放到completionQueue队列里;也就是说,completionQueue队列里面的task都是已经done()完成了的task,这个task就是我们拿到的一个个的future结果;队列取出元素的顺序就是任务的完成顺序;
如果调用队列completionQueue的task方法,会阻塞等待任务完成,直到某个任务完成后被插入队列;一旦从队列取出一个元素,这个元素一定是完成了的future,我们调用Future#get方法当然就能直接获得结果;
总结下来,对比ExecutorService和ExecutorCompletionService二者,可知:我们在使用 ExecutorService#submit提交任务后需要关注每个任务返回的future何时才能完成;然而 CompletionService重写了done方法,对这些future进行了追踪,保证completionQueue队列里面一定是完成了的task,可以立即从future中get任务执行结果;
上面说了"CompletionService执行无返回值的Runnable类型的异步任务时,可能引发系统OOM!" 为什么呢?
原因很简单,因为我们使用了阻塞队列,这个队列会跟随完成的异步任务而加入,当这个队列没有将元素poll出来时,队列就会不断增长,占用内从就会越来越大,最终可能引发OOM;
什么情况会将completionQueue队列的元素取出呢?以下三个方法:
注意,直接调用Future#get并不能从completionQueue队列移除元素;正因为无返回值的Runnable任务我们很难会尝试去task一下,因此更容易导致队列一直未移除元素,最终内存不断增长;
因此,不能直接拿ExecutorCompletionService作为ExecutorService的替换!!!
上面我们使用ExecutorCompletionService是因为我们并行的执行了多个异步任务,并且希望各个任务执行完后,能立即拿着结果去做下一件事;这里我推荐使用JDK8引入的组合式异步编程,下面是代码示例,它可以通过ofAll将多个异步任务一起同步尝试获取执行结果、可以使用JDK自带的forkJoinPool也可以自己制定线程池,并且可以通过提供的thenApply/thenRun/thenAccept等API灵活的实现流式异步编程,结合lambda表达式,代码清晰简洁明了;
- /**
- * 测试CompletableFuture获取异步结果的顺序及异步任务执行时间
- */
- @SneakyThrows
- private static void testCompletableFuture() {
- List
> futureList = Lists.newArrayList(); - // 记录A/B/C的任务完成时间
- List
taskFinshTimeList = Lists.newArrayList(); - AtomicLong finishA = new AtomicLong();
- AtomicLong finishB = new AtomicLong();
- AtomicLong finishC = new AtomicLong();
- // A cost 10s
- final CompletableFuture
completableFutureA = CompletableFuture.supplyAsync(() -> { - log.warn("exec A start");
- final long start = System.currentTimeMillis();
- try {
- TimeUnit.SECONDS.sleep(10);
- } catch (InterruptedException e) {
- log.error("A InterruptedException occur!");
- }
- finishA.set(System.currentTimeMillis());
- taskFinshTimeList.add(finishA);
- log.warn("exec A finish cost=[{}]ms", finishA.get() - start);
- return "A";
- }, executorService);
-
- futureList.add(completableFutureA);
-
- // B cost 3s
- final CompletableFuture
completableFutureB = CompletableFuture.supplyAsync(() -> { - log.warn("exec B start");
- final long start = System.currentTimeMillis();
- try {
- TimeUnit.SECONDS.sleep(3);
- } catch (InterruptedException e) {
- log.error("B InterruptedException occur!");
- }
- finishB.set(System.currentTimeMillis());
- taskFinshTimeList.add(finishB);
- log.warn("exec B finish cost=[{}]ms", finishB.get() - start);
- return "B";
- }, executorService);
- futureList.add(completableFutureB);
-
- // C cost 7s
- final CompletableFuture
completableFutureC = CompletableFuture.supplyAsync(() -> { - log.warn("exec C start");
- final long start = System.currentTimeMillis();
- try {
- TimeUnit.SECONDS.sleep(7);
- } catch (InterruptedException e) {
- log.error("C InterruptedException occur!");
- }
- finishC.set(System.currentTimeMillis());
- taskFinshTimeList.add(finishC);
- log.warn("exec C finish cost=[{}]ms", finishC.get() - start);
- return "C";
- }, executorService);
- futureList.add(completableFutureC);
-
- // 同步获取结果
- // AtomicInteger taskIndex = new AtomicInteger();
- // futureList.forEach(future -> {
- // try {
- // // 执行完取异步任务的结果不用阻塞 执行完立即去做下一件事 并且可以组合多个异步任务
- // future
- // // 第二件事 有返回值
- // .thenApply(result -> {
- // log.warn("sync get result, then do next task using [result={}], waiting [{}]ms after step.1 task finish.",
- // result, System.currentTimeMillis() - taskFinshTimeList.get(taskIndex.getAndIncrement()).get());
- // return "secondary_" + result;
- // })
- // // 第三件事 无返回值
- // .thenAccept(secondaryResult -> {
- // log.warn("sync get secondary result, then do next task using [secondary result={}]", secondaryResult);
- // });
- // } catch (Exception e) {
- // log.error("future#get error occur!");
- // }
- // });
-
- AtomicInteger index = new AtomicInteger();
- CompletableFuture[] array = new CompletableFuture[futureList.size()];
- futureList.forEach(completableFuture -> array[index.getAndIncrement()] = completableFuture);
- // 获取最早得到结果的那个异步任务的返回值 其他的异步任务还在执行不会中断
- final Object anyResult = CompletableFuture.anyOf(array).get();
- log.warn("anyResult={}", anyResult);
更多的示例可以参考之前写的这篇文章《Java8 实战》笔记——4.CompletableFuture-组合式异步编程;
参考: