• 多线程与高并发——并发编程(8)


    八、异步编程

    1 FutureTask应用&源码分析

    1.1 FutureTask介绍

    • FutureTask 是一个可以取消一部任务的类,FutureTask 对 Future 做了一个基本实现,可以调用方法去开始和取消一个任务,一般是配合 Callable 使用。
    • 异步任务启动之后,可以获取一个绑定当前异步任务的 FutureTask。
    • 可以基于 FutureTask 的方法去取消任务、查看任务是否有结果,以及获取任务的返回结果。
    • FutureTask内部的整体结构中,实现了 RunnableFuture 接口,RunnableFuture 这个接口又继承了 Runnable、Future 这两个接口,所以 FutureTask 也可以作为任务直接交给线程池去处理。

    1.2 FutureTask应用

    大方向是 FutureTask 对任务的控制:

    • 任务执行过程中状态的控制;
    • 任务执行完毕后,返回结果的获取。

    FutureTask 的任务在执行 run 方法后,是无法被再次运行的,需要使用 runAndReset 方法才可以。

    public static void main(String[] args) throws InterruptedException {
        // 构建FutureTask,基于泛型执行返回结果类型
        // 在有参构造中,声明Callable或者Runnable指定任务
        FutureTask futureTask = new FutureTask<>(() -> {
            System.out.println("任务开始执行……");
            Thread.sleep(2000);
            System.out.println("任务执行完毕……");
            return "OK!";
        });
    
        // 构建线程池
        ExecutorService service = Executors.newFixedThreadPool(10);
    
        // 线程池执行任务
        service.execute(futureTask);
    
        // 1. 对任务状态的控制
    //        System.out.println("任务结束了么?:" + futureTask.isDone());
    //        Thread.sleep(1000);
    //        System.out.println("任务结束了么?:" + futureTask.isDone());
    //        Thread.sleep(1000);
    //        System.out.println("任务结束了么?:" + futureTask.isDone());
    
        // 2. 对返回结果的获取,类似阻塞队列的take方法,死等结果
    //        try {
    //            String s = futureTask.get();
    //            System.out.println("任务结果:" + s);
    //        } catch (ExecutionException e) {
    //            e.printStackTrace();
    //        }
        // 3. 对返回结果的获取,类似阻塞队列的poll方法
        // 如果在指定时间内,没有拿到方法的返回结果,直接扔TimeoutException
    //        try {
    //            String s = futureTask.get(3000, TimeUnit.MILLISECONDS);
    //            System.out.println("返回结果:" + s);
    //        } catch (Exception e) {
    //            System.out.println("异常返回:" + e.getMessage());
    //            e.printStackTrace();
    //        }
    
        // 4. futureTask提供了run方法,一般不会自己去调用run方法,而是让线程池去执行任务,由线程池去执行run方法
        // run方法在执行时,是有任务状态的。任务已经执行了,再次调用run方法无效的。
        // 如果希望任务可以反复被执行,需要去调用runAndReset方法
    //        futureTask.run();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    1.3 FutureTask源码分析

    • 查看FutureTask 的源码,要从几个方向去看:
      • 先查看 FutureTask 中提供的一些状态;
      • 再查看任务的执行过程
    1.3.1 FutureTask中的核心属性
    • 首先弄清楚任务的状态流转是怎样的,其次对于核心属性要知道是干嘛的。
    /**
     FutureTask的核心属性
     FutureTask任务的状态流转
     * NEW -> COMPLETING -> NORMAL           任务正常执行,并且返回结果也正常返回
     * NEW -> COMPLETING -> EXCEPTIONAL      任务正常执行,但是结果是异常
     * NEW -> CANCELLED                      任务被取消   
     * NEW -> INTERRUPTING -> INTERRUPTED    任务被中断
     */
    // 记录任务的状态
    private volatile int state;
    // 任务被构建之后的初始状态
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    
    // 需要执行任务,会被赋值到这个属性
    private Callable callable;
    // 任务的任务结果要存储在这几个属性中
    private Object outcome; // non-volatile, protected by state reads/writes
    // 执行任务的线程
    private volatile Thread runner;
    // 等待返回结果的线程Node对象
    private volatile WaitNode waiters;
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    1.3.2 FutureTask的run方法
    • 任务执行前的一些判读,以及调用任务封装结果的方式,还有最后的一些后续处理。
    // 当线程池执行FutureTask任务时,会调用的方法
    public void run() {
       
        // 如果当前任务状态不是NEW,直接return告辞
        if (state != NEW ||  
            // 如果状态正确是NEW,这边需要基于CAS将runner属性设置为当前线程,如果CAS失败,直接return告辞
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
       
            // 将要执行的任务拿到
            Callable<V> c = callable;
            // 健壮性判断,保证任务不是null,再次判断任务的状态是NEW(DCL)
            if (c != null && state == NEW) {
       
                // 执行任务
                // result:任务的返回结果
                // ran:如果为true,任务正常结束。 如果为false,任务异常结束。
                V result;
                boolean ran;
                try {
       
                    result = c.call();	// 执行任务
                    ran = true;			// 正常结果,ran设置为true
                } catch (Throwable ex) {
       
                    // 如果任务执行期间出了异常,返回结果置为null
                    result = null;
                    ran = false;		// ran设置为false
                    setException(ex);	// 封装异常结果
                }
                if (ran)
                    set(result);		// 封装正常结果
            }
        } finally {
       
            runner = null;		// 将执行任务的线程置为null
            int s = state;		// 拿到任务的状态
            if (s >= INTERRUPTING)	// 如果状态大于等于INTERRUPTING
                // 进来代表任务中断,做一些后续处理
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    1.3.3 FutureTask的set&setException方法
    • 任务执行完毕后,修改任务的状态以及封装任务的结果
    // 没有异常的时候,正常返回结果
    protected void set(V v) {
       
        // 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
       
            // 将返回结果赋值给 outcome 属性
            outcome = v;
            // 将任务状态变为NORMAL,正常结束
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            // 一会再说……
            finishCompletion();
        }
    }
    // 任务执行期间出现了异常,这边要封装结果
    protected void setException(Throwable t) {
       
        // 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
       
            // 将异常信息封装到 outcome 属性
            outcome = t;
            // 将任务状态变为EXCEPTIONAL,异常结束
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); 
            // 一会再说……
            finishCompletion();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    1.3.4 FutureTask的cancel方法
    • 任务取消的一个方式:
      • 任务直接从 NEW 状态转换为 CANCEL
      • 任务从 NEW 状态变为 INTERRUPTING,然后再转换为 INTERRUPTED
    // 取消任务操作
    public boolean cancel(boolean mayInterruptIfRunning) {
       
        // 查看任务的状态是否是NEW,如果NEW状态,就基于传入的参数mayInterruptIfRunning
        // 决定任务是直接从NEW转换为CANCEL,还是从NEW转换为INTERRUPTING
        if (!(state == NEW && 
            UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {
          
            // 如果mayInterruptIfRunning为true,就需要中断线程
            if (mayInterruptIfRunning) {
       
                try {
       
                    Thread t = runner;	// 拿到任务线程
                    if (t != null)		// 如果线程不为null,直接interrupt   
                        t.interrupt();
                } finally {
        
                    // 将任务状态设置为INTERRUPTED
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
       
            // 任务结束后的一些处理~~ 一会看~~
            finishCompletion();
        }
        return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    1.3.5 FutureTask的get方法
    • 这个是线程获取 FutureTask 任务执行结果的方法
    // 拿任务结果
    public V get() throws InterruptedException, ExecutionException {
       
        // 获取任务的状态
        int s = state;
        // 要么是NEW,任务还没执行完;要么COMPLETING,任务执行完了,结果还没封装好
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);	// 让当前线程阻塞,等待结果
        return report(s);				// 最终想要获取结果,需要执行report方法
    }
    // 线程等待FutureTask结果的过程
    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
       
        // 针对get方法传入了等待时长时,需要计算等到什么时间点
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        // 声明好需要的Node,queued:放到链表中了么
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
       
            // 查看线程是否中断,如果中断,从等待链表中移除,甩个异常
            if (Thread.interrupted()) {
       
                removeWaiter(q);
                throw new InterruptedException();
            }
            int s 
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
  • 相关阅读:
    MD文档中插入数学公式,Typora中插入数学公式
    QT高阶-QSS样式表用法大全
    简述 AOP 动态代理
    9.DesignForManufacture\CreateArtwork...
    离散数学---判断矩阵:自反性,反自反性,对称性得到矩阵的自反闭包,对称闭包。
    GLEIF携手TrustAsia,共促数字邮件证书的信任与透明度升级
    【Vue2】VantUI项目入门教程
    【Java数组】一维数组与二维数组(附题目)
    【c++百日刷题计划】 ———— DAY3,带你轻松学习算法
    GIt版本回滚的两种方法reset、revert
  • 原文地址:https://blog.csdn.net/yangwei234/article/details/132819256