• FutureTask的测试使用和方法执行分析


    FutureTask类图如下
    在这里插入图片描述

    java.util.concurrent.FutureTask#run run方法执行逻辑如下

     public void run() {
            if (state != NEW ||
                !RUNNER.compareAndSet(this, null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    state说明:

    • NEW:新任务的初始状态;
    • COMPLETING:当任务被设置结果时,处于COMPLETING状态,是一个中间状态;
    • NORMAL:表示任务正常结束;
    • EXCEPTIONAL:表示任务因异常而结束;
    • CANCELLED:任务还未执行完成之前就调用了cancel方法,任务处于CANCELLED;
    • INTERRUPTING:当任务调用cancel(true)中断程序时,任务处于INTERRUPTING状态,是一个中间状态;
    • INTERRUPTED:任务调用cancel(true)中断程序时会调用interrupt()方法中断线程运行,任务状态由INTERRUPTING转变为INTERRUPTED;

    代码逻辑说明:

    • 如果不是新建状态或者已经在运行中,则直接返回
    • 否则执行如下:
    1. 取FutureTask内部的Callable,不是空且是新建状态就去执行
    2. 执行成功失败记录
      • 成功,记录结果,状态最后为:NORMAL
      • 失败,记录异常, 状态最后为:EXCEPTIONAL
    3. 最后会并完成任务,会执行java.util.concurrent.FutureTask#finishCompletion(最后内部的Callable会设置为null)
      private void finishCompletion() {
    // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (WAITERS.weakCompareAndSet(this, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
    
        done();
    
        callable = null;        // to reduce footprint
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    唤醒下一个线程,并从等待队列中移除掉。


    get超时方法

     public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
     /**
         * Awaits completion or aborts on interrupt or timeout.
         *
         * @param timed true if use timed waits
         * @param nanos time to wait, if timed
         * @return state upon completion or at timeout
         */
        private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            // The code below is very delicate, to achieve these goals:
            // - call nanoTime exactly once for each call to park
            // - if nanos <= 0L, return promptly without allocation or nanoTime
            // - if nanos == Long.MIN_VALUE, don't underflow
            // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
            //   and we suffer a spurious wakeup, we will do no worse than
            //   to park-spin for a while
            long startTime = 0L;    // Special value 0L means not yet parked
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING)
                    // We may have already promised (via isDone) that we are done
                    // so never return empty-handed or throw InterruptedException
                    Thread.yield();
                else if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
                else if (q == null) {
                    if (timed && nanos <= 0L)
                        return s;
                    q = new WaitNode();
                }
                else if (!queued)
                    queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
                else if (timed) {
                    final long parkNanos;
                    if (startTime == 0L) { // first time
                        startTime = System.nanoTime();
                        if (startTime == 0L)
                            startTime = 1L;
                        parkNanos = nanos;
                    } else {
                        long elapsed = System.nanoTime() - startTime;
                        if (elapsed >= nanos) {
                            removeWaiter(q);
                            return state;
                        }
                        parkNanos = nanos - elapsed;
                    }
                    // nanoTime may be slow; recheck before parking
                    if (state < COMPLETING)
                        LockSupport.parkNanos(this, parkNanos);
                }
                else
                    LockSupport.park(this);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 判断如果是运行中,特定时间内就while(true)不断判断完成或异常; 中间有个细节( Thread.yield() : 自己在运行,尝试让出CPU )
    • 超时仍运行(即特定时间判断完后线程还是运行中)则抛出TimeoutException; 否则返回对应结果

    返回结果

    /**
    * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    测试代码:

     public static void main(String[] args) throws InterruptedException {
    
            ExecutorService executorService = Executors.newSingleThreadExecutor();
    
            FutureTask<Integer> futureTask1 = new FutureTask(()->{
                try{
                    System.out.println("task Run....");
                    TimeUnit.SECONDS.sleep(3);
                }catch (Exception e){
    
                }
                System.out.println("task finish....");
                return 1;
            });
    
            System.out.println(futureTask1);
            executorService.submit(futureTask1);
            TimeUnit.SECONDS.sleep(1);
            futureTask1.cancel(false);
            System.out.println(futureTask1.toString());
    
            Integer a = null;
            try {
                a = futureTask1.get(2, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException | TimeoutException e) {
                e.printStackTrace();
            }
            System.out.println(futureTask1);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
  • 相关阅读:
    TS 泛型推断好难啊,看看你能写出来不
    a++和++a
    API 网关的功能
    mysql的max_allowed_packet配置
    前端实现表格生成序号001、002、003自增
    js导出excel表格并生成多sheet(更改列宽)
    外包干了20天,技术退步明显......
    Spring Cloud服务发现与注册的原理与实现
    数字化营销:企业营收N倍增长的秘诀
    计算机毕业设计(附源码)python学生量化考核系统
  • 原文地址:https://blog.csdn.net/qq_26437925/article/details/133842241