Future 接口
Future接口在Java 5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模
了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在
Future中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,
不再需要呆呆等待耗时的操作完成。
- @Test
- public void testFuture() {
- ExecutorService executor = Executors.newCachedThreadPool();
- Future
future = executor.submit(new Callable() { - public Double call() {
- return doSomeLongComputation();
- }});
- doSomethingElse();
- try {
- Double result = future.get(1, TimeUnit.SECONDS);
- System.out.println(result);
- } catch (ExecutionException ee) {
- // 计算抛出一个异常
- } catch (InterruptedException ie) {
- // 当前线程在等待过程中被中断
- } catch (TimeoutException te) {
- // 在Future对象完成之前超过已过期
- }
- }
-
- private Double doSomeLongComputation() {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 0.0;
- }
-
- private void doSomethingElse() {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("doSomethingElse");
- }
Future 接口的局限性
通过例子,我们知道Future接口提供了方法来检测异步计算是否已经结束(使用
isDone方法),等待异步操作结束,以及获取计算的结果。但是这些特性还不足以让你编写简洁
的并发代码。比如,我们很难表述Future结果之间的依赖性;从文字描述上这很简单,“当长时
间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都
完成后,将计算的结果与另一个查询操作结果合并”。但是,使用Future中提供的方法完成这样
的操作又是另外一回事。这也是我们需要更具描述能力的特性的原因,比如下面这些。
1: 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第
一个的结果。
2: 等待Future集合中的所有任务都完成。
3: 仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同
一个值),并返回它的结果。
4: 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
5: 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future
计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。
使用 CompletableFuture 构建异步应用
- @Test
- public void testGetPriceAsync() {
- CompletableFuture
futurePrice = new CompletableFuture<>(); - new Thread( () -> {
- double price = calculatePrice("Apple");
- futurePrice.complete(price);
- }).start();
- doSomethingElse();
- try {
- Double result = futurePrice.get();
- System.out.println(result);
- } catch (ExecutionException | InterruptedException ee) {
- // 计算抛出一个异常
- }
- }
-
- private Double calculatePrice(String product) {
- Double value = 0.0;
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- switch (product) {
- case "Apple":
- value = 10.0;
- break;
- case "Banana":
- value = 5.0;
- break;
- default:
- break;
- }
- return value;
- }
使用 CompletableFuture 发起异步请求
- @Test
- public void TestFindPrice() {
- List<String> list = Arrays.asList("Apple", "Banana");
- List<CompletableFuture<String>> priceFuture =
- list.stream()
- .map(a -> CompletableFuture.supplyAsync(
- () -> a + " price is:" + calculatePrice(a)))
- .collect(Collectors.toList());
- List<String> resultList = priceFuture.stream()
- .map(CompletableFuture::join)
- .collect(Collectors.toList());
-
- resultList.forEach(System.out::println);
- }
构造同步和异步操作
- @Test
- public void testFindPrice2() {
- List<String> list = Arrays.asList("Apple", "Banana");
- ExecutorService executor = Executors.newCachedThreadPool();
- List<CompletableFuture<String>> priceFutures =
- list.stream()
- .map(a -> CompletableFuture.supplyAsync(
- () -> calculatePrice(a), executor))
- .map(future -> future.thenApply(Quote::parse))
- .map(future -> future.thenCompose(quote ->
- CompletableFuture.supplyAsync(
- () -> Discount.applyDiscount(quote), executor)))
- .collect(Collectors.toList());
- List<String> resultList = priceFutures.stream()
- .map(CompletableFuture::join)
- .collect(Collectors.toList());
- resultList.forEach(System.out::println);
- }
-
- static class Quote {
- public static Double parse(Double val) {
- return val;
- }
- }
-
- static class Discount {
- public static String applyDiscount(Double val) {
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "price is:" + val;
- }
- }
将两个 CompletableFuture 对象整合起来,无论它们是否存在依赖
- @Test
- public void testFindPrice3() {
- Future<Double> futurePrice =
- CompletableFuture.supplyAsync(() -> calculatePrice("Apple"))
- .thenCombine(
- CompletableFuture.supplyAsync(
- () -> getRate()),
- (price, rate) -> price * rate
- );
- try {
- Double result = futurePrice.get();
- System.out.println(result);
- } catch (ExecutionException | InterruptedException ee) {
- // 计算抛出一个异常
- }
- }
-
- private Double getRate() {
- return 0.8;
- }