大家好,一直以来我都本着用最通俗的话理解核心的知识点, 我认为所有的难点都离不开 基础知识 的铺垫。目前正在出一个 Java多线程专题
长期系列教程,从入门到进阶, 篇幅会较多, 喜欢的话,给个关注:heart: ~
大佬可以绕过 ~
之前给大家讲了一些 框架
的使用,这些都属于业务层面的东西,你需要熟练掌握它并在项目中会运用它即可,但这些对自身技术的积累是远远不够的,如果你想要提高自己,对于语言本身你需要花更多的时间去挖掘而不是局限于框架的使用,所以之前为什么跟大家一直强调 基础
的重要性,框架可以千变万化,层出不穷,但是基础它是不变的,不管是学 java
还是 前端
或者是其它语言, 这一点大家还是需要认清的。
接下来的几期会专门讲 多线程
这一块,篇幅会较多,耐心看完你一定会有 收获
上期带大家学习了什么是进阶学习了 Thread
以及分析了它的一些源码,本期带大家学习 Callable、Future与FutureTask
的用法以及 源码分析
, 内容较多, 我们一起来看一下吧~
之前我们通过 Runnable,Thread
下面我们看一下 Callable
public interface Callable{ /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; } 复制代码
首先它是一个接口,且还提供了泛型的支持, call
方法有返回值, 那怎么使用它呢,肯定是要实现它
public class CallableTest { public static class CallableDemo implements Callable{ @Override public String call() throws Exception { return "hello"; } } public static void main(String[] args) throws Exception { CallableDemo demo = new CallableDemo(); String result = demo.call(); System.out.println(result); System.out.println("main"); } } 复制代码
hello main 复制代码
发现返回的结果输出出去了,但是这里有个问题,这个main输出在hello之后,似乎好像没有开启一个线程,依然是同步执行的,是这样吗,我们看一下 call
public String call() throws Exception { System.out.println(Thread.currentThread()); Thread.sleep(3000); return "hello"; } 复制代码
Thread[main,5,main] hello main 复制代码
好家伙,还是main线程内部,并且线程还被阻塞了,原来 new
是开启不了线程的,只是单纯的实现了一下它的接口,我们姿势搞错了。其实它的源码上加了注释的,说通常会借助 Excutors
类使用,这个类是用来创建 线程池
public static void main(String[] args) throws Exception { CallableDemo demo = new CallableDemo(); // 创建线程池 ExecutorService executor = Executors.newCachedThreadPool(); // 提交任务 Futurefuture = executor.submit(demo); System.out.println("main"); } 复制代码
main Thread[pool-1-thread-1,5,main] 复制代码
发现是单独线程执行的,并且没有阻塞线程。我们发现这里也用到了 Future
,这个翻译过来时未来的意思,这里也就是结果发生在后边,它是一个 异步
情况, 那么我们如何获取到结果呢?
System.out.println(future.get()); System.out.println("main"); 复制代码
Thread[pool-1-thread-1,5,main] hello main 复制代码
发现结果拿到了,但是运行的时候好像线程被阻塞了,我们可以发现 get()
会导致线程阻塞, 举一反三
new Thread(() -> { try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }).start(); System.out.println("main"); 复制代码
Thread[pool-1-thread-1,5,main] main hello 复制代码
端起小板凳,这部分好好听,我们主要看下它的 源码实现
。我们上文使用到了 Future
public interface Future{ boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } 复制代码
还有一个接口叫做 RunnableFuture
, FutureTask
public interface RunnableFutureextends Runnable, Future { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); } 复制代码
之前的例子也可以用 FutureTask
public static void main(String[] args) throws Exception { CallableDemo demo = new CallableDemo(); ExecutorService executor = Executors.newCachedThreadPool(); FutureTaskfutureTask = new FutureTask<>(demo); executor.submit(futureTask); System.out.println(futureTask.get()); } 复制代码
它继承了 Runnable, Future
接口,我们之前调用的 get
方法就是其中之一,来一起看一下这个 get
是如何拿到值的,该部分源码来自 FutureTask
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } 复制代码
这个 state
线程的状态值,这里很好理解,一个是阻塞方法 awaitDone
,一个是抛出结果 report
,我们重点看一下 awaitDone
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } } 复制代码
首先它是一个内部方法, timed
指定是否定时等待,如果传 true
的话需要指定时间 nanos
// 销亡时间 System.nanoTime() 正在运行的 Java 虚拟机的高分辨率时间源的当前值,以纳秒为单位 final long deadline = timed ? System.nanoTime() + nanos : 0L; 复制代码
WaitNode q = null;
它是一个链表结构 volatile
被用来修饰会被不同线程访问和修改的变量, 后边还会讲到,此处先有个印象
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } } 复制代码
for (;;) {...}
// 判断线程状态 如果中断,直接抛出异常,并且将```q```从节点中移除 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } 复制代码
// 线程状态 最先是 NEW int s = state; if (s > COMPLETING) { // 如果线程完成状态 移除q节点 并返回当前线程状态 最终通过report返回结果 if (q != null) q.thread = null; return s; } 复制代码
else if (s == COMPLETING) // cannot time out yet Thread.yield(); 复制代码
else if (q == null) q = new WaitNode(); 复制代码
else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); 复制代码
如果有新任务进来,会新建一个节点,然后利用CAS操作放入waiter链表的头部,这里是一个原子性操作, CAS
else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); 复制代码
这里判断消亡时间,如果超时了,移除节点,并返回线程状态, LockSupport
使线程阻塞,有的同学可能会问, for
不是已经阻塞了吗:question:那为啥还调用 LockSupport
,这里其实是线程优化,想想你一直for循环一直判断是不是也会产生开销,加上 LockSupport
避免不要的操作,其实 for
的整个过程是实现了 自旋锁
阻塞了不就没法执行了吗, park
加锁方法还有一个对应的 unpark
相当于释放 锁
,但此处没有看到这个方法,那么它在哪个地方呢:question:我们大体应该可以猜到,它应该是在执行阶段,还记得 RunnableFuture
public void run() { // 判断线程状态 如果不为NEW 或者 并判断值是否一样,如果不一样就直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 这一步是执行我们的任务 Callablec = callable; // 如果任务存在 并且处于NEW状态 if (c != null && state == NEW) { V result; boolean ran; try { // 执行任务 result = c.call(); ran = true; } catch (Throwable ex) { // 异常检测 result = null; ran = false; setException(ex); } // 执行成功,设置返回值 if (ran) set(result); } } finally { // 这里其实是释放阶段 防止并发调用 runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; // 这一步其实是防止在中断时提交任务,内部是调用了一个Thread.yield() if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } 复制代码
下面我们重点看一下这个 set
protected void set(V v) { // 先比较是否相同 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // outcome 是返回的结果或者异常 setException这里是设置异常结果 异常赋值给outcome outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 设置最终状态 finishCompletion(); } } 复制代码
类是一个很特殊的类,它的内部几乎都是 native
方法,它可以使得我们能够操作 内存空间
来获得更高的性能,但一般我们很少使用它,因为它不被 gc
控制,使用不当 jvm
可能都会挂了。我们重点关注一下 finishCompletion
private void finishCompletion() { for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 遍历节点释放锁 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } // 默认下它是一个空方法,可以用于执行完成的回调方法, 可以覆盖实现 done(); callable = null; // to reduce footprint } 复制代码
我们可以看到在这个内部它是调了一个 unpark
方法的,可以看出之前 awaitDone()
方法内部的线程阻塞在这个地方被 唤醒
了, 再回回过头看 awaitDone()
方法,就明白为啥要调用 park
方法了,因为线程没有达到大于 COMPLETING
状态,它会一直 for
最后一个就是 report
private V report(int s) throws ExecutionException { Object x = outcome; // 在set的时候 我们可以看到有设置为这个状态。 V就是传入的类型 if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } 复制代码
于是我们的 get
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; 复制代码
本期到这里就结束了, 总结一下,本节主要讲了 Callable、Future与FutureTask
的常用方法,以及从问题触发,带大家分析了一下 FutureTask
的源码,这里大家要好好理解,不要去背,想要告诉大家的是学习要带着问题, 看源码一定要大胆猜测,冷静分析