• jdk 线程池任务提交机制&&任务执行流程详解


    jdk 线程池任务提交流程&&任务执行机制

    任务提交机制

    线程池执行任务主要有两种方式:execute()、submit()

    注意:execute() 执行任务时,如果有异常没有被捕获会直接抛出

    ​ submit() 执行任务时,会吞并异常,除非调用get() 获取计算结果,当抛出异常时会捕获异常

    Executors (内部使用AbstractExecutorService的子类DelegatedExecutorService):

    • 执行execute(),最终是调用ThreadPoolExecutor的execute方法
    • 执行submit(),会通过newTaskFor创建FutureTask,最终还是执行ThreadPoolExecutor的execute方法

    很明显,线程池调用execute()最终调用ThreadPoolExecutor的execute()传入的是runnable,而线程池调用submit()最终也会调用ThreadPoolExecutor的execute(),但是传入的是FutureTask,FutrueTask内部维护了callable(执行任务,产生结果)、state通过cas保证线程安全,FutrueTask实现了RunnableFuture接口,而该接口又继承了Runnable(使得FutrueTask能够被线程调度)、继承Future(get()得到获取callable任务的计算结果,cancel()取消)

    public interface Executor {
        void execute(Runnable command);
    }
    
    public interface ExecutorService extends Executor {}
    
    public abstract class AbstractExecutorService implements ExecutorService {}
    
    public class ThreadPoolExecutor extends AbstractExecutorService {}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    我们自己创建的ThreadPoolExecutor 继承 AbstractExecutorService ,AbstractExecutorService 实现 Executor,所以ThreadPoolExecutor可以调用execute,执行Runnable接口,但是如果想要能够返回异步线程的执行结果,捕获异步任务的异常,就得借助FutureTask,通过调用submit返回的Future的get()获取callable()的计算结果

    所以就出现了ExecutorService

    public interface ExecutorService extends Executor {
      // 传入Callable
    	<T> Future<T> submit(Callable<T> task);
      // 传入Runnable,并且直接给出当前submit执行任务后的返回值
      <T> Future<T> submit(Runnable task, T result);
      
      Future<?> submit(Runnable task);
      
      <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
      
      ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    手动传入个result,在调用call()直接返回的就是传入的result

    static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    但是不论是调用execute() 还是 submit()最终都会把Runnable或者Callable通过newTaskFor()封装成FutureTask,并把引用赋给RunnableFuture

    newTaskFor() 创建的是FutureTask,内部通过维护state(cas)保证线程安全,如果传递过来的runnable和结果则会创建RunnableAdapter(实现了Callable接口)

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
      return new FutureTask<T>(runnable, value);
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
      return new FutureTask<T>(callable);
    }
    
    public class FutureTask<V> implements RunnableFuture<V> {
      public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
      }
    }
    
    static final class RunnableAdapter<T> implements Callable<T> {
      final Runnable task;
      final T result;
      RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
      }
      public T call() {
        task.run();
        return result;
      }
    }
    
    public Future<?> submit(Runnable task) {
      if (task == null) throw new NullPointerException();
      RunnableFuture<Void> ftask = newTaskFor(task, null);
      execute(ftask);
      return ftask;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中, 并阻塞等待运行结果;

    FutureTask任务执行完成后,通过UNSAFE设置waiters相应的waitNode为null,并通过LockSupport类unpark方法唤醒主线程

    在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。

    1. Callable接口类似于Runnable,只是Runnable没有返回值。
    2. Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果;
    3. Future.get方法会导致主线程阻塞,直到Callable任务执行完成;

    任务调度

    1.首先检查线程池的运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING状态下执行任务
    2.如果workCount < corePoolSize,则创建并启动一个线程来执行新提交的任务
    3.如果workCount > corePoolSize 且线程池中阻塞队列未满,则任务添加到阻塞队列中
    4.如果workCount > corePoolSize && workCount < maximumPoolSize 且线程池的阻塞队列已经满了,则创建并且启动一个线程来执行新的任务,
    5.如果workCount > maximumPoolSize,并且线程池的队列已经满了,则根据拒绝策略来处理该任务;默认的方式是直接抛出异常

    核心代码

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {  // 当前工作线程数是否设定的核心线程数小于核心线程数
                if (addWorker(command, true)) // 创建核心线程执行任务
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) { // 当前工作线程数大于核心线程数(可能核心线程数满了,或者设定的初始核心线程数为0)
              // 如果创建的线程池的阻塞队列能够把当前任务入队(阻塞队列容量未满,或者创建的是SynchronousQueue,容量为0)
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command)) // 如果线程被shutdown,则把刚才入队的任务移出队列,走拒绝策略
                    reject(command);
                else if (workerCountOf(recheck) == 0) // 检查当前创建线程数,如果等于0则直接创建线程并在阻塞度列中取任务去执行,此时创建线程要跟最大线程数比较
                    addWorker(null, false);
            }
            else if (!addWorker(command, false)) // 判断是否达到最大线程数(如果创建的是newCachedThreadPool,创建出来的线程都是救急线程,会让创建出来的线程去执行当前任务)
              // 达到最大线程数走拒绝策略
                reject(command);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    线程池任务执行流程

    execute –> addWorker –>runworker (getTask)

    1、如果当前线程池中创建线程的个数小于核心线程数,则addWorker(command, true),command为runnable,true代表创建worker时比较的是核心线程数,false比较的是最大线程数;

    • new Worker(firstTask);firstTask为runnable,在worker的构造方法中,完成对成员变量runnable、thread的赋值,runnable就是firstTask,通过threadfactory创建的thread传入的runable参数就是worker本身,注意woker类实现runnable接口;
    • 从现在开始才是真正执行任务,从woker中取出成员变量thread,让该线程处于就绪状态(start()),执行worker的run方法,调用的是worker中的runWorker(this),传递当前对象的引用(还是woker),最终调用的是前面创建worker时传入的firstTask(worker.firstTask);
    • firstTask执行完以后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;

    2、 如果当前线程池中创建线程的个数大于核心线程数,则判断 workQueue.offer(command) 该任务是否能够入队如果能够顺利入队,(注意这个入队入的是我们创建线程池指定的队列,比如说LinkedBlockingQueue),会在判断一次线程池状态:

    • 如果没有在运行则会把刚入队的任务移除队列,并走拒绝策略;
    • 线程池状态正常,则判断当前线程池创建的线程数是否等于0,如果是则创建线程,addWorker(null, false); 在该方法内部比较的是当前线程数与最大线程数

    3、如果没有满足 isRunning© && workQueue.offer(command) 则 直接调用addWorker(command, false);在该方法内部比较的是当前线程数与最大线程数,返回失败则走拒绝策略

    以上便是jdk 线程池任务提交机制以及任务的执行流程,如有误解,请在评论区指出,谢谢

  • 相关阅读:
    虹科案例 | EtherCAT运动控制器与IO在半导体封装设备固晶机上的应用
    Java中的InetAddress类
    2020年6月编程Scratch一级
    身为程序员哪一个瞬间让你最奔溃 ?
    Element UI表格的序号递增(点击分页后)
    Linux中udp服务端,客户端的开发
    如何开发物联网 APP?
    【智慧公寓】东胜物联嵌入式硬件解决方案,为智慧公寓解决方案商降本增效,更快实现产品规模化生产
    OSI网络分层模型
    conductor 3.7以上 jedisCommands ArrayIndexOutOfBoundsException 解决分析
  • 原文地址:https://blog.csdn.net/weixin_46195957/article/details/128023477