• Java多线程专题之Callable、Future与FutureTask(含源码分析)


     

    前言

    大家好,一直以来我都本着用最通俗的话理解核心的知识点, 我认为所有的难点都离不开 基础知识 的铺垫。目前正在出一个 Java多线程专题 长期系列教程,从入门到进阶, 篇幅会较多, 喜欢的话,给个关注:heart: ~

    适合人群

    • 有一定的Java基础
    • 想学习或了解多线程开发
    • 想提高自己的同学

    大佬可以绕过 ~

    背景

    之前给大家讲了一些 框架 的使用,这些都属于业务层面的东西,你需要熟练掌握它并在项目中会运用它即可,但这些对自身技术的积累是远远不够的,如果你想要提高自己,对于语言本身你需要花更多的时间去挖掘而不是局限于框架的使用,所以之前为什么跟大家一直强调 基础 的重要性,框架可以千变万化,层出不穷,但是基础它是不变的,不管是学 java 还是 前端 或者是其它语言, 这一点大家还是需要认清的。

    接下来的几期会专门讲 多线程 这一块,篇幅会较多,耐心看完你一定会有 收获 ~

    情景回顾

    上期带大家学习了什么是进阶学习了 Thread 以及分析了它的一些源码,本期带大家学习 Callable、Future与FutureTask 的用法以及 源码分析 , 内容较多, 我们一起来看一下吧~

    Callable & Future

    之前我们通过 Runnable,Thread 就可以创建一个线程,但是它也有一个局限,就是没有返回值,有时候我们的需求需要结合多任务处理后的数据做一些事情,所以通过上边的方法就不好解决了。

    下面我们看一下 Callable

    public interface Callable {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }
    复制代码

    首先它是一个接口,且还提供了泛型的支持, call 方法有返回值, 那怎么使用它呢,肯定是要实现它

    public class CallableTest {
    
        public static class CallableDemo implements Callable {
    
            @Override
            public String call() throws Exception {
                return "hello";
            }
        }
    
    
        public static void main(String[] args) throws Exception {
            CallableDemo demo = new CallableDemo();
            String result = demo.call();
            System.out.println(result);
            System.out.println("main");
        }
    }
    复制代码

    运行一下实际输出

    hello
    main
    复制代码

    发现返回的结果输出出去了,但是这里有个问题,这个main输出在hello之后,似乎好像没有开启一个线程,依然是同步执行的,是这样吗,我们看一下 call 内部的线程环境

    public String call() throws Exception {
        System.out.println(Thread.currentThread());
        Thread.sleep(3000);
        return "hello";
    }
    复制代码

    运行一下实际输出

    Thread[main,5,main]
    hello
    main
    复制代码

    好家伙,还是main线程内部,并且线程还被阻塞了,原来 new 是开启不了线程的,只是单纯的实现了一下它的接口,我们姿势搞错了。其实它的源码上加了注释的,说通常会借助 Excutors 类使用,这个类是用来创建 线程池 的,这个我们后边讲,这里给大家演示一下

    public static void main(String[] args) throws Exception {
        CallableDemo demo = new CallableDemo();
        // 创建线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        // 提交任务
        Future future = executor.submit(demo);
        System.out.println("main");
    }
    复制代码

    实际输出:

    main
    Thread[pool-1-thread-1,5,main]
    复制代码

    发现是单独线程执行的,并且没有阻塞线程。我们发现这里也用到了 Future ,这个翻译过来时未来的意思,这里也就是结果发生在后边,它是一个 异步 情况, 那么我们如何获取到结果呢?

    System.out.println(future.get());
            
    System.out.println("main");
    
    复制代码

    实际输出:

    Thread[pool-1-thread-1,5,main]
    hello
    main
    复制代码

    发现结果拿到了,但是运行的时候好像线程被阻塞了,我们可以发现 get() 会导致线程阻塞, 举一反三 ,我想不阻塞的情况下拿到返回值,可以吗:question:那有什么办法呢?开启单独的线程不就好了,那么在单独的线程可以拿到其它线程的值吗,我们来试一下

    new Thread(() -> {
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }).start();
    System.out.println("main");
    
    复制代码

    实际运行输出:

    Thread[pool-1-thread-1,5,main]
    main
    hello
    
    复制代码

    发现,这下就对了~

    Future & FutureTask 源码解析

    端起小板凳,这部分好好听,我们主要看下它的 源码实现 。我们上文使用到了 Future ,我们看一下它的定义,发现它也是一个接口

    public interface Future {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException;
    }
    复制代码

    还有一个接口叫做 RunnableFuture , FutureTask 是它的一个实现类,这个类帮我实现了很多好用的方法,因为我们自己实现的话是很麻烦的

    public interface RunnableFuture extends Runnable, Future {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
    复制代码

    之前的例子也可以用 FutureTask 改写成:

    public static void main(String[] args) throws Exception {
        CallableDemo demo = new CallableDemo();
        ExecutorService executor = Executors.newCachedThreadPool();
        FutureTask futureTask = new FutureTask<>(demo);
        executor.submit(futureTask);
        System.out.println(futureTask.get());
    }
    复制代码

    它继承了 Runnable, Future 接口,我们之前调用的 get 方法就是其中之一,来一起看一下这个 get 是如何拿到值的,该部分源码来自 FutureTask 类实现

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    复制代码

    这个 state 线程的状态值,这里很好理解,一个是阻塞方法 awaitDone ,一个是抛出结果 report ,我们重点看一下 awaitDone 的实现:

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
    
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                        q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
    
    复制代码

    首先它是一个内部方法, timed 指定是否定时等待,如果传 true 的话需要指定时间 nanos

    // 销亡时间  System.nanoTime() 正在运行的 Java 虚拟机的高分辨率时间源的当前值,以纳秒为单位
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    复制代码

    WaitNode q = null; 它是一个链表结构 volatile 被用来修饰会被不同线程访问和修改的变量, 后边还会讲到,此处先有个印象

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
    复制代码

    for (;;) {...} ,这是一个死循环,这里就是阻塞部分了,内部先会判断线程状态

    // 判断线程状态 如果中断,直接抛出异常,并且将```q```从节点中移除
    if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
    }
    复制代码

    这里为什么会移除呢,想想看,如果不移除,内部积累太多,每次都要遍历它,如果是有竞争的情况下,是不是很浪费。这里主要是避免不必要的高额开销

    // 线程状态 最先是 NEW
    int s = state;
    if (s > COMPLETING) {
        // 如果线程完成状态 移除q节点 并返回当前线程状态 最终通过report返回结果
        if (q != null)
            q.thread = null;
        return s;
    }
    复制代码

    这里为什么移除?因为完成了,我只要结果就好了,不需要在进一步判断了

    else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
    复制代码

    如果处于 COMPLETING ,会让出cpu时间

    else if (q == null)
                    q = new WaitNode();
    复制代码

    这个很好理解,节点不存在就创建一个

    else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
    复制代码

    如果有新任务进来,会新建一个节点,然后利用CAS操作放入waiter链表的头部,这里是一个原子性操作, CAS 的概念我们后边给大家讲,这里一切都是为了安全

    compareAndSwap 是个原子方法,原理是CAS,即将内存中的值与期望值进行比较,如果相等,就将内存中的值修改成新值并返回true。

    else if (timed) {
        nanos = deadline - System.nanoTime();
        if (nanos <= 0L) {
            removeWaiter(q);
            return state;
        }
        LockSupport.parkNanos(this, nanos);
    }
    else
        LockSupport.park(this);
    复制代码

    这里判断消亡时间,如果超时了,移除节点,并返回线程状态, LockSupport 使线程阻塞,有的同学可能会问, for 不是已经阻塞了吗:question:那为啥还调用 LockSupport ,这里其实是线程优化,想想你一直for循环一直判断是不是也会产生开销,加上 LockSupport 避免不要的操作,其实 for 的整个过程是实现了 自旋锁 的操作。

    阻塞了不就没法执行了吗, park 加锁方法还有一个对应的 unpark 相当于释放  ,但此处没有看到这个方法,那么它在哪个地方呢:question:我们大体应该可以猜到,它应该是在执行阶段,还记得 RunnableFuture 接口下的run方法吗?下面我们看一下它的实现

    public void run() {
            // 判断线程状态 如果不为NEW 或者 并判断值是否一样,如果不一样就直接返回
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                // 这一步是执行我们的任务
                Callable c = callable;
                // 如果任务存在 并且处于NEW状态
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        // 执行任务
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        // 异常检测
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    // 执行成功,设置返回值
                    if (ran)
                        set(result);
                }
            } finally {
                // 这里其实是释放阶段 防止并发调用
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                // 这一步其实是防止在中断时提交任务,内部是调用了一个Thread.yield()
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    复制代码

    下面我们重点看一下这个 set 方法

    protected void set(V v) {
            // 先比较是否相同
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                // outcome 是返回的结果或者异常 setException这里是设置异常结果 异常赋值给outcome
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 设置最终状态
                finishCompletion();
            }
    }
    复制代码

    UNSAFE 类是一个很特殊的类,它的内部几乎都是 native 方法,它可以使得我们能够操作 内存空间 来获得更高的性能,但一般我们很少使用它,因为它不被 gc 控制,使用不当 jvm 可能都会挂了。我们重点关注一下 finishCompletion 这个方法

    private void finishCompletion() {
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    // 遍历节点释放锁
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            // 默认下它是一个空方法,可以用于执行完成的回调方法, 可以覆盖实现
            done();
    
            callable = null;        // to reduce footprint
        }
    
    复制代码

    我们可以看到在这个内部它是调了一个 unpark 方法的,可以看出之前 awaitDone() 方法内部的线程阻塞在这个地方被 唤醒 了, 再回回过头看 awaitDone() 方法,就明白为啥要调用 park 方法了,因为线程没有达到大于 COMPLETING 状态,它会一直 for

    最后一个就是 report 了,返回值

    private V report(int s) throws ExecutionException {
            Object x = outcome;
            // 在set的时候 我们可以看到有设置为这个状态。 V就是传入的类型
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
    复制代码

    于是我们的 get 就拿到返回值了

    FutureTask 状态

    这里给大家补充一下FutureTask的状态值

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    复制代码

    state可能的状态转变路径如下:

    • NEW -> COMPLETING -> NORMAL
    • NEW -> COMPLETING -> EXCEPTIONAL
    • NEW -> CANCELLED
    • NEW -> INTERRUPTING -> INTERRUPTED

    结束语

    本期到这里就结束了, 总结一下,本节主要讲了 Callable、Future与FutureTask 的常用方法,以及从问题触发,带大家分析了一下 FutureTask 的源码,这里大家要好好理解,不要去背,想要告诉大家的是学习要带着问题, 看源码一定要大胆猜测,冷静分析 

  • 相关阅读:
    jenkins+ssh+Putty构建windows的IIS服务发布
    第八章《Java高级语法》第9节:静态导入技术
    Javaweb书城项目的路径问题
    芯片设计:一颗芯片到底是如何诞生的(上)
    python列表
    【文件读取/包含】任意文件读取漏洞 afr_2
    初识ROS
    SI,SIS,SIR,SEIRD模型
    使用python备份博客数据并自动备份至百度网盘
    Python学习第七篇:sys标准库
  • 原文地址:https://blog.csdn.net/m0_73257876/article/details/126540648