FutureTask是一个可取消的异步运算的任务,FutureTask里面可以可以传入Callable和Runable实现类作为参数,可以对异步运算任务的结果进行等待获取,判断是否已经完成,取消任务等操作。理解FutureTask之前先要知道Future接口,
Future接口属于包java.util.concurrent, 接口提供了判断任务是否完成、终端任务、获取任务执行结果三种能力,源码如下:
- package java.util.concurrent;
- public interface Future
{ -
- //用来取消任务
- boolean cancel(boolean mayInterruptIfRunning);
-
- //返回任务是否取消成功
- boolean isCancelled();
-
- // 返回任务是否执行完成
- boolean isDone();
-
- //用来获取执行结果,获取的时候产生阻塞,直到任务执行完成
- V get() throws InterruptedException, ExecutionException;
-
- //用来获取任务执行结果,获取的时候产生阻塞,指定时间内任务没有执行完则TimeoutException
- V get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
-
- }
Future只是一个接口,我们经常使用的是java.util.concurrent.FutureTask, 它是Future接口的实现类 (FutureTask首先实现了RunnableFuture接口,RunnableFuture接口集成Future.并在Future接口的基础上增加了run()行为)
因为接口Future#get()是阻塞方法,所以我们在使用FutureTask#get()获取线程处理结果的时,先会阻塞等待任务处理完成, 处理完成后才能拿到结果
- public class FutureTask
implements RunnableFuture { - //...
- }
- public interface RunnableFuture
extends Runnable, Future { - /**
- * Sets this Future to the result of its computation
- * unless it has been cancelled.
- */
- void run();
- }
一个Future对象可以调用Callable和Runable的对象进行包装,由于FutureTask也是Runnable接口的实现类,所以FutureTask也可以放入线程池中,比如查看Spring源码中的ThreadPoolTaskExecutor 对象
- public class ThreadPoolTaskExecutor extends CustomizableThreadFactory
- implements SchedulingTaskExecutor, Executor, BeanNameAware, InitializingBean, DisposableBean {
-
- public void execute(Runnable task) {
- Executor executor = getThreadPoolExecutor();
- try {
- executor.execute(task);
- }
- catch (RejectedExecutionException ex) {
- throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
- }
- }
-
-
- public Future> submit(Runnable task) {
- FutureTask
- execute(future); //future 为参数
- return future;
- }
-
- public
Future submit(Callable task) { - FutureTask
future = new FutureTask(task); - execute(future); //future 为参数
- return future;
- }
-
-
-
- }
在项目中FutrueTask 广泛使用, 比如有一个任务处理起来比较慢,但是这个任务可以拆分成多个小任务分别处理,然后将结果合并起来返回,
举个例子:计算1*10+2*10+3*10+4*10+5*10求和,可以一次计算出来,也可以拆分成多个线程计算N*10, 每个线程处理后,在最后求和。如下代码所示
- package com.thread.future;
-
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
- import java.util.concurrent.*;
-
- public class FutureTaskTest {
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- ExecutorService exec = Executors.newFixedThreadPool(5);
- List
> taskList = new ArrayList>(); - for (int i = 1; i < 6; i++) {
- FutureTask
task = new FutureTask(new Calculate(i)); - exec.execute(task);
- taskList.add(task);
- }
-
- Integer sum = 0;
- for (FutureTask
task : taskList) { - //阻塞等待,当线程所有任务执行后才能达到每个线程处理结果
- Integer num = task.get();
- //输出处理结果
- System.out.println("" + new Date() + " " + sum + "+" + num + "=" + (sum + num));
- sum = sum + num;
- }
- exec.shutdown();
- }
-
-
- }
-
- class Calculate implements Callable {
- private int a;
-
- public Calculate(Integer a) {
- this.a = a;
- }
-
- public Integer call() throws Exception {
- System.out.println("" + new Date() + " " + Thread.currentThread().getName() + "deal :" + a);
- Thread.sleep(2000);
- return a * 10; //这里只模拟简单运算
- }
- }
执行结果
- Fri Jul 29 19:17:03 CST 2022 pool-1-thread-5deal :5
- Fri Jul 29 19:17:03 CST 2022 pool-1-thread-2deal :2
- Fri Jul 29 19:17:03 CST 2022 pool-1-thread-1deal :1
- Fri Jul 29 19:17:03 CST 2022 pool-1-thread-4deal :4
- Fri Jul 29 19:17:03 CST 2022 pool-1-thread-3deal :3
- Fri Jul 29 19:17:05 CST 2022 0+10=10
- Fri Jul 29 19:17:05 CST 2022 10+20=30
- Fri Jul 29 19:17:05 CST 2022 30+30=60
- Fri Jul 29 19:17:05 CST 2022 60+40=100
- Fri Jul 29 19:17:05 CST 2022 100+50=150
FutureTask中 state用volatile修饰的,如果在多线程并发的情况下,某一个线程改变了任务的状态,其他线程都能够立马知道,保证了state字段的可见性
- public class FutureTask
implements RunnableFuture { - // 表示当前任务的状态,volatile修饰
- private volatile int state;
-
- // 表示当前任务的状态是新创建的,尚未执行
- private static final int NEW = 0;
- // 表示当前任务即将结束,还未完全结束,值还未写,一种临界状态
- private static final int COMPLETING = 1;
- // 表示当前任务正常结束
- private static final int NORMAL = 2;
- // 表示当前任务执行过程中出现了异常,内部封装的callable.call()向上抛出异常了
- private static final int EXCEPTIONAL = 3;
- // 表示当前任务被取消
- private static final int CANCELLED = 4;
- // 表示当前任务中断中
- private static final int INTERRUPTING = 5;
- // 表示当前任务已中断
- private static final int INTERRUPTED = 6;
-
-
- }
构造器:
FuturnTask提供了两个构造器,FutureTask里面可以可以传入Callable和Runable实现类作为参数
- /
- //上文说FutureTask里面可以可以传入Callable和Runable实现类作为参数
-
-
- //一旦运行,执行给定的Callable
- public FutureTask(Callable
callable) { - if (callable == null)
- throw new NullPointerException();
- this.callable = callable;
- // 设置状态为新创建
- this.state = NEW;
- }
-
- //一旦运行,执行给定的Runalbe,并在完成后通过get返回给调用者
- public FutureTask(Runnable runnable, V result) {
- this.callable = Executors.callable(runnable, result);
- // 设置状态为新创建
- this.state = NEW;
- }
-
run方法:
- public void run() {
- // 当前任务状态不为new或者runner旧值不为null,说明已经启动过了,直接返回,这里也说明了run()里面的具体逻辑只会
- if (state != NEW ||
- !UNSAFE.compareAndSwapObject(this, runnerOffset,
- null, Thread.currentThread()))
- return;
- // 只有当任务状态为new并且runner旧值为null才会执行到这里
- try {
- // 传入的callable任务
- Callable
c = callable; -
- // 当任务不为null并且当前任务状态为新建时才会往下执行
- if (c != null && state == NEW) {
- V result; // 储存任务的返回结果
- boolean ran;// 储存执行是否成功
- try {
- // 调用callable.run()并返回结果
- result = c.call();
- ran = true; // 正常执行设置ran为true
- } catch (Throwable ex) {
-
- result = null; // 异常时设置结果为null
- ran = false;
-
- //并且更新任务状态为EXCEPTIONAL(执行过程中出现了异常)并且唤醒阻塞的线程
- setException(ex);
- }
-
- if (ran) //执行成功
- // 内部设置outcome为callable执行的结果,并且更新任务的状态为NORMAL(任务正常执行)并且唤醒阻塞的线程
- set(result);
- }
- } finally {
- // 将当前任务的线程设置为null
- runner = null;
- // 当前任务的状态
- int s = state;
- // 如果state>=INTERRUPTING,说明当前任务处于中断中或已中断状态
- if (s >= INTERRUPTING)
- // 如果当前任务处于中,则执行这个方法线程会不断让出cpu直到任务处于已中断状态
- handlePossibleCancellationInterrupt(s);
- }
- }