Future代表异步计算的结果。提供了检查计算是否完成、等待其完成以及检索计算结果的方法。只有在计算完成后,才能使用方法get检索结果,如有必要,将其阻塞,直到准备就绪。取消是通过取消方法执行的。还提供了其他方法来确定任务是否正常完成或被取消。
//等待异步任务完成,然后检索其结果
V get() throws InterruptedException, ExecutionException;
//最多等待给定的时间以完成计算,然后检索其结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
//如果此任务已完成,则返回true。完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法将返回true
boolean isDone();
private static final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
int i = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "test-" + i++);
}
});
public static void demo01() {
log.info("创建异步任务");
Future submit = executor.submit(new Callable() {
@Override
public String call() {
String result = "fail";
try {
log.info("开始执行异步任务");
// 执行任务耗时
Thread.sleep(10000);
result = "success";
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
});
try {
String result = submit.get();
log.info("获取异步任务结果 " + result);
} catch (InterruptedException e) {
System.out.println("中断异常");
} catch (ExecutionException e) {
System.out.println("执行异常");
}
log.info("Future的get方法,会使当前线程阻塞");
}
public static void demo02() throws InterruptedException, ExecutionException {
log.info("创建异步任务");
Future submit = executor.submit(new Callable() {
@Override
public String call() {
String result = "fail";
try {
log.info("开始执行异步任务");
// 执行任务耗时
Thread.sleep(10000);
result = "success";
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
});
log.info("轮询调用isDone方法查询异步任务是否完成");
while (true) {
if (submit.isDone()) {
String result = submit.get();
log.info(result);
break;
} else {
log.info("异步任务还未完成,先干点别的事");
Thread.sleep(1000);
}
}
log.info("Future的get方法,会使当前线程阻塞");
}
使用Future,并不能实现真正的异步,要么需要阻塞的获取结果,要么不断的轮询
CompletableFuture实现了CompletionStage接口和Future接口,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
//创建带返回值的异步任务,要么使用的默认线程池ForkJoinPool.commonPool(),要么入参时给定
public static CompletableFuture supplyAsync(Supplier supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
//创建无返回值的异步任务,要么使用的默认线程池ForkJoinPool.commonPool(),要么入参时给定
public static CompletableFuture runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture runAsync(Runnable runnable, Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
//如果以任何方式完成,则返回true:正常、异常或通过取消
public boolean isDone() {
return result != null;
}
//等待此任务完成,然后返回其结果
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
//最多等待给定的时间,以完成此任务,然后返回其结果
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Object r;
long nanos = unit.toNanos(timeout);
return reportGet((r = result) == null ? timedGet(nanos) : r);
}
//如果任务完成则返回结果集,否则返回给定的valueIfAbsent
public T getNow(T valueIfAbsent) {
Object r;
return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}
在流式处理中,等待上层任务正常执行完成后,再执行回调方法;
thenApply:上层任务的结果值作为回调方法的入参值,该回调方法有返回值
thenAccept:上层任务的结果值作为回调方法的入参值,该回调方法没有返回值
thenRun:没有入参也没有返回值的回调方法
public CompletableFuture thenApply(Function super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
public CompletableFuture thenAccept(Consumer super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture thenAcceptAsync(Consumer super T> action) {
return uniAcceptStage(asyncPool, action);
}
public CompletableFuture thenAcceptAsync(Consumer super T> action, Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
public CompletableFuture thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
public CompletableFuture thenRunAsync(Runnable action, Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
public static void demo03() throws ExecutionException, InterruptedException {
log.info("创建异步任务");
CompletableFuture finalResult = CompletableFuture.supplyAsync(new Supplier() {
@Override
public String get() {
log.info("执行异步任务");
try {
Thread.sleep((long) (Math.random() * 5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "success";
}
}, fixedThreadPool).thenApplyAsync((result) -> {
log.info("上层任务结果: " + result);
try {
Thread.sleep((long) (Math.random() * 5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "over";
}, fixedThreadPool);
log.info("最终结果 = " + finalResult.get());
}
如果上层任务抛异常则不会进入回调方法中
public static void demo03() throws ExecutionException, InterruptedException {
log.info("创建异步任务");
CompletableFuture finalResult = CompletableFuture.supplyAsync(new Supplier() {
@Override
public String get() {
log.info("执行异步任务");
try {
Thread.sleep((long) (Math.random() * 5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
//有异常
if (true) throw new RuntimeException("异常");
return "success";
}
}, fixedThreadPool).thenApplyAsync((result) -> {
log.info("上层任务结果: " + result);
try {
Thread.sleep((long) (Math.random() * 5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "over";
}, fixedThreadPool);
//异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
log.info("最终结果 = " + finalResult.get());
}
上层任务执行中,若抛出异常可被该方法接收,异常即该方法的参数;
若无异常,不会进入该方法并将上层的结果值继续下传。
public static void demo03() throws ExecutionException, InterruptedException {
log.info("创建异步任务");
CompletableFuture finalResult = CompletableFuture.supplyAsync(new Supplier() {
@Override
public String get() {
log.info("执行异步任务");
try {
Thread.sleep((long) (Math.random() * 5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
//有异常
if (true) throw new RuntimeException("异常");
return "success";
}
}, fixedThreadPool).exceptionally((exception) -> {
try {
log.info("异常处理 " + exception);
Thread.sleep((long) (Math.random() * 5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "exception";
}).thenApplyAsync((result) -> {
log.info("上层任务结果: " + result);
try {
Thread.sleep((long) (Math.random() * 5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "over";
}, fixedThreadPool);
//异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
log.info("最终结果 = " + finalResult.get());
}
异常情况
正常情况
接收上层任务的结果值和异常,若上层任务无异常,则异常参数为null,该方法无返回值
public CompletableFuture whenComplete(
BiConsumer super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
public CompletableFuture whenCompleteAsync(
BiConsumer super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}
public CompletableFuture whenCompleteAsync(
BiConsumer super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
public static void demo04() throws ExecutionException, InterruptedException {
log.info("创建异步任务");
CompletableFuture finalResult = CompletableFuture.supplyAsync(new Supplier() {
@Override
public String get() {
log.info("执行异步任务");
try {
Thread.sleep((long) (Math.random() * 5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
//有异常
if (true) throw new RuntimeException("异常");
return "success";
}
}, fixedThreadPool).whenCompleteAsync((result, exception) -> {
if (exception == null) {
log.info("上层任务无异常,获取到上层结果为:" + result);
} else {
log.info("上层任务有异常,获取到上层结果为:" + result);
}
}, fixedThreadPool);
//异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
log.info("最终结果 = " + finalResult.get());
}
无异常
有异常
接收上层任务的结果值和异常,若上层任务无异常,则异常参数为null,该方法有返回值
public CompletableFuture handle(
BiFunction super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
public CompletableFuture handleAsync(
BiFunction super T, Throwable, ? extends U> fn) {
return uniHandleStage(asyncPool, fn);
}
public CompletableFuture handleAsync(
BiFunction super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
public static void demo04() throws ExecutionException, InterruptedException {
log.info("创建异步任务");
CompletableFuture finalResult = CompletableFuture.supplyAsync(new Supplier() {
@Override
public String get() {
log.info("执行异步任务");
try {
Thread.sleep((long) (Math.random() * 5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
//有异常
//if (true) throw new RuntimeException("异常");
return "success";
}
}, fixedThreadPool).handleAsync((result, exception) -> {
if (exception == null) {
log.info("上层任务无异常,获取到上层结果为:" + result);
} else {
log.info("上层任务有异常,获取到上层结果为:" + result, exception);
}
return "handle " + result;
}, fixedThreadPool);
//异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
log.info("最终结果 = " + finalResult.get());
}
无异常
有异常
将两个CompletableFuture组合起来,当这两个future都正常执行完了才会执行回调任务
thenCombine:2个future的返回值作为回调方法的入参值,该回调方法有返回值
thenAcceptBoth:2个future的返回值作为回调方法的入参值,该回调方法没有返回值
runAfterBoth:没有入参也没有返回值
public CompletableFuture thenCombine(
CompletionStage extends U> other,
BiFunction super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
public CompletableFuture thenCombineAsync(
CompletionStage extends U> other,
BiFunction super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
public CompletableFuture thenCombineAsync(
CompletionStage extends U> other,
BiFunction super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
public CompletableFuture thenAcceptBoth(
CompletionStage extends U> other,
BiConsumer super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
public CompletableFuture thenAcceptBothAsync(
CompletionStage extends U> other,
BiConsumer super T, ? super U> action) {
return biAcceptStage(asyncPool, other, action);
}
public CompletableFuture thenAcceptBothAsync(
CompletionStage extends U> other,
BiConsumer super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}
public CompletableFuture runAfterBoth(CompletionStage> other, Runnable action) {
return biRunStage(null, other, action);
}
public CompletableFuture runAfterBothAsync(CompletionStage> other, Runnable action) {
return biRunStage(asyncPool, other, action);
}
public CompletableFuture runAfterBothAsync(CompletionStage> other, Runnable action, Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}
public static void demo05() throws ExecutionException, InterruptedException {
log.info("创建异步任务");
CompletableFuture supplyAsync = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int i = 0;
try {
log.info("开始执行异步任务");
Thread.sleep((long) (Math.random() * 5000));
i = 1;
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}
}, fixedThreadPool);
CompletableFuture supplyAsync2 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int i = 0;
try {
log.info("开始执行异步任务");
Thread.sleep((long) (Math.random() * 8000));
i = 2;
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}
}, fixedThreadPool);
CompletableFuture thenCombineAsync = supplyAsync.thenCombineAsync(supplyAsync2, (a, b) -> {
log.info("a = " + a + ", b = " + b);
return a + b;
}, fixedThreadPool);
log.info("thenCombineAsync = " + thenCombineAsync.get());
}
其中任意一个有异常都会导致thenCombineAsync方法不执行
将两个CompletableFuture组合起来,只要有一个future正常执行完了就可以执行回调任务
applyToEither:较快执行完的任务结果值作为回调方法的入参值,该回调方法有返回值
acceptEither:较快执行完的任务结果值作为回调方法的入参值,该回调方法没有返回值
runAfterEither:只要有任务执行完就调用回调方法
public CompletableFuture applyToEither(
CompletionStage extends T> other, Function super T, U> fn) {
return orApplyStage(null, other, fn);
}
public CompletableFuture applyToEitherAsync(
CompletionStage extends T> other, Function super T, U> fn) {
return orApplyStage(asyncPool, other, fn);
}
public CompletableFuture applyToEitherAsync(
CompletionStage extends T> other, Function super T, U> fn, Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
}
public CompletableFuture acceptEither(
CompletionStage extends T> other, Consumer super T> action) {
return orAcceptStage(null, other, action);
}
public CompletableFuture acceptEitherAsync(
CompletionStage extends T> other, Consumer super T> action) {
return orAcceptStage(asyncPool, other, action);
}
public CompletableFuture acceptEitherAsync(
CompletionStage extends T> other, Consumer super T> action, Executor executor) {
return orAcceptStage(screenExecutor(executor), other, action);
}
public CompletableFuture runAfterEither(CompletionStage> other,Runnable action) {
return orRunStage(null, other, action);
}
public CompletableFuture runAfterEitherAsync(CompletionStage> other,Runnable action) {
return orRunStage(asyncPool, other, action);
}
public CompletableFuture runAfterEitherAsync(CompletionStage> other,Runnable action,Executor executor) {
return orRunStage(screenExecutor(executor), other, action);
}
public static void demo06() throws ExecutionException, InterruptedException {
log.info("创建异步任务");
CompletableFuture supplyAsync = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int i = 0;
try {
log.info("执行异步任务");
Thread.sleep((long) (Math.random() * 5000));
i = 1;
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}
}, fixedThreadPool);
CompletableFuture supplyAsync2 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int i = 0;
try {
log.info("执行异步任务");
Thread.sleep((long) (Math.random() * 5000));
i = 2;
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}
}, fixedThreadPool);
CompletableFuture thenCombineAsync = supplyAsync.applyToEitherAsync(supplyAsync2, (result) -> {
log.info("result " + result);
return 3;
}, fixedThreadPool);
log.info("final result = " + thenCombineAsync.get());
}
任意一个任务有异常,都不会进入applyToEitherAsync方法
基于@Async标注的方法,称之为异步方法;这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作
package com.yzm.thread.async;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync // 开启异步调用功能,即使@Async注解生效
@Slf4j
public class AsyncConfig implements AsyncConfigurer {
@Bean(name = "default_async_pool", destroyMethod = "shutdown")
public ThreadPoolTaskExecutor defaultAsyncPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置线程池前缀:方便排查
executor.setThreadNamePrefix("default-async-");
// 设置线程池的大小
executor.setCorePoolSize(10);
// 设置线程池的最大值
executor.setMaxPoolSize(15);
// 设置线程池的队列大小
executor.setQueueCapacity(250);
// 设置线程最大空闲时间,单位:秒
executor.setKeepAliveSeconds(3000);
// 饱和策略
// AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常
// CallerRunsPolicy:若已达到待处理队列长度,将由主线程直接处理请求
// DiscardOldestPolicy:抛弃旧的任务;会导致被丢弃的任务无法再次被执行
// DiscardPolicy:抛弃当前任务;会导致被丢弃的任务无法再次被执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
return executor;
}
@Bean(name = "another_async_pool", destroyMethod = "shutdown")
public ThreadPoolTaskExecutor anotherAsyncPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("another-task-");
executor.setCorePoolSize(3);
executor.setMaxPoolSize(6);
executor.setQueueCapacity(5);
executor.setKeepAliveSeconds(10);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
return executor;
}
/**
* 自定义异步线程池,若不重写,则使用默认的
*/
@Override
public Executor getAsyncExecutor() {
return defaultAsyncPool();
}
/**
* 1.无参无返回值方法
* 2.有参无返回值方法
* 返回值为void的, 通过IllegalArgumentException异常, AsyncUncaughtExceptionHandler处理异常
* 3.有参有返回值方法
* 返回值是Future,不会被AsyncUncaughtExceptionHandler处理,需要我们在方法中捕获异常并处理
* 或者在调用方在调用Future.get时捕获异常进行处理
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
System.out.println("正在处理无返回值的@Async异步调用方法");
return (throwable, method, objects) -> {
log.info("Exception message - " + throwable.getMessage());
log.info("Method name - " + method.getName());
for (Object param : objects) {
log.info("Parameter value - " + param);
}
};
}
}
package com.yzm.thread.async;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import java.util.concurrent.Future;
@Slf4j
@Component
public class AsyncService {
/**
* 1.无参无返回值方法
* 最简单的异步调用,返回值为void
*/
@Async
public void async() {
log.info("无参无返回值方法,通过观察线程名称以便查看效果");
// int a = 1 / 0;
}
/**
* 2.有参无返回值方法
* 指定线程池
*
* @param i 传入参数
*/
@Async("another_async_pool")
public void async(int i) {
log.info("有参无返回值方法, 参数={}", i);
}
/**
* 3.有参有返回值方法
*
* @param i 传入参数
* @return Future
*/
@Async
public Future asyncReturn(int i) throws InterruptedException {
log.info("有参有返回值方法, 参数={}", i);
// int a = 1 / 0;
Thread.sleep(100);
return new AsyncResult("success:" + i);
}
/**
* @Async 必须不同类间调用:
*/
public void D() {
log.info("在同类下调用 @Async 方法是同步执行的");
async();
}
}
@Component
public class AsyncDemo {
private final AsyncService asyncService;
public AsyncDemo(AsyncService asyncService) {
this.asyncService = asyncService;
}
@PostConstruct
public void demo() {
asyncA();
}
public void asyncA() {
asyncService.async();
}
}
AsyncService类
/**
* 2.有参无返回值方法
* 指定线程池
*
* @param i 传入参数
*/
@Async("another_async_pool")
public void async(int i) {
log.info("有参无返回值方法, 参数={}", i);
}
AsyncDemo类
@PostConstruct
public void demo() {
// asyncA();
asyncB(1);
}
public void asyncA() {
asyncService.async();
}
public void asyncB(int i) {
asyncService.async(i);
}
public void asyncC(int i) {
try {
Future future = asyncService.asyncReturn(i);
// 这里使用了循环判断,等待获取结果信息
while (true) {
// 判断是否执行完毕
if (future.isDone()) {
System.out.println("执行完毕,结果为:" + future.get());
break;
}
System.out.println("还未执行完毕,请稍等。。。");
Thread.sleep(1000);
}
} catch (InterruptedException | ExecutionException e) {
System.out.println("异步调用失败");
e.printStackTrace();
}
}
AsyncService类
/**
* @Async 必须不同类间调用:
*/
public void D() {
log.info("在同类下调用 @Async 方法是同步执行的");
// 调用本类的异步方法
async();
}
AsyncDemo类
public void asyncD() {
asyncService.D();
}
AsyncConfig类
// 可处理无返回值的异步方法异常
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
System.out.println("正在处理无返回值的@Async异步调用方法");
return (throwable, method, objects) -> {
log.info("Exception message - " + throwable.getMessage());
log.info("Method name - " + method.getName());
for (Object param : objects) {
log.info("Parameter value - " + param);
}
};
}
AsyncService类
/**
* 1.无参无返回值方法
* 最简单的异步调用,返回值为void
*/
@Async
public void async() {
log.info("无参无返回值方法,通过观察线程名称以便查看效果");
int a = 1 / 0;
}
AsyncDemo类
@PostConstruct
public void demo() {
asyncA();
// asyncB(1);
// asyncC(11);
// asyncD();
}
public void asyncA() {
asyncService.async();
}
有返回值的异步方法异常,需要手动try{}catch(){}处理
在@Async标注的方法,同时也适用了@Transactional进行了标注;在其调用数据库操作之时,将无法产生事务管理的控制,原因就在于其是基于异步处理的操作。
那该如何给这些操作添加事务管理呢?可以将需要事务管理操作的方法放置到异步方法内部,在内部被调用的方法上添加@Transactional.
例如:
方法A,使用了@Async/@Transactional来标注,但是无法产生事务控制的目的。
方法B,使用了@Async来标注, B中调用了C,C使用@Transactional做了标注,则可实现事务控制的目的。
先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦