FUture:
futuretask.get()——没有结果的时候,让出cpu,阻塞住,等待唤醒;
futuretask.run()——拿到结果的时候,唤醒阻塞的线程
Callable的call方法可以有返回值,可以声明抛出异常;而runnable不可以。和 Callable配合的有一个Future类,通过Future可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是Runnable做不到的,Callable 的功能要比Runnable强大。
问题:Callable 实例能否和 Runnable 实例一样,作为 Thread 线程实例的 target 来使用吗?答 案是不行:Thread 的 target 属性的类型为 Runnable,而 Callable 接口与Runnable 接口之间没有任 何的继承关系,并且二者唯一方法在的名字上也不同。显而易见,Callable 接口实例没有办法作为 Thread 线程实例的 target 来使用。既然如此,那么该如何使用 Callable 接口去创建线程呢?一个 重要的在 Callable 接口与 Thread 线程之间起到搭桥作用的接口,马上就要登场了。
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
RunnableFuture 继承 Runnable 接口,从而保证了其实例可以作为 Thread线程实例的 target 目标;同时,RunnableFuture 通过继承 Future 接口,从而保证了通过它可以获 取未来的异步执行结果。
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("通过Runnable方式执行任务");
}
}).start();
// 需要借助FutureTask
FutureTask task = new FutureTask(new Callable() {
@Override
public Object call() throws Exception {
System.out.println("通过Callable方式执行任务");
Thread.sleep(3000);
return "返回任务结果";
}
});
new Thread(task).start();
Runnable一般表示要执行的任务的过程, 而Future则表述执行任务的结果 (或者说是任务的一个句柄, 可获取结果, 取消任务等)。
(1)能够取消异步执行中的任务。
(2)判断异步任务是否执行完成。
(3)获取异步任务完成后的执行结果。
public interface Future<V> {
// 取消任务的执行,参数表示是否立即中断任务执行,或者等任务结束
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
// 等待任务执行结束,返回泛型结果.中断或任务执行异常都会抛出异常
V get() throws InterruptedException, ExecutionException;
// 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
FutureTask 类实现 了 RunnableFuture 接口。
Future归根结底只是一个接口,而FutureTask实现了这个接口,同时还实现了Runnalbe接口;
可以阻塞式的获取处理结果,非阻塞式获取任务处理状态;
总结:FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。
成员变量
private volatile int state;
//整合了callable——将其包装到run方法中,利用起可以获得结果的功能
private Callable<V> callable;
/** The result to return or exception to throw from get() */
//task的执行结果
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
构造方法
//注意FutureTask其实就是runnable,将callable包装进去
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
任务的取消和完成
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
//任务完成之后什么也没做
protected void done() { }
//任务取消是通过打断线程来实现
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
任务的运行
因为futuretask是作为runner提交给创建的线程池的,所以线程池submit的时候,某个线程就会执行run方法,就会调用这里的run方法
public void run() {
// 任务已经被执行,直接退出
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行任务
// run里面调用的是本类成员变量callable的call方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 记录异常
setException(ex);
}
if (ran)
//将结果丢给成员变量outcome & 唤醒主现场future.get()
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
任务结果的获取
//无限阻塞等待
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
//有限时间阻塞等待
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
使用Callable和FutureTask创建线程的具体步骤
基本步骤:
(1)创建一个 Callable 接口的实现类,并实现其 call()方法,编写好异步执行的具体逻辑, 并且可以有返回值。
(2)使用 Callable 实现类的实例,构造一个 FutureTask 实例。
(3)使用 FutureTask 实例,作为 Thread 构造器的 target 入参,构造新的 Thread 线程实例;
(4)调用 Thread 实例的 start 方法启动新线程,启动新线程的 run()方法并发执行。其内部的 执行过程为:启动 Thread 实例的 run()方法并发执行后,会执行 FutureTask 实例的 run()方法,最 终会并发执 Callable 实现类的 call()方法。
(5)调用 FutureTask 对象的 get()方法,阻塞性的获得并发线程的执行结果。
FutureTask 的 Callable 成员的 call()方法执行完成后,会将结果保存在 FutureTask 内部的 outcome 实例属性中。以上演示实例的 Callable 实现类中,这里 call()方法中业务逻辑的返回结果,是 "外卖到了!!"这句话。
"外卖到了!!"这句话被返回之后,作为结果将被保存在 FutureTask 内部的 outcome 实例属性中,至此, 异步的“returnableThread”线程执行完毕。在“main”线程处理完自己的事情(以上实例中是一 个消磨时间的循环)后,通过 futureTask 的 get 实例方法获取异步执行的结果。这里有两种情况:
(1)futureTask 的结果 outcome 不为空,callable.call()执行完成;在这种情况下,futureTast.get 会直接取回 outcome 结果,返回给“main”线程(结果获取线程)。
(2)futureTask 的结果 outcome 为空,callable.call()还没有执行完。
在这种情况下,“main”线程作为结果获取线程会被阻塞住,一直被阻塞到 callable.call()执行 完成。当执行完后,最终结果保存到 outcome 中,futureTask 会唤醒的“main”线程,去提取callable.call()执行结果。
FutureTask的缺点
通过 FutureTask 的 get 方法获取异步结果时,主线程也会被阻塞的。是异步阻塞模式。异步阻塞的效率往往是比较低的,被阻塞的主线程,不能干任何事情,唯一能干的,就是在 傻傻等待。原生 Java API,除了阻塞模式的获取结果外,并没有实现非阻塞的异步结果获取方法。
参考链接
这个虽然是获取最先执行完成的task结果,但是如果结果队列里没有元素,依然会阻塞住主线程。
当我们使用ExecutorService启动多个Callable时,每个Callable返回一个Future,而当我们执行Future的get方法获取结果时,可能拿到的Future并不是第一个执行完成的Callable的Future,就会进行阻塞,从而不能获取到第一个完成的Callable结果,那么这样就造成了很严重的性能损耗问题。
CompletionService正是为了解决这个问题,它是Java8的新增接口,它的实现类是ExecutorCompletionService。CompletionService会根据线程池中Task的执行结果按执行完成的先后顺序排序,任务先完成的可优先获取到。
run方法执行任务——执行完成——结果放到队列&唤醒主线程处的future.task().get()方法;
对比线程池ExecutorService直接submit——future.get():
public static void main(String[] args) throws InterruptedException, ExecutionException {
Random random = new Random();
ExecutorService pool = Executors.newFixedThreadPool(5);
List<Future<String>> resultFuture = new ArrayList<>();
for(int i = 0; i<4; i++) {
final int tmp = i;
Future<String> future = pool.submit(() -> {
Thread.sleep(1000+10*tmp);
System.out.println(Thread.currentThread().getName()+"|完成任务");
return "data"+random.nextInt(10);
});
resultFuture.add(future);
}
System.out.println("--------------");
for(Future<String> future:resultFuture) {
String result = future.get();
System.out.println("执行结果"+result);
}
}
总结
成员变量
private final Executor executor;//自定义传入的线程池;实质上是提交任务还是交由了线程池来执行
private final AbstractExecutorService aes;
//当线程池中的一个线程把task计算完了,就会放入到这个已经完成的 执行结果future队列
//主线程就是通过completionQueue.task()来阻塞获取已经完成了的future
//ps:我们知道提交的任务执行的结果就是在FutureTask.outcome存在的,所以拿到FutureTask就能获取到结果。
//我们知道BlockingQueue.take()和FutureTask.get()都会阻塞。但是这里要强调的使用这两行在这里只有第一行BlockingQueue.take()会阻塞,FutureTask.get()不会阻塞。
private final BlockingQueue<Future<V>> completionQueue;
构造方法
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
//就是传入了一个Executor和给初始化无界阻塞队列,以便于后续能够存储线程执行的结果FutureTask。
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
主线程提交task
submit()方法最终会委托给内部的executor去执行任务
//讲task包装成了FutureTask:目的自然是返回他本身future
//这里真正调用线程池的execute(task)时候又将FutureTask包装成了QueueingFuture(也是futuretask的继承类)
//目的:因为futuretask的get()方法会阻塞,因此这里弄个继承类包装,让QueueingFuture可以将计算完成后的future丢到完成队列completionQueue里面
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
内部类
//它是ExecutorCompletionService的内部类,重写了FuturTask的done()方法将FutureTask放入到BlockingQueue中。
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
从队列获取完成的future
//阻塞方法,从结果队列中获取并移除一个已经执行完成的任务的结果,
//如果没有就会阻塞,直到有任务完成返回结果。
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
//从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会返回null,该方法不会阻塞
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
坑一
使用它异步提交,在收集结果会导致乱序
项目中你有可能使用CompletionService批量(分页 每页是一个任务)去查询数据库(order排序了等),然后在汇总那结果add到一个list中。
此时用这种方式,由于不确定那个任务先返回,就add到了list中,会导致数据库中每页排好序的结果,由于汇总不按照找顺序汇总就乱序了。
解决方式有两种:
一种是再排序
一种是上面我们效率第一点的例子
坑二
在Spring项目中你的CompletionService的构建应当是方到跟使用线程的方法内部new出来,而不应该放到Controller中作为成员变量存在
CompletionService如果作为一个成员变量,
来一个A用户请求方法,方法执行完结果放入BlockingQueue中执行的结果被另外一个用户获取到了,A用户的方法执行到取结果的地方。
此时来了B用户请求方法,方法也执行到了取结果的地方,此时B可能取到A执行的结果,A也有可能B执行的结果,就是因为CompletionService成了共享对象,其内部成员BlockingQueue也成了共享对象。
不管CompletableFuture()执行过程中报错、正常完成、还是取消,都会被标示为已完成,所以最后CompletableFuture.isDown()为true。
ForkJoin线程池,这个公共线程池中的所有线程都是Daemon线程,意味着如果主线程退出,这些线程无论是否执行完毕,都会退出系统。