今天我们来一起盘一盘应用场景很多的实现类,虽然我们不经常直接使用这些类,但是在各种地方都有它们的身影(比如 线程池 ),且它们很容易搞混,我称之为“ 可知结果的未来任务——FutureTask ” (叫法有点拗口哈,因为我实在想不出更贴切的名字了~ )
由于涉及到的接口定义和实现比较多(看标题就知道了。。), 我先让它们各自做个自我介绍,然后我们最后来看看所谓的“ 可知结果的未来任务 ”的源码实现 。
在之前介绍线程的时候,我们就知道了,创建线程的很多方法,其中实现Runnable接口就是其中的方式,其实就是定义了这个线程run方法中做了哪些事情:
源码中的注释还是比较好理解的,但是我们会发现,这个run方法没有任何返回值,也就是说我们根本就不知道,run方法的执行结果到底怎么样,这时Doug Lea大神就定义了另外一套接口 Callable 系列接口。
要想run方法有结果,最直接的方法就是将返回值类型void改成特定的类,好在Java有泛型编程,如下源码实现:
/** * A task that returns a result and may throw an exception. * Implementors define a single method with no arguments called * {@code call}. * *The {@code Callable} interface is similar to {@link * java.lang.Runnable}, in that both are designed for classes whose * instances are potentially executed by another thread. A * {@code Runnable}, however, does not return a result and cannot * throw a checked exception. * *
The {@link Executors} class contains utility methods to * convert from other common forms to {@code Callable} classes. * * @see Executor * @since 1.5 * @author Doug Lea * @param
the result type of method {@code call} */ @FunctionalInterface public interface Callable { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; } 复制代码
我们和Runnable简单比较下:
public class CallableDemo { public static void main(String[] args) throws Exception { Callablecallable = new Callable () { @Override public String call() throws Exception { for (int i = 0; i < 5; i++) { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "处理业务中。。。" + i); } return "122333333"; } }; System.out.println(Thread.currentThread().getName() + "结果:" + callable.call()); } } //运行效果 /** main处理业务中。。。0 main处理业务中。。。1 main处理业务中。。。2 main处理业务中。。。3 main处理业务中。。。4 main结果:122333333 */ 复制代码
用法是不是很简单呢。 特点如下:
这时,大神又定义了另一个接口,它可以把call方法交给另一个线程异步执行,叫做“ 未来 ” 接口 —— Future ,如果有更牛逼的名字,请告知我哈~
官方的解释如下: Future表示异步计算的结果。方法用于检查计算是否完成、等待计算完成以及检索计算结果。 只有在计算完成后,才能使用get方法检索结果,在必要时阻塞直到它准备好。取消是由cancel方法执行的。还提供了其他方法来确定任务是正常完成还是被取消。一旦计算完成,就不能取消计算。如果为了可取消性而使用Future,但不提供可用的结果,可以声明Future并返回null作为底层任务的结果。
简单来讲如下:
/** * A {@link Future} that is {@link Runnable}. Successful execution of * the {@code run} method causes completion of the {@code Future} * and allows access to its results. 一个可运行的未来。run方法的成功执行将导致Future的完成,并允许访问其结果。 * @see FutureTask * @see Executor * @since 1.6 * @author Doug Lea * @paramThe result type returned by this Future's {@code get} method */ public interface RunnableFuture extends Runnable, Future { /** * Sets this Future to the result of its computation * unless it has been cancelled. 除非已取消,否则将此Future设置为其计算的结果。 */ void run(); } 复制代码
从源码的继承关系来看, 它即继承了Runnable,又继承了Future,这样子的一个既可以创建异步线程来执行异步任务,又可以对异步任务进行操控的接口就这样诞生了。 至于里面的 run方法,一开始我也觉得很懵,为啥Runnable接口里面有了,这边又定义了一次? 结合 下图官方API ,你没看错,它其实就是Runnable的run方法,我猜它 这边又定义了一次,只是用来强调下,这是这个类的特有异步执行可操控未来的一个异步过程(就是强调这是属于该“未来任务”的执行方法,只不过是借用了Runnable,感觉有点像是应用了Adapter适配器设计模式~)
好了,上面的介绍其实都是铺垫,下面的才是本篇的主角,它实现了以上接口的所有特性,它才是真正的 未来任务实现类——FutureTask 。
在此,我们画一个UML类图,来对上面的各个接口定义做如下清晰展示:
下面我们就来看看我们的正主: FutureTask 。
/* * Revision notes: This differs from previous versions of this * class that relied on AbstractQueuedSynchronizer, mainly to * avoid surprising users about retaining interrupt status during * cancellation races. Sync control in the current design relies * on a "state" field updated via CAS to track completion, along * with a simple Treiber stack to hold waiting threads. * * Style note: As usual, we bypass overhead of using * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics. 大概意思如下: 修订注意:这与依赖于AbstractQueuedSynchronizer的该类的以前版本不同,主要是为了避免在取消 竞赛期间保留中断状态让用户感到意外。当前设计中的同步控制依赖于通过CAS更新的“state”字段来 跟踪完成情况,以及一个简单的Treiber堆栈来保存等待的线程。 样式注意:与往常一样,我们绕过了使用AtomicXFieldUpdaters的开销,而是直接使用Unsafe intrinsic。 请注意:本篇基于JDK8源码讲解,这段话的意思其实就是在这版本之前,应该是依赖AQS实现的 现在改成了通过 state+CAS+Treiber堆栈来实现的。有兴趣的小伙伴可以看下JDK8之前的实现。 */ /** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ private Callablecallable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters; 复制代码
/** * Simple linked list nodes to record waiting threads in a Treiber * stack. See other classes such as Phaser and SynchronousQueue * for more detailed explanation. */ static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } } 复制代码
果不其然,和我们之前一起学习过的AQS差不多,这里 是一个单向的等待列表(因为有个next属性),内部也是当前线程。
根据以上源码注释和属性,总结: 在当前版本( JDK8,之前的版本实现方式不太一样 )FutureTask的同步控制实现,是通过对状态的CAS的更新,以及用一个线程安全的Treiber堆栈来保存等待的线程的方式来进行的。
FutureTask提供了两个构造器,如下:
/** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Callable}. * * @param callable the callable task * @throws NullPointerException if the callable is null */ public FutureTask(Callablecallable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } /** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Runnable}, and arrange that {@code get} will return the * given result on successful completion. * * @param runnable the runnable task * @param result the result to return on successful completion. If * you don't need a particular result, consider using * constructions of the form: * {@code Future> f = new FutureTask (runnable, null)} * @throws NullPointerException if the runnable is null */ public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } 复制代码
public staticCallable callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter (task, result); } /* 适配器模式来啦,和前面的猜测呼应上了。。。。主要目的就是将Runnable适配成Callable并指定 传过来的结果作为异步任务的结果 */ static final class RunnableAdapter implements Callable { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } } 复制代码
通过构造器我们知道, 最终就算传入的是Runnable对象,其实最后还是callable对象。
具体接口的实现,就是FutureTask的实现原理了,我们来看看它是如何实现异步任务的控制,并且可以获取到异步任务的结果的吧。主要是Runnable接口和Future接口对应的方法,首先来看 Runnable接口 的实现:
run方法实现
public void run() { //如果state不为NEW 或者将runner修改为当前线程不成功,注意此时的线程是真正执行任务的线程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callablec = callable; if (c != null && state == NEW) { V result; boolean ran; //以下过程较简单,就不注释了~~ try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts /** private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; */ int s = state; //如果状态大于等于INTERRUPTING,说明在中断线程 if (s >= INTERRUPTING) //见下面方法的分析 handlePossibleCancellationInterrupt(s); } } //set(result) protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } //setException(ex) protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //注意此处将异常结果给了结果 outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } //finishCompletion() /** * Removes and signals all waiting threads, invokes done(), and * nulls out callable. 其实就是个循环清空等待队列,并且唤醒各个节点线程的方法,也相当于一个结束后的优化清理工作 我叫他动作完成的既定动作。 */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } //handlePossibleCancellationInterrupt private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. //其实是个自旋,只要没有被中断,就不会退出,直到被中断退出 //(中断的过程在cancel方法里可以看到我们下面会分析到) // if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt } 复制代码
过程如下:
简单总结: run方法其实就是将当前线程变成真正的runner,然后不停的判断状态是否正常,正常的话,就去执行call方法具体的业务,执行完成的话,就把结果返回,并且唤醒所有等待的线程;如果有异常,则设置异常;最后再次判断是否有中断,有的话,等待state变为已中断才退出。
我们再看 Future 的实现:其实主要就是看 get和cancel 两个方法就可以了。
get方法
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } //awaitDone(false, 0L) /** * Awaits completion or aborts on interrupt or timeout. 在中断 或 超时时等待完成 或 中止 * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; /* 待封装的节点(主要用来将当前执行get方法的线程在未来可能封装为等待节点,注意是可能,仔细看 下面的分析) */ WaitNode q = null; //是否排队 boolean queued = false; /** 此处是一个自旋操作,什么时候退出循环呢,其实在方法的注释上已经告诉我们了 1、被中断 2、等待超出设定的时间 3、任务完成被唤醒 */ for (;;) { /** 如果在反复循环过程中线程被中断了,此时有可能线程被放到队列(Treiber栈),则先移除,然后 抛异常;如果没有放入等待队列,则直接抛出异常 */ if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } /** 我把状态值移过来,方便大家分析 private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; */ int s = state; /*继续走到这里,判断状态是否大于COMPLETING(NORMAL/EXCEPTIONAL/CANCELLED/ INTERRUPTING/INTERRUPTED),就是目前处于上面那些状态,要么完成,要么异常了, 要么被取消了等结束状态,则直接返回当前任务的状态即可。 */ if (s > COMPLETING) { if (q != null) //此处的置空,其实是方便GC垃圾回收 q.thread = null; return s; } //如果状态为正在执行任务中,则yield等等,进入下一次的循环 else if (s == COMPLETING) // cannot time out yet Thread.yield(); /** 如果再进来一次循环,代码走到这里(s=COMPLETING,请回头到上面看看run方法的过程就知道了) */ else if (q == null) q = new WaitNode(); /** 再进来,queued=false,如果还没入队,则将包装好的q通过CAS+头插入(查到头节点之前) 的方式入队,然后再次进入下一次的循环 */ else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //如果是超时进来的,则里面的就是等待超时的操作,和之前讲过的AQS超时操作都是一样的 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } //再进来循环,走到这里,说明任务还未结束,且当前线程已经包装成节点进入等待队列 //(Treiber栈)了,则阻塞线程 else LockSupport.park(this); } } //report(s) /** * Returns result or throws exception for completed task. * * @param s completed state value */ /** 我把状态值移过来,方便大家分析 private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; */ @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); //除了上面两种情况外,就是执行抛异常了 throw new ExecutionException((Throwable)x); } 复制代码
get流程总结如下:
cancel方法
//Future接口,cancel方法的注释 /** * Attempts to cancel execution of this task. This attempt will * fail if the task has already completed, has already been cancelled, * or could not be cancelled for some other reason. If successful, * and this task has not started when {@code cancel} is called, * this task should never run. If the task has already started, * then the {@code mayInterruptIfRunning} parameter determines * whether the thread executing this task should be interrupted in * an attempt to stop the task. 试图取消此任务的执行。如果任务已经完成、已经取消或由于其他原因无法取消,则此尝试将失败。 如果成功,并且在调用cancel时这个任务还没有启动,那么这个任务就不应该运行。如果任务已经开始, 那么参数mayInterruptIfRunning决定执行该任务的线程是否应该在试图停止该任务时被中断。 * *After this method returns, subsequent calls to {@link #isDone} will * always return {@code true}. Subsequent calls to {@link #isCancelled} * will always return {@code true} if this method returned {@code true}. * 该方法返回后,对isDone的后续调用将始终返回true。如果该方法返回true,则对iscancelled后续调用将 始终返回true。 * @param mayInterruptIfRunning {@code true} if the thread executing this * task should be interrupted; otherwise, in-progress tasks are allowed * to complete mayInterruptIfRunning true:当前的任务会被中断 false:允许当前的任务继续执行 * @return {@code false} if the task could not be cancelled, * typically because it has already completed normally; * {@code true} otherwise */ public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { //如果mayInterruptIfRunning为true,则中断线程,并且将状态改为INTERRUPTED try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { //最后做完成既定动作,上面分析过 finishCompletion(); } //最后返回true,取消成功 return true; } 复制代码
根据代码和注释分析如下:
以上呢,便是FutureTask主要操作的过程分析了。
最后呢,我再提供一个demo的用法示例,注意看代码中的注释,按照最后的方法试试运行效果:
public class DEmo { public static void main(String[] args) throws InterruptedException, ExecutionException { System.out.println("main start"); FutureTasktask = new FutureTask (() -> { System.out.println("thread:" + Thread.currentThread().getName() + "异步任务开始..."); Thread.sleep(5000); System.out.println("thread:" + Thread.currentThread().getName() + "异步任务结束.."); return 1024; }); Thread t1 = new Thread(task); // 开启FutureTask任务的执行,由t1线程执行 t1.start(); //t2为取消线程 Thread t2 = new Thread(new Runnable() { @Override public void run() { // t2线程 2s后取消,两种形式,中断式取消和非中断式 try { Thread.sleep(2000); } catch (InterruptedException e) { } // 1 task.cancel(false); } }); t2.start(); // 此处主线程等待获取结果 // 2 System.out.println(task.get()); // 上面的demo,我标出了1和2两处位置 // 小伙伴们可以试着切换1处的true/false看看效果 // 也可以将2处的代码进行注释和不注释切换看看效果 } } 复制代码
PS:其实提供这段代码的主要目的是供小伙伴们打断点调试,来印证我文末最后的总结,如下方我在源码中打断点的代码位置( 看代码行数即可 )
以为到这里本篇就结束了吗?还没有,我们再来仔细看下FutureTask的方法,如下:
我们仔细看这三个方法的修饰符,都是protected,这说明什么,说明这三个方法可以在子类中被重写
在此,我们也 引出一个问题: FutureTask虽然为我们提供了获取任务结果的方法get,但是呢,最终还是没有达到我想要的异步效果,即我调用了get方法之后,会阻塞在那,而且发生异常的话,异常只能被动抛出。 我现在想要实现一种我不需要get方法阻塞,也能获取到结果,且以通知的形式告诉主线程,如果任务异常的话,可以主动地获取到异常信息。
来看看怎么实现的吧~
public interface TaskListenser { /** exception:true,结果异常,value即异常详情信息 false,无异常,value即正常结果值 */ public void onResultOk(boolean exception, Object value); } 复制代码
public class MyFutureTaskextends FutureTask { /** * 设置监听 */ private TaskListenser taskListenser; public MyFutureTask(Callable callable) { super(callable); } @Override protected void set(V v) { super.set(v); try { // 任务执行完成,通知 taskListenser.onResultOk(false, get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } @Override protected void setException(Throwable t) { super.setException(t); taskListenser.onResultOk(true, getExceptionResult()); } /** * 把异常信息以返回值的形式给到前台 * * @return */ private Throwable getExceptionResult() { try { get(); } catch (InterruptedException | ExecutionException e) { return e; } return null; } public void setTaskListenser(TaskListenser taskListenser) { this.taskListenser = taskListenser; } } 复制代码
public class MyDemo implements TaskListenser, Callable{ @Override public void onResultOk(boolean exception, Object value) { if (!exception) { System.out.println("我收到結果了:" + (String) value); } else { System.out.println("执行任务发生异常了:" + ((Throwable) value).getMessage()); } } @Override public String call() throws Exception { System.out.println(Thread.currentThread().getName() + "异步任务开始..."); // 模拟执行任务 for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getName() + "执行任务中..." + i); // 此段代码可以用来测试任务异常的情况,注释打开即可 //if (i == 3) { // throw new Exception("发生异常啦!!!"); //} Thread.sleep(1000); } System.out.println(Thread.currentThread().getName() + "异步任务结束.."); return "1024"; } public static void main(String[] args) { MyDemo myDemo = new MyDemo(); MyFutureTask task = new MyFutureTask<>(myDemo); task.setTaskListenser(myDemo); Thread t1 = new Thread(task); // 开启FutureTask任务的执行,由t1线程执行 t1.start(); } } 复制代码
//运行效果 //正常 Thread-0异步任务开始... Thread-0执行任务中...0 Thread-0执行任务中...1 Thread-0执行任务中...2 Thread-0执行任务中...3 Thread-0执行任务中...4 Thread-0异步任务结束.. 我收到結果了:1024 //发生异常的效果 Thread-0异步任务开始... Thread-0执行任务中...0 Thread-0执行任务中...1 Thread-0执行任务中...2 Thread-0执行任务中...3 执行任务异常了:java.lang.Exception: 发生异常啦!!! 复制代码
例子比较简单哈,大家可以试试~ 当然,大家如果能够举一反三的话就更好了!! 比如我这边的通知只是利用了观察者模式,能不能用我们之前学的阻塞队列呢? 切忌眼高手低啊!
本篇到此就结束了,你学会了吗?~ 文字偏多,感谢阅读,认可的话,请点个赞吧~ 哈哈