• Java 19新特性:Structured Concurrency (结构化并发编程)


    结构化并发编程式和虚拟线程息息相关的,要了解虚拟线程的相关知识,请看上一节:Java 19新特性:虚拟线程(Virtual Threads )。在这里我们关于虚拟线程需要记住两件事情:

    • 虚拟线程的创建成本很低,而且比我们在JDK中使用多年的常规平常线程要便宜的多。
    • 阻塞它们的成本也很低。

    自JDK5以来,我们不应该直接与线程交互。正确的模式是将任务作为RunnableCallable提交给ExecutorServiceExecutor,然后对返回的Future进行操作。Loom保留了这种模型,并添加了一些不错的功能。这里要介绍的第一个对象是Scope对象, 确切的类型是StructuredTaskScope

    Scope对象

    我们可以把这个对象看做一个虚拟线程启动器,我们以Callable的形式向它提交任务,我们将得到一个future返回,并且这个callable将在由作用域Scope为我们创建的虚线程种执行。这很像Executor。但二者之间也有很大的区别。

    下面我们将通过一个简单案例,来学习一下Scope该如何使用:假设我们现在要做一个旅游网站,现在首页需要我们查询一个天气预报服务器,获取当前的天气信息。我们先来看一下普通的代码:

    public class Main {
        public static void main(String[] args) throws Exception {
    
            Instant begin = Instant.now();
    
            Weather weather = Weather.readWeather();
    
            Instant end = Instant.now();
    
            System.out.println("weather = " + weather);
            System.out.println("Time is = " + Duration.between(begin,end).toMillis() + " ms");
        }
    }
    /*
    weather = Weather[agency=agencyC, weather=Sunny]
    Time is = 695 ms
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    public record Weather(String agency, String weather) {
        // 模拟不同的天气网站
        private final static String[] AGENCY_LIST = {"agencyA","agencyB","agencyC"};
    
        public static Weather readWeather() throws Exception {
            return readWeatherFrom();
        }
    
        /**
         * 模拟请求天气服务,返回结果。
         * @return Weather
         */
        private static Weather readWeatherFrom() throws Exception {
            Random random = new Random();
            // 模拟延时
            Thread.sleep(Duration.of(random.nextInt(10, 1200), ChronoUnit.MILLIS));
            return new Weather(AGENCY_LIST[random.nextInt(0, 3)], "Sunny");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    下面让我们使用Scope

    		//  修改readWeather
        public static Weather readWeather() throws Exception {
            try(var scope = new StructuredTaskScope<Weather>()) {
                Future<Weather> future = scope.fork(Weather::readWeatherFrom);
                scope.join();
                return future.resultNow();
            }
        }
    /*
    WARNING: Using incubator modules: jdk.incubator.concurrent
    weather = Weather[agency=agencyC, weather=Sunny]
    Time is = 430 ms
    */
    // 运行时添加 : --add-modules jdk.incubator.concurrent参数
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    StructuredTaskScope实例是AutoCloseable的,我们可以使用try-with-resource模式。通过fork()方法fork一个Callable类型的任务,fork()方法返回一个Future对象,我们调用join()方法阻塞调用,它将阻塞当前线程,直到所有提交(frok)给StructuredTaskScope的任务都完成。最后调用FutureresultNow()获取结果并返回。resultNow()将抛出异常,如果我们在Future完成前调用它,所以我们要在join()方法调用。

    到这里你可能会说:“就这?” 不要着急,继续向下看!!

    Scope对象和ExecutorService之间的区别

    1、在程序启动时创建Executor,当应用程序关闭时,关闭它们。ExecutorService与应用程序具有相同的生命周期。这是我们应该使用ExecutorService的方式,因为ExecutorService持有平台线程,而平台线程的创建成本很高。Scope只是虚拟线程的启动器。我们不需要共享虚拟线程,因为虚拟线程很便宜。所以一旦我们完成了一个Scope,我们就可以关闭它,并垃圾回收它,这都是没问题的。

    2、Executor持有单个队列,所有的任务被添加到这个队列中,这个ExecutorService中的不同线程,当它们有机会时,将一次一次的接收这些任务。而Scope时构建在ForkJoinPool之上的,因此每个线程都有自己的等待队列,如果一个线程没有做任何事情,它可能会从另一个队列窃取任务执行,这种模式就是之前提到的工作窃取模式,它是由JDK中的ForkJoinPool实现的。

    更进一步

    假设我们现在要查询多个天气预报服务器,一旦有一个返回结果,我们就可以将其他的查询取消了。下面我们会用到一个特殊的Scope,它是StructuredTaskScope类的扩展StructuredTaskScope.ShutdownOnSuccess,当然还有一个ShutdownOnFailure

        public static Weather readWeather() throws Exception {
            try(var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()) {
    
                Future<Weather> futureA = scope.fork(Weather::readWeatherFrom);
                Future<Weather> futureB = scope.fork(Weather::readWeatherFrom);
                Future<Weather> futureC = scope.fork(Weather::readWeatherFrom);
    
                scope.join();
    
                System.out.println("futureA = " + futureA.state());
                System.out.println("futureB = " + futureB.state());
                System.out.println("futureC = " + futureC.state());
              
                // 注意这里是通过scope获取结果返回的。
                return scope.result();
            }
        }
    
    /*
    futureA = SUCCESS
    futureB = FAILED
    futureC = FAILED
    weather = Weather[agency=agencyC, weather=Sunny]
    Time is = 187 ms
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    通过结果我们看到,futureA成功其他两个是FAILED状态,这意味着他们已经被scope中断。

    任务失败出现异常,会发生什么?

    这取决于不同的Scope,对于ShutdownOnSuccess,将不会选择此任务来产生结果。但是如果所有任务都失败了,那么我们将得到一个ExecutionException,其中来自第一个Future的异常作为根异常。

    创建自己的业务Scope

    我们不能扩展ShutdownOnSuccess,因为它是final类,但是我们仍然可以包装它,组合它。但是我们可以扩展StructuredTaskScope。假设现在我们要查询旅行社的报价,而不是天气预报,我们需要查询几个不同的报价服务器。以获取最好的价格。我们想通过scope.bestQuotation()获取最便宜的价格,显然,JDK内置的Scope是没有这个方法的。
    在这里插入图片描述
    下面我们来编写自己的Scope:

    import jdk.incubator.concurrent.StructuredTaskScope;
    import java.time.Duration;
    import java.time.temporal.ChronoUnit;
    import java.util.Collection;
    import java.util.Comparator;
    import java.util.Random;
    import java.util.concurrent.ConcurrentLinkedDeque;
    import java.util.concurrent.Future;
    
    public record Quotation(String agency, int price) {
        private static final Random random = new Random();
    
    
        public static class QuotationException extends RuntimeException {
    
        }
    
        /** 自定义的Scope */
        public static class QuotationScope extends StructuredTaskScope<Quotation> {
    
            private final Collection<Quotation> quotations = new ConcurrentLinkedDeque<>();
            private final Collection<Throwable> exceptions = new ConcurrentLinkedDeque<>();
    
            /** 当一个任务完成时,会调用此方法 */
            @Override
            protected void handleComplete(Future<Quotation> future) {
                switch (future.state()){
                    case RUNNING -> throw new IllegalStateException(" Future is still running... ");
                    case SUCCESS -> this.quotations.add(future.resultNow());
                    case FAILED -> this.exceptions.add(future.exceptionNow());
                    case CANCELLED -> {}
                }
            }
    
            /** 获取所有异常 */
            public QuotationException exceptions() {
                QuotationException quotationException = new QuotationException();
                this.exceptions.forEach(quotationException::addSuppressed);
                return quotationException;
            }
    
    
    
            /** 从队列中拿出最小的 */
            public Quotation bestQuotation() {
                return quotations.stream()
                        .min(Comparator.comparing(Quotation::price))
                        .orElseThrow(this::exceptions);
            }
    
        }
    
        public static Quotation readQuotation() throws Exception {
            try (var scope = new QuotationScope()){
                scope.fork(Quotation::readQuotationFromA);
                scope.fork(Quotation::readQuotationFromB);
                scope.fork(Quotation::readQuotationFromC);
    
                scope.join();
    
                Quotation quotation = scope.bestQuotation();
                return quotation;
            }
        }
    
    
        /**  以下模拟请求不同报价服务器 */
        public static Quotation readQuotationFromA() throws Exception {
            Thread.sleep(Duration.of(random.nextInt(10, 1200), ChronoUnit.MILLIS));
            return new Quotation("agencyA", random.nextInt(100, 1000));
        }
        public static Quotation readQuotationFromB() throws Exception {
            Thread.sleep(Duration.of(random.nextInt(10, 1200), ChronoUnit.MILLIS));
            return new Quotation("agencyB", random.nextInt(100, 1000));
        }
        public static Quotation readQuotationFromC() throws Exception {
            Thread.sleep(Duration.of(random.nextInt(10, 1200), ChronoUnit.MILLIS));
            return new Quotation("agencyC", random.nextInt(100, 1000));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    public class Main {
        public static void main(String[] args) throws Exception {
    
            Instant begin = Instant.now();
    
            Quotation quotation = Quotation.readQuotation();
    
            Instant end = Instant.now();
    
            System.out.println("quotation = " + quotation);
            System.out.println("Time is = " + Duration.between(begin,end).toMillis() + " ms");
        }
    }
    /*
    quotation = Quotation[agency=agencyB, price=520]
    Time is = 759 ms
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    我们可以看到,与经典的带有回调的一部代码相比,这段代码本身是完全异步的。

        public static Quotation readQuotation() throws Exception {
            try (var scope = new QuotationScope()){
                scope.fork(Quotation::readQuotationFromA);
                scope.fork(Quotation::readQuotationFromB);
                scope.fork(Quotation::readQuotationFromC);
    
                scope.join();
    
                Quotation quotation = scope.bestQuotation();
                return quotation;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    每次fork都在自己的线程中执行,但是使用的模式是完全同步的。这个Scope对象的好处是我们可以用同步的方式写代码。

    将报价和天气预报整个到一个页面

    我们创建一个TravelPage记录,在里面放上报价和天气预报。

    public class Main {
        public static void main(String[] args) throws Exception {
    
            Instant begin = Instant.now();
    
            TravelPage travelPage = TravelPage.readTravelPage();
    
            Instant end = Instant.now();
            
            System.out.println("Travel page = " + travelPage);
            System.out.println("Time is = " + Duration.between(begin,end).toMillis() + " ms");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    新创建了一个PageComponent密封接口,让它们三个均实现它。QuotationWeather实现这个接口就行,代码不用动。

    public sealed interface PageComponent permits Quotation, TravelPage, Weather {
    }
    
    • 1
    • 2
    • TravelPage
    import jdk.incubator.concurrent.StructuredTaskScope;
    
    import java.util.Objects;
    import java.util.concurrent.Future;
    
    public record TravelPage(Quotation quotation, Weather weather) implements PageComponent {
    
        public static class TravelPageScope extends StructuredTaskScope<PageComponent>{
    
            private volatile Weather weather;
            private volatile Quotation quotation;
    
            private volatile Quotation.QuotationException quotationException;
            private volatile Throwable exception;
    
            @Override
            protected void handleComplete(Future<PageComponent> future) {
                switch (future.state()) {
                    case RUNNING -> throw new IllegalStateException(" Future is still running... ");
                    // 这里的future可能写到Weather或者Quotation
                    case SUCCESS -> {
                        switch (future.resultNow()){
                            case Weather weather -> this.weather = weather;
                            case Quotation quotation -> this.quotation = quotation;
                            default -> throw new IllegalStateException("Unexpected value: " + future.resultNow());
                        }
                    }
                    case FAILED -> {
                        switch (future.exceptionNow()) {
                            case Quotation.QuotationException quotationException -> this.quotationException = quotationException;
                            default -> this.exception = future.exceptionNow();
                        }
                    }
                    case CANCELLED -> {}
                }
            }
    
            public TravelPage travelPage() {
    
                if (this.quotation == null) {
                     if (this.quotationException != null){
                         throw new RuntimeException(this.quotationException);
                     } else {
                         throw new RuntimeException(this.exception);
                     }
                } else {
                    return new TravelPage(
                            this.quotation,
                            Objects.requireNonNullElse(
                                    this.weather,
                                    new Weather("Unknown","Mostly sunny")
                                    )
                            );
                }
            }
        }
    
        public static TravelPage readTravelPage() throws Exception {
            try (var scope = new TravelPageScope()){
                // 这里 readWeather和readQuotation分别启动了自己的scope
                scope.fork(Weather::readWeather);
                scope.fork(Quotation::readQuotation);
    
                scope.join();
    
                return scope.travelPage();
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 查看运行结果
    /*
    futureA = SUCCESS
    futureB = FAILED
    futureC = FAILED
    Travel page = TravelPage[quotation=Quotation[agency=agencyB, price=431], weather=Weather[agency=agencyB, weather=Sunny]]
    Time is = 992 ms
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    测试超时的情况

    修改Weather

        public static Weather readWeather() throws Exception {
            try(var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()) {
    
                Future<Weather> futureA = scope.fork(Weather::readWeatherFrom);
                Future<Weather> futureB = scope.fork(Weather::readWeatherFrom);
                Future<Weather> futureC = scope.fork(Weather::readWeatherFrom);
    
                // 设置等待的时间
                scope.joinUntil(Instant.now().plusMillis(10));
    
                System.out.println("futureA = " + futureA.state());
                System.out.println("futureB = " + futureB.state());
                System.out.println("futureC = " + futureC.state());
    
                return scope.result();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 运行结果
    /*
    Travel page = TravelPage[quotation=Quotation[agency=agencyC, price=650], weather=Weather[agency=Unknown, weather=Mostly sunny]]
    Time is = 823 ms
    */
    
    • 1
    • 2
    • 3
    • 4

    我们看到,此时返回的天气是我们兜底的天气信息,而且Weather中的状态输出也没有打印。

  • 相关阅读:
    SQL注入漏洞及五大手法
    5g双频千兆无线路由器工业级
    亚马逊云代码AI助手CodeWhisperer使用教程
    孙哥Netty视频笔记总结
    jeecg 重新加载表格
    C#程序采用AOT发布,真的可以避免被反编译?
    postgresql安装配置和基本操作
    WSL VSCode运行C++项目
    c++学习【23】matlab实现FOC算法
    【ARM Cache 系列文章 7.1 – ARMv8/v9 MMU 页表配置详细介绍 02 】
  • 原文地址:https://blog.csdn.net/XiumingLee/article/details/126803317