就是将回调方法转为CompletableFuture,然后再依赖CompletableFure的能力进行调用编排
- @FunctionalInterface
- public interface ThriftAsyncCall {
- void invoke() throws TException;
- }
- /**
- * OctoThriftCallback 为thrift回调方法
- * ThriftAsyncCall 为自定义函数,用来表示一次thrift调用(定义如上)
- */
- public static
CompletableFuture toCompletableFuture(final OctoThriftCallback,T> callback , ThriftAsyncCall thriftCall) { - //新建一个未完成的CompletableFuture
- CompletableFuture
resultFuture = new CompletableFuture<>(); - //监听回调的完成,并且与CompletableFuture同步状态
- callback.addObserver(new OctoObserver
() { - @Override
- public void onSuccess(T t) {
- resultFuture.complete(t);
- }
- @Override
- public void onFailure(Throwable throwable) {
- resultFuture.completeExceptionally(throwable);
- }
- });
- if (thriftCall != null) {
- try {
- thriftCall.invoke();
- } catch (TException e) {
- resultFuture.completeExceptionally(e);
- }
- }
- return resultFuture;
- }
线程池使用
- ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
- CompletableFuture
future1 = CompletableFuture.supplyAsync(() -> { - System.out.println("supplyAsync 执行线程:" + Thread.currentThread().getName());
- //业务操作
- return "";
- }, threadPool1);
- //此时,如果future1中的业务操作已经执行完毕并返回,则该thenApply直接由当前main线程执行;否则,将会由执行以上业务操作的threadPool1中的线程执行。
- future1.thenApply(value -> {
- System.out.println("thenApply 执行线程:" + Thread.currentThread().getName());
- return value + "1";
- });
- //使用ForkJoinPool中的共用线程池CommonPool
- future1.thenApplyAsync(value -> {
- //do something
- return value + "1";
- });
- //使用指定线程池
- future1.thenApplyAsync(value -> {
- //do something
- return value + "1";
- }, threadPool1);
线程池循环引用会导致死锁
- public Object doGet() {
- ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
- CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
- //do sth
- return CompletableFuture.supplyAsync(() -> {
- System.out.println("child");
- return "child";
- }, threadPool1).join();//子任务
- }, threadPool1);
- return cf1.join();
- }
如上代码块所示,doGet方法第三行通过supplyAsync向threadPool1请求线程,并且内部子任务又向threadPool1请求线程。threadPool1大小为10,当同一时刻有10个请求到达,则threadPool1被打满,子任务请求线程时进入阻塞队列排队,但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成。主线程执行cf1.join()进入阻塞状态,并且永远无法恢复。
为了修复该问题,需要将父任务与子任务做线程池隔离,两个任务请求不同的线程池,避免循环依赖导致的阻塞
服务异步化后很多步骤都会依赖于异步RPC调用的结果,这时需要特别注意一点,如果是使用基于NIO(比如Netty)的异步RPC,则返回结果是由IO线程负责设置的,即回调方法由IO线程触发,CompletableFuture同步回调(如thenApply、thenAccept等无Async后缀的方法)如果依赖的异步RPC调用的返回结果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO线程将一直被占用,影响整个服务的响应
自定义的工具类ExceptionUtils
- public class ExceptionUtils {
- public static Throwable extractRealException(Throwable throwable) {
- //这里判断异常类型是否为CompletionException、ExecutionException,如果是则进行提取,否则直接返回。
- if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
- if (throwable.getCause() != null) {
- return throwable.getCause();
- }
- }
- return throwable;
- }
- }
-
-
- remarkResultFuture
- .thenApply(result -> {//这里增加了一个回调方法thenApply,如果发生异常thenApply内部会通过new CompletionException(throwable) 对异常进行包装
- //这里是一些业务操作
- })
- .exceptionally(err -> {//通过exceptionally 捕获异常,这里的err已经被thenApply包装过,因此需要通过Throwable.getCause()提取异常
- log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, ExceptionUtils.extractRealException(err));
- return 0;
- });
大部分异常会封装成CompletionException后抛出,真正的异常存储在cause属性中,因此如果调用链中经过了回调方法处理那么就需要用Throwable.getCause()方法提取真正的异常
常用工具类代码
- /**
- * 设置CF状态为失败
- */
- public static
CompletableFuture failed(Throwable ex) { - CompletableFuture
completableFuture = new CompletableFuture<>(); - completableFuture.completeExceptionally(ex);
- return completableFuture;
- }
- /**
- * 设置CF状态为成功
- */
- public static
CompletableFuture success(T result) { - CompletableFuture
completableFuture = new CompletableFuture<>(); - completableFuture.complete(result);
- return completableFuture;
- }
- /**
- * 将List
> 转为 CompletableFuture>
- */
- public static
CompletableFuture> sequence(Collection> completableFutures) {
- return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture>[0]))
- .thenApply(v -> completableFutures.stream()
- .map(CompletableFuture::join)
- .collect(Collectors.toList())
- );
- }
- /**
- * 将List
>> 转为 CompletableFuture>
- * 多用于分页查询的场景
- */
- public static
CompletableFuture> sequenceList(Collection>> completableFutures) {
- return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture>[0]))
- .thenApply(v -> completableFutures.stream()
- .flatMap( listFuture -> listFuture.join().stream())
- .collect(Collectors.toList())
- );
- }
- /*
- * 将List
>> 转为 CompletableFuture - * @Param mergeFunction 自定义key冲突时的merge策略
- */
- public static
CompletableFuture - Collection
>> completableFutures, BinaryOperator mergeFunction) { - return CompletableFuture
- .allOf(completableFutures.toArray(new CompletableFuture>[0]))
- .thenApply(v -> completableFutures.stream().map(CompletableFuture::join)
- .flatMap(map -> map.entrySet().stream())
- .collect(Collectors.toMap(Entry::getKey, Entry::getValue, mergeFunction)));
- }
- /**
- * 将List
> 转为 CompletableFuture>,并过滤调null值
- */
- public static
CompletableFuture> sequenceNonNull(Collection> completableFutures) {
- return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture>[0]))
- .thenApply(v -> completableFutures.stream()
- .map(CompletableFuture::join)
- .filter(e -> e != null)
- .collect(Collectors.toList())
- );
- }
- /**
- * 将List
>> 转为 CompletableFuture>,并过滤调null值
- * 多用于分页查询的场景
- */
- public static
CompletableFuture> sequenceListNonNull(Collection>> completableFutures) {
- return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture>[0]))
- .thenApply(v -> completableFutures.stream()
- .flatMap( listFuture -> listFuture.join().stream().filter(e -> e != null))
- .collect(Collectors.toList())
- );
- }
- /**
- * 将List
>> 转为 CompletableFuture - * @Param filterFunction 自定义过滤策略
- */
- public static
CompletableFuture> sequence(Collection> completableFutures,
- Predicate super T> filterFunction) {
- return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture>[0]))
- .thenApply(v -> completableFutures.stream()
- .map(CompletableFuture::join)
- .filter(filterFunction)
- .collect(Collectors.toList())
- );
- }
- /**
- * 将List
>> 转为 CompletableFuture>
- * @Param filterFunction 自定义过滤策略
- */
- public static
CompletableFuture> sequenceList(Collection>> completableFutures,
- Predicate super T> filterFunction) {
- return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture>[0]))
- .thenApply(v -> completableFutures.stream()
- .flatMap( listFuture -> listFuture.join().stream().filter(filterFunction))
- .collect(Collectors.toList())
- );
- }
- /**
- * 将CompletableFuture
- */
- public static
CompletableFuture> sequenceMap( - Collection
>> completableFutures) { - return CompletableFuture
- .allOf(completableFutures.toArray(new CompletableFuture>[0]))
- .thenApply(v -> completableFutures.stream().map(CompletableFuture::join)
- .flatMap(map -> map.entrySet().stream())
- .collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> b)));
- }}