• 09 多线程与高并发 - CompletableFuture 源码解析


    CompletableFuture 介绍

    CompletableFuture 在一定程度上就提供了各种异步非阻塞的处理方案,并且提供响应式编程,代码编写上效果更佳(更漂亮)

    CompletableFuture 使用详解

    CompletableFuture 源码分析

    从 runAsync() 与 thenRun() 来进行分析

    CompletableFuture<Void> future = CompletableFuture.runAsync(()->System.out.println("runAsync end..."));
    
    future.thenRun(()->System.out.println("thenRun end..."));
    
    • 1
    • 2
    • 3

    runAsync()

    public static CompletableFuture<Void> runAsync(Runnable runnable) {
            return asyncRunStage(asyncPool, runnable);
        }
    
    • 1
    • 2
    • 3
    asyncRunStage()
    static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
            if (f == null) throw new NullPointerException();
            CompletableFuture<Void> d = new CompletableFuture<Void>();
            e.execute(new AsyncRun(d, f));
            return d;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    new AsyncRun()
    static final class AsyncRun extends ForkJoinTask<Void>
                implements Runnable, AsynchronousCompletionTask {
            CompletableFuture<Void> dep; Runnable fn;
            AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
                this.dep = dep; this.fn = fn;
            }
    
            public final Void getRawResult() { return null; }
            public final void setRawResult(Void v) {}
            public final boolean exec() { run(); return true; }
    
            public void run() {
    			// d: 存放前面传过来的 CompletableFuture
    			// f: 存放前面传过来的 任务
                CompletableFuture<Void> d; Runnable f;
    			// 非空校验
                if ((d = dep) != null && (f = fn) != null) {
    				// 加速 gc
                    dep = null; fn = null;
    				// 任务未执行
                    if (d.result == null) {
                        try {
    						// 执行异步任务
                            f.run();
    						// 没有返回结果的将结果对象封装成 NIL,后续会有非空判断
                            d.completeNull();
                        } catch (Throwable ex) {
    						// 出现异常将异常封装到返回结果
                            d.completeThrowable(ex);
                        }
                    }
    				// 执行后续任务
                    d.postComplete();
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    postComplete() - 执行后续任务
    final void postComplete() {
            // f: 当前任务的 CompletableFuture
    		// h: 栈顶
            CompletableFuture<?> f = this; Completion h;
    		// 拿到栈顶数据,每次循环 h 指针向后移
            while ((h = f.stack) != null ||
                   (f != this && (h = (f = this).stack) != null)) {
                CompletableFuture<?> d; Completion t;
    			// 栈顶数据换成下一个
                if (f.casStack(h, t = h.next)) {
                    if (t != null) {
                        if (f != this) {
                            pushStack(h);
                            continue;
                        }
                        h.next = null;    // detach
                    }
    				// 执行栈顶的任务
                    f = (d = h.tryFire(NESTED)) == null ? this : d;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    thenRun()

    public CompletableFuture<Void> thenRun(Runnable action) {
            return uniRunStage(null, action);
        }
    
    • 1
    • 2
    • 3
    uniRunStage() - 追加任务到栈结构
    // 追加任务到栈结构的逻辑 
    // e:线程池、执行器。 如果是Async异步调用,会传递使用的线程池。 如果是普通的thenRun,不会传递线程池,为null
    private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
            if (f == null) throw new NullPointerException();
    		// 当前任务的CompletableFuture
            CompletableFuture<Void> d = new CompletableFuture<Void>();
    		// e != null,代表异步执行,走if里面逻辑
    		// e == null, 代表同步执行,先执行 uniRun(),前继任务没有执行完毕返回false就会开始压栈
    		// 同步为啥uniRun 第三个参数传 null ? 与 tryFire() 中传参逻辑不符啊
            if (e != null || !d.uniRun(this, f, null)) {
    			// 将线程池,当前任务CompletableFuture,前继任务,当前具体任务封装
                UniRun<T> c = new UniRun<T>(e, d, this, f);
    			// 将 c 压入 stack 中
                push(c);
    			// 尝试执行当前后继任务
                c.tryFire(SYNC);
            }
            return d;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    uniRun() - 尝试执行任务
    // 尝试执行任务
    // a:前继任务 
    // f:后续具体任务 
    // c:现在是null
    final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
            Object r; Throwable x;
            // 如果前继任务没有执行完返回false
            if (a == null || (r = a.result) == null || f == null)
                return false;
    		// 开始执行当前任务
            if (result == null) {
            	// 前继任务是异常结束就不执行后续任务了
                if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
                    completeThrowable(x, r);
                else
                // 前继任务正常结束
                    try {
                    	// 如果c == null,代表异步执行 
                    	// 如果c != null,代表嵌套执行或同步执行
                        if (c != null && !c.claim())
                        // 异步执行返回 false
                            return false;
                        // 同步执行
                        f.run();
                        // 封装返回结果
                        completeNull();
                    } catch (Throwable ex) {
                    	// 封装异常返回结果
                        completeThrowable(ex);
                    }
            }
            return true;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    claim() - 执行任务

    同步执行返回true,异步执行返回false

    final boolean claim() {
                Executor e = executor;
                // 判断当前任务标记,是否执行
                if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
                    if (e == null)
                    // 线程池为null,代表同步执行,直接返回true
                        return true;
                    // 异步执行,使用线程池执行即可
                    executor = null; // disable
                    e.execute(this);
                }
                return false;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    push() - 压栈
    // 压栈方法 
    final void push(UniCompletion<?,?> c) { 
    // 不为null!!! 
    if (c != null) { 
    // result是前继任务的结果 
    // 只有前继任务还没有执行完毕时,才能将当前的UniRun对象压到栈结构中 
    while (result == null && !tryPushStack(c)) lazySetNext(c, null); 
    	} 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    tryFire() - 尝试执行后续任务
    // 入参 mode 值
    // 同步
    static final int SYNC   =  0;
    // 异步
    static final int ASYNC  =  1;
    // 嵌套
    static final int NESTED = -1;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    
    // 前面代码
    // 将线程池,当前任务CompletableFuture,前继任务,当前具体任务封装
    UniRun<T> c = new UniRun<T>(e, d, this, f);
    
    
    static final class UniRun<T> extends UniCompletion<T,Void> {
            Runnable fn;
            UniRun(Executor executor, CompletableFuture<Void> dep,
                   CompletableFuture<T> src, Runnable fn) {
                super(executor, dep, src); this.fn = fn;
            }
    		// 尝试执行任务
            final CompletableFuture<Void> tryFire(int mode) {
    			// d: 当前任务CompletableFuture
    			// a: 前继任务
                CompletableFuture<Void> d; CompletableFuture<T> a;
                if ((d = dep) == null ||
    				// 尝试执行后续任务
                    !d.uniRun(a = src, fn, mode > 0 ? null : this))
                    return null;
                dep = null; src = null; fn = null;
                return d.postFire(a, mode);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    整体执行流程

    在这里插入图片描述
    后续任务触发方式有两种:

    1. 前继任务执行完毕,执行 postComplete() 方法
    2. 后续任务压栈之前和之后会尝试执行后续任务,前继任务执行结束的快,后续任务就可以直接执行,不需要前继任务来触发执行
  • 相关阅读:
    数据通信——应用层(文件传输FTP)
    Redis(02)| 数据结构-SDS
    102-视频与网络应用篇-环境搭建
    记录一个在写项目中遇到的Maven依赖无法导入的问题
    十年架构五年生活-03作为技术组长的困扰
    mysql约束——foreign key(外键)和check
    R语言glm函数使用频数数据构建二分类logistic回归模型,分析的输入数据为频数数据、将频数数据转化为正常样本数据(拆分、裂变为每个频数对应的样本个数)
    浅析Kubernetes架构之workqueue
    文件包含漏洞
    基础篇——配置文件解析
  • 原文地址:https://blog.csdn.net/qq_33512765/article/details/126427491