• Java异步实现的N种方式


    背景

    异步编程现在受到了越来越多的关注,尤其是在IO密集型的业务场景中,相比传统的同步开发模式,异步编程的优势越来越明显,本文介绍Java常见的实现方式;

    Future

    描述

    java.util.concurrent.Future是JDK5引入的,用来获取一个异步计算的结果。可以使用isDone方法检查计算是否完成,也可以使用get阻塞住调用线程,直到计算完成返回结果,使用cancel方法停止任务的执行。

    FutureTask.java是对Futre和Runnable最简单的实现,实现了run函数,所以可以直接执行,任务执行结束通过set()保存结果,setException()保存异常信息。通常配合executorService.submit()一起使用,ExecutorService中将任务包装成FutureTask执行execute();

    样例

    		@Test
    ????public?void?futureCallBackTest()?throws?InterruptedException,?ExecutionException?{
    ????????System.out.println(printThread("小明点餐"));
    ????????Future?future?=?executorService.submit(()?->?{
    ????????????System.out.println(printThread("厨师开始炒菜"));
    ????????????Thread.sleep(2000);
    ????????????System.out.println(printThread(?"厨师炒好菜"));
    ????????????return?"饭菜好了";
    ????????});
    
    ????????String?result?=?future.get();
    ????????executorService.shutdown();
    ????????System.out.println(printThread(result?+?",小明开始吃饭"));
    ????}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    运行结果

    优缺点

    • 能获得异步线程执行结果

    • 无法方便得知任务何时完成

    • 在主线程获得任务结果会导致主线程阻塞

    • 复杂一点的情况下,比如多个异步任务的场景,一个异步任务依赖上一个异步任务的执行结果,异步任务合并等,Future无法满足需求

    ListenableFuture

    描述

    Google并发包下的listenableFuture对Java原生的future做了扩展,顾名思义就是使用监听器模式实现的回调,所以叫可监听的future,通过addListener(Runnablelistener,Executorexecutor)方法添加回调任务。

    要使用listenableFuture还要结合MoreExecutor线程池,MoreExecutor是对Java原生线程池的封装,比如常用的MoreExecutors.listeningDecorator(threadPool);修改Java原生线程池的submit方法,封装了future返回listenableFuture。

    样例

    @Test
    ????public?void?listenableFutureTest()?throws?InterruptedException,?ExecutionException?{
    ????????System.out.println(printThread("小明点餐"));
    ????????ListeningExecutorService?listeningExecutorService?=?MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
    ????????ListenableFuture?listenableFuture?=?listeningExecutorService.submit(()?->?{
    ????????????System.out.println(printThread("厨师开始炒菜"));
    ????????????try?{
    ????????????????Thread.sleep(2000);
    ????????????}?catch?(InterruptedException?e)?{
    ????????????????e.printStackTrace();
    ????????????}
    ????????????System.out.println(printThread(?"厨师炒好菜"));
    ????????????return?"饭菜好了";
    ????????});
    
    ????????Futures.addCallback(listenableFuture,?new?FutureCallback()?{
    ????????????@Override
    ????????????public?void?onSuccess(@Nullable?String?s)?{
    ????????????????System.out.println(printThread(s?+?",小明开始吃饭"));
    ????????????}
    
    ????????????@Override
    ????????????public?void?onFailure(Throwable?throwable)?{
    ????????????????System.out.println(printThread(?throwable.getMessage()));
    ????????????}
    ????????},?executorService);
    
    ????????System.out.println(printThread(?"小明开始玩游戏"));
    ????????try?{
    ????????????Thread.sleep(3000);
    ????????}?catch?(InterruptedException?e)?{
    ????????????e.printStackTrace();
    ????????}
    ????????System.out.println(printThread("小明结束玩游戏"));
    
    
    ????????listenableFuture.get();
    ????????listeningExecutorService.shutdown();
    ????????executorService.shutdown();
    ????}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    运行结果

    这里的运行结果:小明玩游戏和小明吃饭放在了2个线程,没有阻塞等待。

    优缺点

    充分利用线程的时间片

    回调机制的最大问题是:CallbackHell(回调地狱)

    CallbackHell

    描述

    大量使用Callback机制,使应该是先后的业务逻辑在代码形式上表现为层层嵌套,这会导致代码难以理解和维护

    样例

    @Test
    ????public?void?listenableFutureCallbackHellTest()?throws?InterruptedException,?ExecutionException?{
    ????????System.out.println(printThread("小明点餐"));
    ????????ListeningExecutorService?listeningExecutorService?=?MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
    ????????ListenableFuture?listenableFuture?=?listeningExecutorService.submit(()?->?{
    ????????????System.out.println(printThread("厨师开始做菜"));
    ????????????try?{
    ????????????????Thread.sleep(2000);
    ????????????}?catch?(InterruptedException?e)?{
    ????????????????e.printStackTrace();
    ????????????}
    ????????????return?"菜已装盘";
    ????????});
    ????????Futures.addCallback(listenableFuture,?new?FutureCallback()?{
    ????????????@Override
    ????????????public?void?onSuccess(@Nullable?String?s)?{
    ????????????????System.out.println(printThread(s?+?",小明开始吃饭"));
    ????????????????System.out.println(printThread(?"小明点了个饮料"));
    ????????????????ListenableFuture?listenableFuture1?=?listeningExecutorService.submit(()?->?{
    ????????????????????System.out.println(printThread("服务员拿饮料"));
    ????????????????????try?{
    ????????????????????????Thread.sleep(1000);
    ????????????????????}?catch?(InterruptedException?e)?{
    ????????????????????????e.printStackTrace();
    ????????????????????}
    ????????????????????return?"饮料好了";
    ????????????????});
    ????????????????Futures.addCallback(listenableFuture1,?new?FutureCallback()?{
    ????????????????????@Override
    ????????????????????public?void?onSuccess(@Nullable?String?s)?{
    ????????????????????????System.out.println(printThread(s?+?",小明开始喝饮料"));
    ????????????????????}
    
    ????????????????????@Override
    ????????????????????public?void?onFailure(Throwable?throwable)?{
    
    ????????????????????}
    ????????????????},?executorService);
    ????????????}
    
    ????????????@Override
    ????????????public?void?onFailure(Throwable?throwable)?{
    ????????????????System.out.println(printThread(?throwable.getMessage()));
    ????????????}
    ????????},?executorService);
    
    ????????System.out.println(printThread(?"小明开始玩游戏"));
    ????????try?{
    ????????????Thread.sleep(3000);
    ????????}?catch?(InterruptedException?e)?{
    ????????????e.printStackTrace();
    ????????}
    ????????System.out.println(printThread("小明结束玩游戏"));
    
    ????????listenableFuture.get();
    ????????listeningExecutorService.shutdown();
    ????????executorService.awaitTermination(10,?TimeUnit.SECONDS);
    ????????executorService.shutdown();
    ????}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    CompleteableFuture

    描述

    Java8新增的CompletableFuture类借鉴了GoogleGuava的ListenableFuture,它包含50多个方法,默认使用forkJoinPool线程池,提供了非常强大的Future扩展功能,可以帮助我们简化异步编程的复杂性,结合函数式编程,通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的多种方法,可以满足大部分异步回调场景。

    CompletableFuture可以用来以声明式语义构建创建异步任务的编排模式,它可以用于通过声明表示:

    • 将要执行一个异步任务;

    • 将要执行一个异步任务,它必须在一个前驱异步任务完成之后执行,其以前驱任务的输出作为自身的输入;

    • 将要执行一个异步任务,它必须在若干前驱异步任务中的(任意或全部)完成之后执行,其以全部(或任一)前驱任务的输出作为自身的输入;

    样例

    @Test
    ????public?void?completeableFutureTest()??{
    ????????System.out.println(printThread("小明点餐"));
    
    ????????CompletableFuture?completableFuture?=?CompletableFuture.supplyAsync(()?->?{
    ????????????System.out.println(printThread("厨师开始做菜"));
    ????????????try?{
    ????????????????Thread.sleep(2000);
    ????????????}?catch?(InterruptedException?e)?{
    ????????????????e.printStackTrace();
    ????????????}
    ????????????System.out.println(printThread("厨师菜做好了"));
    ????????????return?"菜已装盘";
    ????????});
    
    ????????CompletableFuture?completableFuture1?=?CompletableFuture.runAsync(()?->?{
    ????????????System.out.println(printThread(?"小明开始玩游戏"));
    ????????????try?{
    ????????????????Thread.sleep(3000);
    ????????????}?catch?(InterruptedException?e)?{
    ????????????????e.printStackTrace();
    ????????????}
    ????????????System.out.println(printThread("小明结束玩游戏"));
    ????????});
    
    ????????CompletableFuture?completableFuture2?=?completableFuture
    ????????????????.thenAcceptBoth(completableFuture1,(a,?b)?->?System.out.println(printThread(?a?+?",?小明开始吃饭,并点了饮料")))
    ????????????????.thenApplyAsync((b)?->?{
    ????????????????????System.out.println(printThread("服务员拿饮料"));
    ????????????????????try?{
    ????????????????????????Thread.sleep(1000);
    ????????????????????}?catch?(InterruptedException?e)?{
    ????????????????????????e.printStackTrace();
    ????????????????????}
    ????????????????????return?"饮料好了";
    ????????????????},executorService)
    ????????????????.thenAcceptAsync((s)?->?System.out.println(printThread(s?+?",小明开始喝饮料")));
    
    ????????completableFuture2.join();
    ????}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    方法介绍

    创建对象

    以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。

    runAsync方法也好理解,它以Runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空。

    supplyAsync方法以Supplier函数式接口类型为参数,CompletableFuture的计算结果类型为U。

    计算结果完成时的处理

    当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action;

    不以Async结尾的方法由原来的线程计算,以Async结尾的方法由默认的线程池ForkJoinPool.commonPool()或者指定的线程池executor运行;

    exceptionally方法返回一个新的CompletableFuture,当原始的CompletableFuture抛出异常的时候,就会触发这个CompletableFuture的计算,调用function计算值;

    转换、消费

    一个传Function将CompletableFuture中的值转换成另一个值,一个传Consumer将CompletableFuture值消费;

    组合

    thenCombine用来复合另外一个CompletionStage的结果,两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序;

    thenAcceptBoth和runAfterBoth是当两个CompletableFuture都计算完成,acceptEither和applyToEither方法是当任意一个CompletionStage完成的时候执行后续任务;

    辅助方法allOf和anyOf

    allOf方法是当所有的CompletableFuture都执行完后执行计算;

    anyOf方法是当任意一个CompletableFuture执行完后就会执行计算;

    Reactor

    描述

    Reactor框架是Pivotal公司(Spring家族公司)开发的,实现了ReactiveProgramming思想,符合ReactiveStreams规范的一项技术;

    Reactive

    反应式宣言-反应式宣言

    ReactiveStreams

    介绍

    官网-https://www.reactive-streams.org/?spm=a2c6h.12873639.0.0.edf277a6wQI9QB

    简介:

    ReactiveStreams是一个对于异步流处理且伴随非阻塞背压机制而提供的倡议规范;

    目标:

    控制异步边界的流数据交换(例如从一个线程池向另一个线程池传递数据),同时要确保接收端不被强迫 缓冲任意数量的数据,也就是利用背压(backpressure)模型调节线程间的队列;

    反应式编程的范式(接口规范),主要接口

    • Publisher

    • Subscriber

    • Subcription

    其中,Subcriber中便包含了上面表格提到的onNext、onError、onCompleted这三个方法。

    一个简单样例

    public?static?void?main(String[]?args)?throws?InterruptedException?{
    ????Flux.just(1,?2,?3,?4,?5)
    ????????????.subscribeOn(Schedulers.parallel())
    ????????????.subscribe(new?CoreSubscriber()?{
    ????????????????@Override
    ????????????????public?void?onSubscribe(Subscription?s)?{
    ????????????????????System.out.println(printThread("onSubscribe,?"?+?s.getClass().toString()));
    ????????????????????s.request(5);
    ????????????????}
    
    ????????????????@Override
    ????????????????public?void?onNext(Integer?integer)?{
    ????????????????????System.out.println(printThread("next:?"?+?integer));
    ????????????????}
    
    ????????????????@Override
    ????????????????public?void?onError(Throwable?t)?{
    
    ????????????????}
    
    ????????????????@Override
    ????????????????public?void?onComplete()?{
    ????????????????????System.out.println(printThread("complete"));
    ????????????????}
    ????????????});
    ????Thread.sleep(1000);
    }
    
    private?static?String?printThread(String?note)?{
    ????SimpleDateFormat?simpleDateFormat?=?new?SimpleDateFormat("hh:mm:ss");
    ????long?time?=?System.currentTimeMillis();
    ????Date?date?=?new?Date(time);
    ????return?Thread.currentThread().getName()?+?"?"?+?simpleDateFormat.format(date)?+?"?"?+?time?+?"?"?+?note;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    Reactor

    实现reactivestreams的类库-Reactor 3 Reference Guide

    相似的类库有RxJava2,JDK9Flow等

    Mono实现了org.reactivestreams.Publisher接口,代表0到1个元素的发布者。

    Flux同样实现了org.reactivestreams.Publisher接口,代表0到N个元素的发表者。

    Scheduler表示背后驱动反应式流的调度器,通常由各种线程池实现。

    @Test
    ????public?void?ReactorTest()?throws?InterruptedException?{
    ????????System.out.println(printThread("小明点餐"));
    ????????Mono?mono?=?Mono.fromSupplier(()?->?{
    ????????????System.out.println(printThread("厨师开始做菜"));
    ????????????try?{
    ????????????????Thread.sleep(2000);
    ????????????}?catch?(InterruptedException?e)?{
    ????????????????e.printStackTrace();
    ????????????}
    ????????????System.out.println(printThread("厨师菜做好了"));
    ????????????return?"菜已装盘";
    ????????}).publishOn(Schedulers.parallel())
    ??????????.zipWith(Mono.fromSupplier(()?->?{
    ????????????System.out.println(printThread(?"小明开始玩游戏"));
    ????????????try?{
    ????????????????Thread.sleep(3000);
    ????????????}?catch?(InterruptedException?e)?{
    ????????????????e.printStackTrace();
    ????????????}
    ????????????System.out.println(printThread("小明结束玩游戏"));
    ????????????return?"?";
    ????????})).doOnSuccess((tuple2)?->?System.out.println(printThread(?tuple2.getT1()?+?",?小明开始吃饭")));
    
    ????????mono.subscribe();
    
    ????????Thread.sleep(10000);
    ????}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    reactor操作函数

    Reactor 3 Reference Guide

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    整点猛料!啃完阿里最新版Java面试八股文,大厂面试轻松搞定,拿下offer不是事儿!(金九银十同样适用)
    SSM框架真没那么难,这份阿里大佬的进阶实战笔记真给讲透了!
    毕业季 新的开始
    WPF界面设计学习
    【 Vue 】Diff 算法上
    OpenCV计算机视觉学习(14)——浅谈常见图像后缀(png, jpg, bmp)的区别(opencv读取语义分割mask的坑)
    python安装(windows64简洁版)
    一文教你如何发挥好 TDengine Grafana 插件作用
    王杰qtday4
    web前端期末大作业——餐品后台管理系统(html+css+javascript)
  • 原文地址:https://blog.csdn.net/web18224617243/article/details/126107145