• 对产品实现汇率换算服务(将两个CompletableFuture对象整合起来,无论它们是否存在依赖)


    需求

    有一家商店提供的价格是以欧元(EUR)计价的,但是你希望以美元的方式提供给你的客户。你可以用异步的方式向商店查询指定商品的价格,同时从远程的汇率服务那里查到欧元和美元之间的汇率。当二者都结束时,再将这两个结果结合起来,用返回的商品价格乘以当时的汇率,得到以美元计价的商品价格。

    实现源码

    public class ExchangeService {
        public enum Money {
            USD(1.0), EUR(1.35387), GBP(1.69715), CAD(.92106), MXN(.07683);
            private final double rate;
            Money(double rate) {
                this.rate = rate;
            }
        }
        public static double getRate(Money source, Money destination) {
            return getRateWithDelay(source, destination);
        }
        private static double getRateWithDelay(Money source, Money destination) {
            Util.delay();
            return destination.rate / source.rate;
        }
    }
    
    public class BestPriceFinder {
        private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
                                                       new Shop("LetsSaveBig"),
                                                       new Shop("MyFavoriteShop"),
                                                       new Shop("BuyItAll")                                                 
                                                       new Shop("ShopEasy"));
                                                       
        private final Executor executor = Executors.newFixedThreadPool(shops.size(), ExecuterThreadFactoryBuilder.build());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    实现方案

    你对一个CompletableFuture对象调用了thenCompose方法,并向其传递了第二个CompletableFuture,而第二个CompletableFuture又需要使用第一个CompletableFuture的执行结果作为输入。但是,另一种比较常见的情况是,你需要将两个完全不相干的CompletableFuture对象的结果整合起来,而且你也不希望等到第一个任务完全结束才开始第二项任务。你应该使用thenCombine方法。thenCombine方法也提供有一个Async的版本。这里,如果使用thenCombineAsync会导致BiFunction中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行。

    方案1:

    public List<String> findPricesInUSD(String product) {
        List<CompletableFuture<Double>> priceFutures = new ArrayList<>();
        for (Shop shop : shops) {
            CompletableFuture<Double> futurePriceInUSD = CompletableFuture
                    .supplyAsync(() -> shop.getPrice(product), executor)
                    .thenCombine(CompletableFuture.supplyAsync(() -> ExchangeService.getRate(Money.EUR, Money.USD), executor), (price, rate) -> price * rate);
            priceFutures.add(futurePriceInUSD);
        }
        
        List<String> prices = priceFutures
                .stream()
                .map(CompletableFuture::join)
                .map(price -> " price is " + price)
                .collect(Collectors.toList());
        return prices;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    方案2:

    public List<String> findPricesInUSD2(String product) {
        List<CompletableFuture<String>> priceFutures = new ArrayList<>();
        for (Shop shop : shops) {
            CompletableFuture<String> futurePriceInUSD = CompletableFuture
                            .supplyAsync(() -> shop.getPrice(product), executor)
                            .thenCombine(CompletableFuture.supplyAsync(() -> ExchangeService.getRate(Money.EUR, Money.USD), executor), (price, rate) -> price * rate)
                            .thenApply(price -> shop.getName() + " price is " + price);
            priceFutures.add(futurePriceInUSD);
        }
        
        List<String> prices = priceFutures
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        return prices;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    方案3:

    public List<String> findPricesInUSD3(String product) {
        List<CompletableFuture<String>> priceFutures = shops
            .stream()
            .map(shop -> CompletableFuture
                .supplyAsync(() -> shop.getPrice(product))
                .thenCombine(CompletableFuture.supplyAsync(() -> ExchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate)
                .thenApply(price -> shop.getName() + " price is " + price))
            .collect(Collectors.toList());
    
        List<String> prices = priceFutures
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
        return prices;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    方案4:使用Java 7中提供的特性完成上述功能

    // 为了更直观地感受一下使用CompletableFuture在代码可读性上带来的巨大提升,尝试仅使用Java 7中提供的特性实现以下如上实现
    public List<String> findPricesInUSDJava7(String product) {
        ExecutorService executor = Executors.newCachedThreadPool();
        List<Future<Double>> priceFutures = new ArrayList<>();
        for (Shop shop : shops) {
            final Future<Double> futureRate = executor.submit(new Callable<Double>() {
                public Double call() {
                    return ExchangeService.getRate(Money.EUR, Money.USD);
                }
            });
            Future<Double> futurePriceInUSD = executor.submit(new Callable<Double>() {
                public Double call() {
                    try {
                        double priceInEUR = shop.getPrice(product);
                        return priceInEUR * futureRate.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e.getMessage(), e);
                    }
                }
            });
            priceFutures.add(futurePriceInUSD);
        }
        List<String> prices = new ArrayList<>();
        for (Future<Double> priceFuture : priceFutures) {
            try {
                prices.add(" price is " + priceFuture.get());
            }
            catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }
        return prices;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    我们能看到创建流水线对同步和异步操作进行混合操作有多么简单,随着处理任务和需要合并结果数目的增加,这种声明式程序设计的优势也愈发明显。

    测试

    public static void main(String[] args) {
        StopWatch stopWatch = new StopWatch("性能比较");
        execute("combined USD findPricesInUSDJava7", () -> bestPriceFinder.findPricesInUSDJava7("myPhone27S"), stopWatch);
        execute("combined USD CompletableFuture", () -> bestPriceFinder.findPricesInUSD("myPhone27S"), stopWatch);
        execute("combined USD CompletableFuture v2", () -> bestPriceFinder.findPricesInUSD2("myPhone27S"), stopWatch);
        execute("combined USD CompletableFuture v3", () -> bestPriceFinder.findPricesInUSD3("myPhone27S"), stopWatch);
        StopWatchUtils.logStopWatch(stopWatch);
    }
    private static void execute(String msg, Supplier<List<String>> s, StopWatch stopWatch) {
        stopWatch.start(msg);
        System.out.println(s.get());
        stopWatch.stop();
        System.out.println();
    }
    
    
    [ price is 91.040141702717,  price is 125.1710573102377,  price is 158.16078708139523,  price is 136.45612204497712,  price is 130.0591978746988]
    
    [ price is 145.62246618545896,  price is 123.78887748261509,  price is 142.17561724598045,  price is 147.48700495707948,  price is 148.2901027867818]
    
    [BestPrice price is 126.3823279607243, LetsSaveBig price is 124.52723804111048, MyFavoriteShop price is 129.1051274535831, BuyItAll price is 114.36072566615553, ShopEasy price is 121.11783256695436]
    
    [BestPrice price is 168.06251816668828, LetsSaveBig price is 148.38498827435606, MyFavoriteShop price is 119.0272869408407, BuyItAll price is 115.15446874021768, ShopEasy price is 153.35355439427738]
    
    性能比较 total cost time = 6045 ms
    combined USD findPricesInUSDJava7        : 1008 ms, 16.67%
    combined USD CompletableFuture           : 2009 ms, 33.23%
    combined USD CompletableFuture v2        : 2015 ms, 33.33%
    combined USD CompletableFuture v3        : 1012 ms, 16.74%
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
  • 相关阅读:
    SD NAND 为什么可以完胜T卡(tf卡/sd卡)?
    【实践篇】Redis最强Java客户端(四)之Ression分布式集合使用指南
    易基因|疾病研究:DNA甲基化与转录组分析联合揭示吸烟免疫相关疾病的表观遗传机制
    【高速PCB电路设计】8.DDR模块设计实战
    Java的动态代理Proxy.newProxyInstance
    c语言进阶部分详解(详细解析自定义类型——结构体,内存对齐,位段)
    LeetCode 438. 找到字符串中所有字母异位词
    硬件基础 -- D/A数字模拟信号
    OceanBase 4.3 特性解析:列存技术
    车联网行业报告及摘要
  • 原文地址:https://blog.csdn.net/weixin_37646636/article/details/134397193