• 线程池——futuretask、CompletionService、CompletableFuture


    FUture:
    futuretask.get()——没有结果的时候,让出cpu,阻塞住,等待唤醒;
    futuretask.run()——拿到结果的时候,唤醒阻塞的线程

    在这里插入图片描述

    Callable和Runnable的区别

    Callable的call方法可以有返回值,可以声明抛出异常;而runnable不可以。和 Callable配合的有一个Future类,通过Future可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是Runnable做不到的,Callable 的功能要比Runnable强大。

    问题:Callable 实例能否和 Runnable 实例一样,作为 Thread 线程实例的 target 来使用吗?答 案是不行:Thread 的 target 属性的类型为 Runnable,而 Callable 接口与Runnable 接口之间没有任 何的继承关系,并且二者唯一方法在的名字上也不同。显而易见,Callable 接口实例没有办法作为 Thread 线程实例的 target 来使用。既然如此,那么该如何使用 Callable 接口去创建线程呢?一个 重要的在 Callable 接口与 Thread 线程之间起到搭桥作用的接口,马上就要登场了。

    @FunctionalInterface
    public interface Runnable {
        public abstract void run();
    }
    @FunctionalInterface
    public interface Callable<V> {
        V call() throws Exception;
    }
    
    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    RunnableFuture 继承 Runnable 接口,从而保证了其实例可以作为 Thread线程实例的 target 目标;同时,RunnableFuture 通过继承 Future 接口,从而保证了通过它可以获 取未来的异步执行结果。

    new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("通过Runnable方式执行任务");
        }
    }).start();
    
    // 需要借助FutureTask
    FutureTask task = new FutureTask(new Callable() {
        @Override
        public Object call() throws Exception {
            System.out.println("通过Callable方式执行任务");
            Thread.sleep(3000);
            return "返回任务结果";
        }
    });
    new Thread(task).start();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在这里插入图片描述

    future

    Runnable一般表示要执行的任务的过程, 而Future则表述执行任务的结果 (或者说是任务的一个句柄, 可获取结果, 取消任务等)。
    (1)能够取消异步执行中的任务。
    (2)判断异步任务是否执行完成。
    (3)获取异步任务完成后的执行结果。

    public interface Future<V> {
    // 取消任务的执行,参数表示是否立即中断任务执行,或者等任务结束
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
          // 等待任务执行结束,返回泛型结果.中断或任务执行异常都会抛出异常
        V get() throws InterruptedException, ExecutionException;
    // 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    FutureTask

    FutureTask 类实现 了 RunnableFuture 接口。
    Future归根结底只是一个接口,而FutureTask实现了这个接口,同时还实现了Runnalbe接口;
    可以阻塞式的获取处理结果,非阻塞式获取任务处理状态;

    总结:FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。
    成员变量

        private volatile int state;
    
        //整合了callable——将其包装到run方法中,利用起可以获得结果的功能
        private Callable<V> callable;
        /** The result to return or exception to throw from get() */
        //task的执行结果
        private Object outcome; // non-volatile, protected by state reads/writes
        /** The thread running the callable; CASed during run() */
        private volatile Thread runner;
        /** Treiber stack of waiting threads */
        private volatile WaitNode waiters;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    构造方法

    //注意FutureTask其实就是runnable,将callable包装进去
    public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
        }
        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    任务的取消和完成

        public boolean isCancelled() {
            return state >= CANCELLED;
        }
    
        public boolean isDone() {
            return state != NEW;
        }
        //任务完成之后什么也没做
    	protected void done() { }
    //任务取消是通过打断线程来实现
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (!(state == NEW &&
                  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {    // in case call to interrupt throws exception
                if (mayInterruptIfRunning) {
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                    }
                }
            } finally {
                finishCompletion();
            }
            return true;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    任务的运行
    因为futuretask是作为runner提交给创建的线程池的,所以线程池submit的时候,某个线程就会执行run方法,就会调用这里的run方法

    public void run() {
     // 任务已经被执行,直接退出
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                    // 执行任务
                    //	run里面调用的是本类成员变量callable的call方法
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        // 记录异常
                        setException(ex);
                    }
                    if (ran)
                    //将结果丢给成员变量outcome & 唤醒主现场future.get()
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    
      protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }
        
     private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();
    
            callable = null;        // to reduce footprint
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    任务结果的获取

    //无限阻塞等待
     public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
    
        /**
         * @throws CancellationException {@inheritDoc}
         */
         //有限时间阻塞等待
        public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            return report(s);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    使用Callable和FutureTask创建线程的具体步骤
    在这里插入图片描述

    基本步骤:

    (1)创建一个 Callable 接口的实现类,并实现其 call()方法,编写好异步执行的具体逻辑, 并且可以有返回值。
    (2)使用 Callable 实现类的实例,构造一个 FutureTask 实例。
    (3)使用 FutureTask 实例,作为 Thread 构造器的 target 入参,构造新的 Thread 线程实例;
    (4)调用 Thread 实例的 start 方法启动新线程,启动新线程的 run()方法并发执行。其内部的 执行过程为:启动 Thread 实例的 run()方法并发执行后,会执行 FutureTask 实例的 run()方法,最 终会并发执 Callable 实现类的 call()方法。
    (5)调用 FutureTask 对象的 get()方法,阻塞性的获得并发线程的执行结果。
    FutureTask 的 Callable 成员的 call()方法执行完成后,会将结果保存在 FutureTask 内部的 outcome 实例属性中。以上演示实例的 Callable 实现类中,这里 call()方法中业务逻辑的返回结果,是 "外卖到了!!"这句话。

    "外卖到了!!"这句话被返回之后,作为结果将被保存在 FutureTask 内部的 outcome 实例属性中,至此, 异步的“returnableThread”线程执行完毕。在“main”线程处理完自己的事情(以上实例中是一 个消磨时间的循环)后,通过 futureTask 的 get 实例方法获取异步执行的结果。这里有两种情况:
    (1)futureTask 的结果 outcome 不为空,callable.call()执行完成;在这种情况下,futureTast.get 会直接取回 outcome 结果,返回给“main”线程(结果获取线程)。
    (2)futureTask 的结果 outcome 为空,callable.call()还没有执行完。
    在这种情况下,“main”线程作为结果获取线程会被阻塞住,一直被阻塞到 callable.call()执行 完成。当执行完后,最终结果保存到 outcome 中,futureTask 会唤醒的“main”线程,去提取callable.call()执行结果。

    FutureTask的缺点
    通过 FutureTask 的 get 方法获取异步结果时,主线程也会被阻塞的。是异步阻塞模式。异步阻塞的效率往往是比较低的,被阻塞的主线程,不能干任何事情,唯一能干的,就是在 傻傻等待。原生 Java API,除了阻塞模式的获取结果外,并没有实现非阻塞的异步结果获取方法。

    ExecutorService & CompletionService

    参考链接
    这个虽然是获取最先执行完成的task结果,但是如果结果队列里没有元素,依然会阻塞住主线程。

    背景:

    当我们使用ExecutorService启动多个Callable时,每个Callable返回一个Future,而当我们执行Future的get方法获取结果时,可能拿到的Future并不是第一个执行完成的Callable的Future,就会进行阻塞,从而不能获取到第一个完成的Callable结果,那么这样就造成了很严重的性能损耗问题。
    CompletionService正是为了解决这个问题,它是Java8的新增接口,它的实现类是ExecutorCompletionService。CompletionService会根据线程池中Task的执行结果按执行完成的先后顺序排序,任务先完成的可优先获取到。

    具体流程

    run方法执行任务——执行完成——结果放到队列&唤醒主线程处的future.task().get()方法;

    对比线程池ExecutorService直接submit——future.get():

    public static void main(String[] args) throws InterruptedException, ExecutionException {
    
        Random random = new Random();
        ExecutorService pool = Executors.newFixedThreadPool(5);     
        List<Future<String>> resultFuture = new ArrayList<>();
    
        for(int i = 0; i<4; i++) {
            final int tmp = i;
            Future<String> future = pool.submit(() -> {
                Thread.sleep(1000+10*tmp);
                System.out.println(Thread.currentThread().getName()+"|完成任务");
                return "data"+random.nextInt(10);
            });
            resultFuture.add(future);
        }
        System.out.println("--------------");
    
        for(Future<String> future:resultFuture) {
            String result = future.get();
            System.out.println("执行结果"+result);      
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    总结

    1. 在执行大量相互独立和同构的任务时,可以使用CompletionService
    2. CompletionService可以为任务的执行设置时限,主要是通过BlockingQueue的poll(long time,TimeUnit unit)为任务执行结果的取得限制时间,如果没有完成就取消任务
    3. CompletionService是在线程执行完拿到结果之后放入到了BlockingQueue中的
    4. CompletionService是BlockQueue.take()队列都拿完了,或者没有放入——才阻塞
    5. CompletionService 取出所消耗的时间是哪个最后完成任务的时间

    CompletionService源码分析

    成员变量

        private final Executor executor;//自定义传入的线程池;实质上是提交任务还是交由了线程池来执行
        private final AbstractExecutorService aes;
        //当线程池中的一个线程把task计算完了,就会放入到这个已经完成的 执行结果future队列
        //主线程就是通过completionQueue.task()来阻塞获取已经完成了的future
        //ps:我们知道提交的任务执行的结果就是在FutureTask.outcome存在的,所以拿到FutureTask就能获取到结果。
        //我们知道BlockingQueue.take()和FutureTask.get()都会阻塞。但是这里要强调的使用这两行在这里只有第一行BlockingQueue.take()会阻塞,FutureTask.get()不会阻塞。
        private final BlockingQueue<Future<V>> completionQueue;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    构造方法

    
        public ExecutorCompletionService(Executor executor) {
            if (executor == null)
                throw new NullPointerException();
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
            this.completionQueue = new LinkedBlockingQueue<Future<V>>();
        }
    
       //就是传入了一个Executor和给初始化无界阻塞队列,以便于后续能够存储线程执行的结果FutureTask。
        public ExecutorCompletionService(Executor executor,
                                         BlockingQueue<Future<V>> completionQueue) {
            if (executor == null || completionQueue == null)
                throw new NullPointerException();
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
            this.completionQueue = completionQueue;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    主线程提交task
    submit()方法最终会委托给内部的executor去执行任务

    //讲task包装成了FutureTask:目的自然是返回他本身future
    //这里真正调用线程池的execute(task)时候又将FutureTask包装成了QueueingFuture(也是futuretask的继承类)
    //目的:因为futuretask的get()方法会阻塞,因此这里弄个继承类包装,让QueueingFuture可以将计算完成后的future丢到完成队列completionQueue里面
        public Future<V> submit(Callable<V> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task);
            executor.execute(new QueueingFuture(f));
            return f;
        }
    
        public Future<V> submit(Runnable task, V result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task, result);
            executor.execute(new QueueingFuture(f));
            return f;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    内部类

    
         //它是ExecutorCompletionService的内部类,重写了FuturTask的done()方法将FutureTask放入到BlockingQueue中。
        private class QueueingFuture extends FutureTask<Void> {
            QueueingFuture(RunnableFuture<V> task) {
                super(task, null);
                this.task = task;
            }
            protected void done() { completionQueue.add(task); }
            private final Future<V> task;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    从队列获取完成的future

    //阻塞方法,从结果队列中获取并移除一个已经执行完成的任务的结果,
    //如果没有就会阻塞,直到有任务完成返回结果。
        public Future<V> take() throws InterruptedException {
            return completionQueue.take();
        }
    //从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会返回null,该方法不会阻塞
        public Future<V> poll() {
            return completionQueue.poll();
        }
    
        public Future<V> poll(long timeout, TimeUnit unit)
                throws InterruptedException {
            return completionQueue.poll(timeout, unit);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    一些坑

    坑一
    使用它异步提交,在收集结果会导致乱序
    项目中你有可能使用CompletionService批量(分页 每页是一个任务)去查询数据库(order排序了等),然后在汇总那结果add到一个list中。
    此时用这种方式,由于不确定那个任务先返回,就add到了list中,会导致数据库中每页排好序的结果,由于汇总不按照找顺序汇总就乱序了。
    解决方式有两种:
    一种是再排序
    一种是上面我们效率第一点的例子

    坑二
    在Spring项目中你的CompletionService的构建应当是方到跟使用线程的方法内部new出来,而不应该放到Controller中作为成员变量存在

    CompletionService如果作为一个成员变量,
    来一个A用户请求方法,方法执行完结果放入BlockingQueue中执行的结果被另外一个用户获取到了,A用户的方法执行到取结果的地方。
    此时来了B用户请求方法,方法也执行到了取结果的地方,此时B可能取到A执行的结果,A也有可能B执行的结果,就是因为CompletionService成了共享对象,其内部成员BlockingQueue也成了共享对象。

    CompletableFuture

    不管CompletableFuture()执行过程中报错、正常完成、还是取消,都会被标示为已完成,所以最后CompletableFuture.isDown()为true。

    ForkJoin线程池,这个公共线程池中的所有线程都是Daemon线程,意味着如果主线程退出,这些线程无论是否执行完毕,都会退出系统。

  • 相关阅读:
    C/C++语言100题练习计划 97——素数对
    金仓数据库 KingbaseES 插件参考手册 U
    23端口登录的Telnet命令+传输协议FTP命令
    Vue 安装 vue的基本使用 vue的初步使用步骤
    js操作字符串的方法
    2023-2024 人工智能专业毕设如何选题
    【Linux】冯诺依曼体系结构、操作系统、进程概念、进程状态、环境变量、进程地址空间
    matlab 数据处理 命令集合
    Python函数进阶:探索高级函数特性与技巧
    分析每一段的代码的代码及代码运行的结果
  • 原文地址:https://blog.csdn.net/m0_46598535/article/details/125426506