• FutureTask配合Thread实现处理有返回结果的源码、逻辑与架构分析


    1.介绍

    FutureTask 能够接收 Callable 类型的参数,用来处理有返回结果的情况。

    2.使用示例

    // 创建任务对象
    FutureTask<Integer> task = new FutureTask<>(() -> {
        log.debug("running");
        Thread.sleep(1000);
        return 200;
    });
    
    new Thread(task).start();
    
    // 主线程阻塞,同步等待 task 执行完毕的结果
    Integer value = task.get();
    
    System.out.println("value = " + value);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    3.执行过程描述

    1. FutureTask 类在实例化构造时需要传入一个实现了 Callable 接口的类,实现 Callable 接口需要重写 call 方法,该方法需要一个返回值,由于 Callable 定义时是以泛型定义返回值,因此我们可以自定义返回值。FutureTask 会将传入的这个 Callable 实现类赋给自己的属性 private Callable callable;
    2. FutureTask 间接实现了 Runnable 接口,并重写了 run 方法,重写的 run 方法中会调用到 属性 callable 的 call 方法,并将 call 方法返回值存储到自己的属性 private Object outcome;
    3. Thread 类在实例化构造时可以传入一个 Runnable 接口的类,由于 FutureTask 实现了 Runnable 接口,因此我们可以直接将 FutureTask 对象作为构造器实参赋给 Thread对象的属性 private Runnable target;
    4. Thread 对象调用 start 方法,最终会调用到自身就重写了的 run 方法,自身重写的 run 方法中又会调用到 target 的 run 方法,即 FutureTask 自身已经重写的 run 方法,这时候就可以回到“第 2 点讲解”,了解到 FutureTask 的 run 方法中所做的事情。
    5. FutureTask 对象的 get() 方法,是去获取 callable 的 call 方法返回值,即属性 outcome 的值。get 方法中会调用 awaitDone 方法,awaitDone 方法中会使用 for (;;) 造成当前线程阻塞,直到 call 方法执行结束可以获取到 outcome 的值,并将 outcome 作为 get() 方法返回值。

    4.整体的关系

    Thread 和 FutureTask 类均实现了 Runnable 接口并重写了其 run 方法,Thread 将 FutureTask 进行聚合赋给 private Runnable target

    5.涉及到的核心源码(只提取了关键代码)

    5.1 Callable

    @FunctionalInterface
    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    5.2 RunnableFuture

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    5.3 FutureTask

    public class FutureTask<V> implements RunnableFuture<V> {
    
        /** The underlying callable; nulled out after running */
        private Callable<V> callable;
    
        // 存储 callable 接口的 call 方法的返回值
        /** The result to return or exception to throw from get() */
        private Object outcome; // non-volatile, protected by state reads/writes
    
        /*
        	() -> {
                log.debug("running");
                Thread.sleep(1000);
                return 200;
            }
            这实际上是对函数式接口 callable 的 V call() 方法进行实现
        */
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
        }   
    
        public void run() {
            // ...
    		
            Callable<V> c = callable;
            // 重写了 Runnable 函数式接口的 run 方法
            result = c.call();
    
            // ...
    
            // 赋值
            set(result);
    
            // ...
        }
    
        protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                // 将 callable 的 call 方法返回值,即我们自定义的 200 赋给 outcome
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }
    
    
        // 获取 callable 的 call 方法的返回结果
        public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                // 获取到结果成功的标识,实际是在 awaitDone 方法中用了死循环不断判断是否生成返回结果,造成了线程阻塞
                s = awaitDone(false, 0L);
            // 获取结果
            return report(s);
        }
    
        // timed-是否计时等待,即是否设置等待超时,false表示不设置,true表示设置
        private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            // 死循环
            for (;;) {
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    q = new WaitNode();
                else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    LockSupport.park(this);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98

    5.4 Thread

    public class Thread implements Runnable {
    
        /* What will be run. */
        private Runnable target;
    
        // 构造器,将间接实现了 Runnable 接口的 FutureTask 对象传进来
        public Thread(Runnable target) {
            init(null, target, "Thread-" + nextThreadNum(), 0);
        }
    
        private void init(ThreadGroup g, Runnable target, String name, long stackSize) {
            init(g, target, name, stackSize, null, true);
        }
        
        private void init(ThreadGroup g, Runnable target, String name,
                          long stackSize, AccessControlContext acc,
                          boolean inheritThreadLocals) {
            
            // ...
            
            // 将 FutureTask 对象赋给 Thread 对象的属性 target
            this.target = target;
    
        }
    
        @Override
        public void run() {
            if (target != null) {
                // 实际调用的 FutureTask 对象重写的 run 方法,重写的 run 方法中又会调用 callable 接口的 call 方法,并将 call 方法的返回值赋给 FutureTask 对象的属性 outcome
                target.run();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
  • 相关阅读:
    OnlyOffice文档服务器安装及集成使用
    23 距离判别
    指针进阶(3)
    Python函数绘图与高等代数互融实例(三):设置X|Y轴文本标签|网格线
    冲量在线出席隐私计算Meet-Up活动,探讨如何利用可信执行环境(TEE)和软硬件结合的方案使隐私计算市场迈向大规模生产
    文件管理:文件的逻辑结构
    LeetCode HOT 100 —— 49.全排列
    MySQL - 索引的数据结构
    深度学习入门(7)误差反向传播计算方式及简单计算层的实现
    【Kotlin学习路线】讲解
  • 原文地址:https://blog.csdn.net/qq_62982856/article/details/133954819