• JUC系列(七) ForkJion任务拆分与异步回调


    📣 📣 📣 📢📢📢
    ☀️☀️你好啊!小伙伴,我是小冷。是一个兴趣驱动自学练习两年半的的Java工程师。
    📒 一位十分喜欢将知识分享出来的Java博主⭐️⭐️⭐️,擅长使用Java技术开发web项目和工具
    📒 文章内容丰富:覆盖大部分java必学技术栈,前端,计算机基础,容器等方面的文章
    📒 如果你也对Java感兴趣,关注小冷吧,一起探索Java技术的生态与进步,一起讨论Java技术的使用与学习
    ✏️高质量技术专栏专栏链接: 微服务数据结构netty单点登录SSMSpringCloudAlibaba
    😝公众号😝想全栈的小冷,分享一些技术上的文章,以及解决问题的经验
    当前专栏JUC系列

    ForkJion

    什么是ForkJoin

    ForkJoin 下 JDK 1.7 并行执行任务的,数量越大,效率越高

    比如 :大数据 Map Reduce(把大任务拆分成小任务)

    image-20220304004113183

    ForkJoin 特点: 工作窃取

    举例子:

    PS: 维护的是双端队列 Deuue

    A线程执行任务到 第二个

    B线程执行完毕,那么B线程回去讲A线程的东西拿来执行,从而提高效率

    image-20220304004235249

    认识forkjion

    ForkJoin 使用两个类来完成以上两件事情:

    • ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:
      • RecursiveAction:用于没有返回结果的任务。
      • RecursiveTask :用于有返回结果的任务。
    • ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

    image-20220304005022113

    代码实例

    task 类 里面编写的是我们继承了 递归任务继承的实现方法

    public class forkjoinDemo extends RecursiveTask<Long> {
        /* 解决方案 也是有三六九等的,比如案例 求和
         * 最低等 就是直接for循环求和
         * 中等 使用forkjion
         * 高等 stream 并行流
         * */
    //开始
        private long start;
        //结束
        private long end;
        //到多少值,才开始分开任务
        private long threshold = 10000L;
    
        public forkjoinDemo(long start, long end) {
            this.start = start;
            this.end = end;
        }
    
        @Override
        protected Long compute() {
            //判断超过阈值的时候 开始使用 fork join
            if (end - start > threshold) {
                long sum = 0L;
                for (long i = start; i <= end; i++) {
                    sum += i;
                }
                return sum;
            } else {
                //    求出中间值
                long mid = (start - end) / 2;
                forkjoinDemo task1 = new forkjoinDemo(start, mid);
    
                //拆分任务,把任务压入线程队列
                task1.fork();
                forkjoinDemo task2 = new forkjoinDemo(mid + 1, end);
                task2.fork();
                return task1.join() + task2.join();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    测试类

    三种方法的速度

    public class test {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //test1(); 7042;
            //test2(); 969
            //test3(); 179;
        }
    
        public static void test1() {
            Long sum = 0L;
            long start = System.currentTimeMillis();
            for (long i = 1L; i <= 10_0000_0000; i++) {
                sum += i;
            }
            long end = System.currentTimeMillis();
            System.out.println("sum" + sum + "=> 执行时间" + (end - start));
        }
    
    
        public static void test2() throws ExecutionException, InterruptedException {
            long start = System.currentTimeMillis();
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            ForkJoinTask<Long> task = new forkjoinDemo(0L, 10_0000_0000L);
            ForkJoinTask<Long> submit = forkJoinPool.submit(task);
            Long sum = submit.get();
            long end = System.currentTimeMillis();
            System.out.println("sum" + sum + "=> 执行时间" + (end - start));
        }
    
        public static void test3() {
    
            long start = System.currentTimeMillis();
            //并行流
            long reduce = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
            long end = System.currentTimeMillis();
            System.out.println("sum" + reduce + "=> 执行时间" + (end - start));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    异步回调

    什么是future

    常见的两种创建线程的方式。一种是直接继承Thread,另外一种就是实现Runnable接口。

    这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。

    从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

    Future模式的核心思想是能够让主线程将原来需要同步等待的这段时间用来做其他的事情。(因为可以异步获得执行结果,所以不用一直同步等待去获得执行结果)

    image-20220304014056949

    上图简单描述了不使用Future和使用Future的区别,不使用Future模式,主线程在invoke完一些耗时逻辑之后需要等待,这个耗时逻辑在实际应用中可能是一次RPC调用,可能是一个本地IO操作等。B图表达的是使用Future模式之后,我们主线程在invoke之后可以立即返回,去做其他的事情,回头再来看看刚才提交的invoke有没有结果。

    Future接口的局限性

    当我们得到包含结果的Future时,我们可以使用get方法等待线程完成并获取返回值,注意我加粗的地方,Future的get() 方法会阻塞主线程。即使我们使用isDone()方法轮询去查看线程执行状态,但是这样也非常浪费cpu资源。

    image-20220304014316398

    我们需要新的,更强大的拓展,CompletableFuture

    在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

    CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。

    通过这种方式,你的主线程不用为了任务的完成而阻塞/等待,你可以用主线程去并行执行其他的任务。 使用这种并行方式,极大地提升了程序的表现。

    实例化:

    有两种格式,一种是supply开头的方法,一种是run开头的方法

    • supply开头:这种方法,可以返回异步线程执行之后的结果
    • run开头:这种不会返回结果,就只是执行线程任务
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
    
    public static CompletableFuture<Void> runAsync(Runnable runnable);
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    获取结果

    同步获取结果

    public T    get()
    public T    get(long timeout, TimeUnit unit)
    public T    getNow(T valueIfAbsent)
    public T    join()
    
    • 1
    • 2
    • 3
    • 4

    简单的例子

    CompletableFuture<Integer> future = new CompletableFuture<>();
    Integer integer = future.get();
    
    • 1
    • 2

    get() 方法同样会阻塞直到任务完成,上面的代码,主线程会一直阻塞,因为这种方式创建的future从未完成。有兴趣的小伙伴可以打个断点看看,状态会一直是not completed

    代码使用案例

     public static void main(String[] args) throws ExecutionException, InterruptedException {
            没有返回值的异步回调, runAsync
            //CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {
            //    System.out.println(Thread.currentThread().getName() + "runAsync=> Void");
            //});
            //System.out.println("1111");
            获取执行结果
            //completableFuture.get();
         
         
            //    有返回值的
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "runAsync=>integer");
                int i = 10 / 0;
                return 1024;
            });
            completableFuture.whenComplete((t, u) -> {
                //t是正常的返回结果
                //u是返回报错信息
                System.out.println("t=>" + t);
                System.out.println("u=>" + u);
            }).exceptionally((e) -> {
                System.out.println(e.getMessage());
                return 233;
            }).get();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

  • 相关阅读:
    002数据安全传输-多端协议传输平台:配置Oracle数据库-19c及导入数据信息
    Linux下的第一个小程序--进度条 & 蹦迪炫彩进图条
    “内鬼”作祟,国内知名游戏公司被黑
    参考意义大。4+巨噬细胞相关生信思路,简单易复现。
    【搭建OpenCV+Tesseract】
    Java Math.toRadians()具有什么功能呢?
    Javaweb filter过滤器 跟 listener监听器
    传智教育|git实战技巧-本地刚做出的修改、暂存和提交如何进行撤销
    速看|期待已久的2022年广州助理检测工程师真题解析终于出炉
    2023腾讯云标准型S5云服务器简单测评,比较值!
  • 原文地址:https://blog.csdn.net/doomwatcher/article/details/128153096