我先直接分享一下我是如何使用CompletableFuture的吧
- // 下载文件总数,初始化
- List<Integer> resultList = new ArrayList<>(1000);
- ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap<>();
- IntStream.range(0,1000).forEach(resultList::add);
-
- public List<R> sendAsyncBatch(List<P> list, Executor executor, TaskLoader<R,P> loader) {
-
- List<R> resultList = new CopyOnWriteArrayList<>();
- if (CollectionUtils.isNotEmpty(list)) {
- Executor finalExecutor = executor;
- // 将任务拆分分成每50个为一个任务
- CollUtil.split(list, 50)
- .forEach(tempList -> {
- CompletableFuture[] completableFutures = tempList.stream()
- .map(p -> CompletableFuture.supplyAsync(() -> {
- try {
- return loader.load(p);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return null;
- }, finalExecutor)
- .handle((result, throwable) -> {
- if (Objects.nonNull(throwable)) {
- //log.error("async error:{}", throwable.getMessage());
- } else if (Objects.nonNull(result)) {
- //log.info("async success:{}", result);
- } else {
- //log.error("async result is null");
- }
- return result;
- }).whenComplete((r, ex) -> {
- if (Objects.nonNull(r)) {
- resultList.add((R) r);
- }
- })
- ).toArray(CompletableFuture[]::new);
- CompletableFuture.allOf(completableFutures).join();
- System.out.println(resultList.size());
- });
- }
- return resultList;
- }
-
- // 具体业务逻辑实现接口
- @FunctionalInterface
- public interface TaskLoader
{ -
- T load(P p) throws InterruptedException;
- }
-
- // 自定义启动器
- ExecutorService executorService = BaseThreadPoolExector.queueExecutor(new ArrayBlockingQueue<>(5));
- AsyncTask<Integer, Integer> asyncTask = new AsyncTask();
-
- List<Integer> list = asyncTask.sendAsyncBatch(resultList, executorService, new TaskLoadImpl());
-
我先说一下,为什么要CountDownLatch替换掉
但是他也是有缺点的,我个人感觉他的API有点多,看的时候让人眼花。
短短十几行的代码,看到了很多API supplyAsync、handle、whenComplete、allOf
之后我们还会用到runAsync、 thenApply、thenCompose等等其他的。
异步编程,利用多线程优化性能这个核心方案得以实施的基础
他的目的也很简单,同一个CPU上执行几个松耦合的任务,充分利用CPU核数,实现最大化吞吐量,避免因为阻塞造成等待时间过长;
我们还需要特别的注意这两个概念不能混淆
并发:在一个CPU上串行执行
并行:多个CPU上同时执行任务
CompletableFuture主要继承了Future接口,但是他比Future接口丰富的很多
- // 取消
- boolean cancel(boolean mayInterruptIfRunning);
-
- // 判断是否取消
- boolean isCancelled();
-
- //是否异步计算是否已经结束
- boolean isDone();
-
- // 获取计算结果
- V get() throws InterruptedException, ExecutionException;
-
- // 设置最长计算时间,返回计算结果
- V get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
-


可以看到Future接口的局限性,主要是用起来不省事 举个例子:A线程执行完之后通知B线程执行
- ExecutorService executorService = BaseThreadPoolExector.calculateExecutor();
- Future<String> futureA = executorService.submit(() -> Thread.currentThread().getName());
- System.out.println(futureA.get());
- if (futureA.isDone()){
- Future<String> futureB = executorService.submit(() -> Thread.currentThread().getName());
- System.out.println(futureB.get());
- }
- executorService.shutdown();
- 、
这里我们就需要查询futureA.isDone()结果,然后再去执行B线程的业务
而 CompletableFuture 操作起来就便捷很多了
- CompletableFuture<String> completableFuture = CompletableFuture
- .supplyAsync(() -> Thread.currentThread().getName(), executorService)
- .thenApply(s -> Thread.currentThread().getName());
- System.out.println(completableFuture.get());
-
- 准备执行
- 计划执行
- supplyAsync result pool-1-thread-1, thenApply result main
- 线程退出
- 复制代码
supplyAsync执行完成之后,再去执行thenApply
没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
- CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
- new Thread(() ->{
- try {
- int kk = 10/0;
- System.out.println(kk);
- }catch (Exception ex){
- //ex.printStackTrace();
- completableFuture.completeExceptionally(ex);
- }
-
- }).start();
- try {
- System.out.println(completableFuture.get());
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
-
注意到catch里面的completeExceptionally函数了吧,
这个主要的作用就是为了抛出异常,
如果缺少了他,就会造成completableFuture.get()一直处于等待造成阻塞,
与此同时,没有为我们抛出异常信息。
所以CompletableFuture的API优美之处又要体现出来了
- CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
- int kk = 10 / 0;
- return kk;
- }).handle((result, throwable) -> {
- System.out.println(result);
- System.out.println(throwable.getMessage());
- return result;
- }).whenComplete((result ,throwable) -> System.out.println(result));
-
supplyAsync配合着 handle 和 whenComplete,将异常和结果进行处理.
handle 和 whenComplete的区别
- whenComplete
- public CompletableFuture<T> whenComplete(
- BiConsumer<? super T, ? super Throwable> action) {
- return uniWhenCompleteStage(null, action);
- }
- handle
- public <U> CompletableFuture<U> handle(
- BiFunction<? super T, Throwable, ? extends U> fn) {
- return uniHandleStage(null, fn);
- }
-
whenComplete是BiConsumer也就是直接消费不返回值,
handle是BiFunction也就是需要返回值
欲善其功,必先利其器
我们主要从这三种关系下手去了解和使用API 涉及接口
- CompletionStage
thenApply(fn); - CompletionStage
thenApplyAsync(fn); - CompletionStage
thenAccept(consumer); - CompletionStage
thenAcceptAsync(consumer); - CompletionStage
thenRun(action); - CompletionStage
thenRunAsync(action); - CompletionStage
thenCompose(fn); - CompletionStage
thenComposeAsync(fn); -
thenApply函数里参数入参Function super T,? extends U> fn,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply函数出参的是CompletionStage。
thenAccept类型函数入参Consumer super T> action是一个消费类型的,回参是CompletionStage所以thenAccept类型函数不会有返回值。
thenRun函数入参Runnable action,回参CompletionStage,所以既不能接收参数也不支持返回值。
thenCombine函数入参CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn,回参CompletableFuture是支持返回值的,他的作用主要使用BiFunction处理两个阶段的结果
我们只需要注意他的入参、回参和函数后缀就能够区分出他们的不同
- CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() ->{
- //int kk = 10/0;
- return Thread.currentThread().getName() + ":小郭";
- },executorService).thenApply(s -> {
- return s + "拿茶叶";
- }).thenApply(a ->{
- return a + ",泡茶去";
- }).handle((result, ex) ->{
- if (ex != null){
- System.out.println(ex.getMessage());
- }
- return result;
- }).whenComplete((r, ex) ->{
- System.out.println(r);
- });
- task1.join();
-
执行结果:
- 准备执行
- 计划执行
- pool-1-thread-1:小郭拿茶叶,泡茶去
-
可以看到,是按照之上而下的顺序去执行的supplyAsync、thenApply、thenApply 如果第二阶段任务没有拿到第一阶段的结果,他就会等待
- CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() ->{
- int t = new Random().nextInt(30);
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("task1=" + t);
- return t;
- });
- CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() ->{
- int t = new Random().nextInt(30);
- try {
- Thread.sleep(t);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("task2=" + t);
- return t;
- });
- CompletableFuture<Integer> task3 = task1.thenCombineAsync(task2, Integer::sum);
- task3.join();
-
等待task1和task2执行完成,task再进行处理
执行结果
- task1=1
- task2=3
- 4
-
- CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() ->{
- int t = new Random().nextInt(5);
- try {
- Thread.sleep(t * 1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("task1=" + t);
- return t;
- });
- CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() ->{
- int t = new Random().nextInt(5);
- try {
- Thread.sleep(t * 1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("task2=" + t);
- return t;
- });
- CompletableFuture<Integer> task3 = task1.applyToEither(task2, s ->s);
- task3.join();
-
谁先执行完先输出谁,如果相同时间执行完,则一起数据
执行结果
- 我快我先来 task2=2
- 我快我先来 task1=2
- 2
-
- 我快我先来 task2=0
- 0
-
为了更好的发挥出CompletableFuture,需要采用定制的执行器
那这两个如何选择?
现在回过头看一下,我上面的改造方法,是不是就感觉清晰了许多,不足的地方大家提出来