• 线程和线程池


    创建线程的四种方式

    1 继承Thread类

    继承Thread类创建线程的步骤为:

    (1)创建一个类继承Thread类,重写run()方法,将所要完成的任务代码写进run()方法中;

    (2)创建Thread类的子类的对象;

    (3)调用该对象的start()方法,该start()方法表示先开启线程,然后调用run()方法;

    public class Thread1 {
       
        public static void main(String[] args) {
       
            Thread.currentThread().setName("主线程");
            System.out.println(Thread.currentThread().getName()+":"+"输出的结果");
            //创建一个新线程
            ThreadDemo1 thread1 = new ThreadDemo1();
            //为线程设置名称
            thread1.setName("线程一");
            //开启线程
            thread1.start();
        }
    }
     
    class ThreadDemo1 extends Thread{
       
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+":"+"输出的结果");
        }
       
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    2 实现Runnable接口

    实现Runnable接口创建线程的步骤为:

    (1)创建一个类并实现Runnable接口

    (2)重写run()方法,将所要完成的任务代码写进run()方法中

    (3)创建实现Runnable接口的类的对象,将该对象当做Thread类的构造方法中的参数传进去

    (4)使用Thread类的构造方法创建一个对象,并调用start()方法即可运行该线程

    public class Thread2 {
       
        public static void main(String[] args) {
            Thread.currentThread().setName("主线程");
            System.out.println(Thread.currentThread().getName()+":"+"输出的结果");
            //创建一个新线程
            Thread thread2 = new Thread(new ThreadDemo2());
            //为线程设置名称
            thread2.setName("线程二");
            //开启线程
            thread2.start();
        }
       
    }
     
    class ThreadDemo2 implements Runnable {
     
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+":"+"输出的结果");
        }
       
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    3 实现Callable接口

    实现Callable接口创建线程的步骤为:

    (1)创建一个类并实现Callable接口

    (2)重写call()方法,将所要完成的任务的代码写进call()方法中,需要注意的是call()方法有返回值,并且可以抛出异常

    (3)如果想要获取运行该线程后的返回值,需要创建Future接口的实现类的对象,即FutureTask类的对象,调用该对象的get()方法可获取call()方法的返回值

    (4)使用Thread类的有参构造器创建对象,将FutureTask类的对象当做参数传进去,然后调用start()方法开启并运行该线程。

    public static void main(String[] args) {
            FutureTask<String> futureTask = new FutureTask<>(new Thread003());
            new Thread(futureTask, "thread003").start();
            try {
                // 这里main线程会等待这个值返回,不会会继续向下执行
                System.out.println(Thread.currentThread() + "线程执行:返回值为:"+futureTask.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("执行结束");
        }
    static class Thread003 implements Callable<String>{
            @Override
            public String call() throws Exception {
                System.out.println(Thread.currentThread() + "线程执行: 第三种创建线程的方法");
                return "第三种创建线程的方法";
            }
        }
    
    //Thread[thread003,5,main]线程执行: 第三种创建线程的方法
    //Thread[main,5,main]线程执行:返回值为:第三种创建线程的方法
    //执行结束
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    4 使用线程池创建

    具体看下面介绍。

    为什么使用线程池

    线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

    这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处

    • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
    • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

    ThreadPoolExecutor线程池的使用

    线程池的创建

    《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

    我们可以通过ThreadPoolExecutor来创建一个线程池。

    Executors 返回线程池对象的弊端如下:

    • FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM。
    • CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

    方式一:通过构造方法实现

    image.png

    public ThreadPoolExecutor(int corePoolSize, //核心线程数
                              int maximumPoolSize, //最大线程数
                              long keepAliveTime, //生存时间  -------  针对救急线程
                              TimeUnit unit, //时间单位 针对救急线程 核心线程创建之后一直存活
                              BlockingQueue<Runnable> workQueue, //阻塞队列
                              ThreadFactory threadFactory, //线程工厂,为线程创建时起个好名字
                              RejectedExecutionHandler handler) //拒绝策略
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    创建一个线程池时需要输入几个参数,如下。

    1. corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。

    2. maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果

    3. keepAliveTime(线程活动保持时间):线程池的救急线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。

    4. TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。

    5. workQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
      ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
      LinkedBlockingQueue:一个基于链表结构的阻塞队列,阻塞队列是无界的,可以放任意数量的任务,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
      SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
      PriorityBlockingQueue:一个具有优先级的无限阻塞队列。不同的线程到阻塞队列中取任务,那么如何保证线程安全的呢?通过查看源码发现阻塞队列中使用ReentrantLock锁保证线程安全。

    6. ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设
      置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字。如下代码是ThreadPoolExecutor默认使用的线程工厂:

      // 默认的线程工厂,如果先创建线程池的时候没有指定线程工厂,那么就会使用这一个。
      static class DefaultThreadFactory implements ThreadFactory {
              private static final AtomicInteger poolNumber = new AtomicInteger(1);
              private final ThreadGroup group;
              private final AtomicInteger threadNumber = new AtomicInteger(1);
              private final String namePrefix;
      
              DefaultThreadFactory() {
                  SecurityManager s = System.getSecurityManager();
                  group = (s != null) ? s.getThreadGroup() :
                                        Thread.currentThread().getThreadGroup();
                  namePrefix = "pool-" +
                                poolNumber.getAndIncrement() +
                               "-thread-";
              }
      
              public Thread newThread(Runnable r) {
                  Thread t = new Thread(group, r,
                                        namePrefix + threadNumber.getAndIncrement(),
                                        0);
                  if (t.isDaemon())
                      t.setDaemon(false);
                  if (t.getPriority() != Thread.NORM_PRIORITY)
                      t.setPriority(Thread.NORM_PRIORITY);
                  return t;
              }
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
    7. RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了以下4种策略。
      AbortPolicy:直接抛出异常。
      CallerRunsPolicy:只用调用者所在线程来运行任务。
      DiscardOldestPolicy:丢弃队列里最早的一个任务,并执行当前任务。
      DiscardPolicy:不处理,丢弃掉。
      当然,也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化存储不能处理的任务。

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Gh1oVfcE-1660577421688)(https://b3logfile.com/file/2022/03/image-6f958b6e.png)]

      RejectedExecutionHandler接口中只有一个方法如果我们想要实现自己的处理逻辑只需要实现这个接口,重写rejectedExecution()方法即可

      public interface RejectedExecutionHandler {
      
          /**
           * Method that may be invoked by a {@link ThreadPoolExecutor} when
           * {@link ThreadPoolExecutor#execute execute} cannot accept a
           * task.  This may occur when no more threads or queue slots are
           * available because their bounds would be exceeded, or upon
           * shutdown of the Executor.
           *
           * 

      In the absence of other alternatives, the method may throw * an unchecked {@link RejectedExecutionException}, which will be * propagated to the caller of {@code execute}. * * @param r the runnable task requested to be executed * @param executor the executor attempting to execute this task * @throws RejectedExecutionException if there is no remedy */ void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }

      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19

      看一看一下AbortPolicy策略的实现逻辑

      public static class AbortPolicy implements RejectedExecutionHandler {
              /**
               * Creates an {@code AbortPolicy}.
               */
              public AbortPolicy() { }
      
              /**
               * Always throws RejectedExecutionException.
               *
               * @param r the runnable task requested to be executed
               * @param e the executor attempting to execute this task
               * @throws RejectedExecutionException always
               */
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                  throw new RejectedExecutionException("Task " + r.toString() +
                                                       " rejected from " +
                                                       e.toString());
              }
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19

    方式二:通过 Executor 框架的工具类 Executors 来实现

    我们可以创建三种类型的 ThreadPoolExecutor:

    • FixedThreadPool : 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列(LinkedBlockedQueue无界队列)中,待有线程空闲时,便处理在任务队列中的任务。
    • SingleThreadExecutor: 方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列(LinkedBlockedQueue无界队列)中,待线程空闲,按先入先出的顺序执行队列中的任务。
    • CachedThreadPool: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。任务队列使用SynchronousQueue。

    image.png

    newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2, new ThreadFactory() {
            //通过ThreadFactory 自定义线程名称
            private AtomicInteger atomicInteger = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r,"线程"+atomicInteger.getAndIncrement());
            }
        });
        executorService.execute(()->{
            log.debug("1");
        });
        executorService.execute(()->{
            log.debug("2");
        });
        executorService.execute(()->{
            log.debug("3");
        });
    
    }
    13:41:51.863 [pool-1-thread-2] DEBUG com.pingfa.demo.executor.CachedThreadPoolTest - 2
    13:41:51.863 [pool-1-thread-1] DEBUG com.pingfa.demo.executor.CachedThreadPoolTest - 1
    13:41:51.863 [pool-1-thread-3] DEBUG com.pingfa.demo.executor.CachedThreadPoolTest - 3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    特点

    • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
    • 阻塞队列是无界的,可以放任意数量的任务

    评价: 适用于任务量已知,相对耗时的任务

    newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(()->{
            log.debug("1");
        });
        executorService.execute(()->{
            log.debug("2");
        });
        executorService.execute(()->{
            log.debug("3");
        });
    }
    
    13:40:45.690 [pool-1-thread-1] DEBUG com.pingfa.demo.executor.CachedThreadPoolTest - 1
    13:40:45.690 [pool-1-thread-3] DEBUG com.pingfa.demo.executor.CachedThreadPoolTest - 3
    13:40:45.690 [pool-1-thread-2] DEBUG com.pingfa.demo.executor.CachedThreadPoolTest - 2
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    特点

    • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着

      • 全部都是救急线程(60s 后可以回收)
      • 救急线程可以无限创建
    • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)

      SynchronousQueue<Integer> integers = new SynchronousQueue<>();
      new Thread(() -> {
          try {
              log.debug("putting {} ", 1);
              integers.put(1);
              log.debug("{} putted...", 1);
              log.debug("putting...{} ", 2);
              integers.put(2);
              log.debug("{} putted...", 2);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      },"t1").start();
      sleep(1);
      new Thread(() -> {
          try {
              log.debug("taking {}", 1);
              integers.take();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      },"t2").start();
      sleep(1);
      new Thread(() -> {
          try {
              log.debug("taking {}", 2);
              integers.take();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      },"t3").start();
      //
      11:48:15.500 c.TestSynchronousQueue [t1] - putting 1
      11:48:16.500 c.TestSynchronousQueue [t2] - taking 1
      11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted...
      11:48:16.500 c.TestSynchronousQueue [t1] - putting...2
      11:48:17.502 c.TestSynchronousQueue [t3] - taking 2
      11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted...
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39

    评价 : 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况

    newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(()->{
            int i = 1 / 0;
            log.debug("1");
        });
        executorService.execute(()->{
            log.debug("2");
        });
        executorService.execute(()->{
            log.debug("3");
        });
    }
    Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    	at com.pingfa.demo.executor.SingleThreadPoolTest.lambda$main$0(SingleThreadPoolTest.java:17)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    13:52:07.280 [pool-1-thread-2] DEBUG com.pingfa.demo.executor.SingleThreadPoolTest - 2
    13:52:07.284 [pool-1-thread-2] DEBUG com.pingfa.demo.executor.SingleThreadPoolTest - 3
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    使用场景:

    希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

    区别:

    • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
    • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
      • FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
    • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
      • 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

    向线程池提交任务

    可以使用两个方法向线程池提交任务,分别为execute()和submit()方法。execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
    通过以下代码可知execute()方法输入的任务是一个Runnable类的实例

    threadsPool.execute(new Runnable() {
    	@Override
    	public void run() {
    		// TODO Auto-generated method stub
    	}
    });
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个
    future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

    Future<Object> future = executor.submit(harReturnValuetask);
    	try {
    		Object s = future.get();
    	} catch (InterruptedException e) {
    		// 处理中断异常
    	} catch (ExecutionException e) {
    		// 处理无法执行任务异常
    	} finally {
    	// 关闭线程池
    	executor.shutdown();
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    示例:

    // 执行任务
    void execute(Runnable command);
    
    // 提交任务 task,用返回值 Future 获得任务执行结果
    <T> Future<T> submit(Callable<T> task);
    "=====================================测试执行==================================================="
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<String> future = executorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.debug("执行任务");
                Thread.sleep(1000);
                return "ok";
            }
        });
        log.debug("线程返回结果:{}", future.get());
    }
    14:02:00.486 [pool-1-thread-1] DEBUG com.pingfa.demo.executor.SubmitCallableTest - 执行任务
    14:02:01.490 [main] DEBUG com.pingfa.demo.executor.SubmitCallableTest - 线程返回结果:ok
    
    // 提交 tasks 中所有任务
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    
    // 提交 tasks 中所有任务,带超时时间
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    "=====================================测试执行==================================================="
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        List<Future<Integer>> futures = executorService.invokeAll(Arrays.asList(
            () -> {
                log.debug("1");
                return 1;
            }, () -> {
                log.debug("2");
                return 2;
            }, () -> {
                log.debug("3");
                return 3;
            }));
        futures.forEach((t)-> {
            try {
                System.out.println(t.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
    15:03:52.601 [pool-1-thread-2] DEBUG com.pingfa.demo.executor.InvokeAllTestCallableTest - 2
    15:03:52.601 [pool-1-thread-1] DEBUG com.pingfa.demo.executor.InvokeAllTestCallableTest - 1
    15:03:52.604 [pool-1-thread-1] DEBUG com.pingfa.demo.executor.InvokeAllTestCallableTest - 3
    1
    2
    3
    
    
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException;
    
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
    
    "=====================================测试执行==================================================="
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        //invokeAll(executorService);
        Integer result = executorService.invokeAny(Arrays.asList(() -> {
            log.debug("begin 1");
            Thread.sleep(1000);
            log.debug("end 1");
            return 1;
        }, () -> {
            log.debug("begin 2");
            Thread.sleep(500);
            log.debug("end 2");
            return 2;
        },
                                                                 () -> {
                                                                     log.debug("begin 3");
                                                                     Thread.sleep(2000);
                                                                     log.debug("end 3");
                                                                     return 3;
                                                                 }));
        log.debug("{}", result);
    }
    
    15:16:49.377 [pool-1-thread-2] DEBUG com.pingfa.demo.executor.InvokeAllTestCallableTest - begin 2
    15:16:49.377 [pool-1-thread-1] DEBUG com.pingfa.demo.executor.InvokeAllTestCallableTest - begin 1
    15:16:49.883 [pool-1-thread-2] DEBUG com.pingfa.demo.executor.InvokeAllTestCallableTest - end 2
    15:16:49.883 [pool-1-thread-2] DEBUG com.pingfa.demo.executor.InvokeAllTestCallableTest - begin 3
    15:16:49.883 [main] DEBUG com.pingfa.demo.executor.InvokeAllTestCallableTest - 2
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98

    关闭线程池

    可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线
    程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程
    ,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而
    shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。但已提交任务会执行完,在队列中的任务也会执行完。只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

    /*
    线程池状态变为 SHUTDOWN
    - 不会接收新任务
    - 但已提交任务会执行完,在队列中的任务也会执行完
    - 此方法不会阻塞调用线程的执行
    */
    void shutdown();
    
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 修改线程池状态
            advanceRunState(SHUTDOWN);
            // 仅会打断空闲线程
            interruptIdleWorkers();
            onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
        tryTerminate();
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    /*
    线程池状态变为 STOP
    - 不会接收新任务
    - 会将队列中的任务返回
    - 并用 interrupt 的方式中断正在执行的任务,正在执行的任务结束运行
    */
    List<Runnable> shutdownNow();
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 修改线程池状态
            advanceRunState(STOP);
            // 打断所有线程
            interruptWorkers();
            // 获取队列中剩余任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 尝试终结
        tryTerminate();
        return tasks;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    // 不在 RUNNING 状态的线程池,此方法就返回 true
    boolean isShutdown();
    // 线程池状态是否是 TERMINATED
    boolean isTerminated();
    // 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
    情,可以利用此方法等待
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    合理地配置线程池

    要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

    任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
    任务的优先级:高、中和低。
    任务的执行时间:长、中和短。
    任务的依赖性:是否依赖其他系统资源,如数据库连接。

    性质不同的任务可以用不同规模的线程池分开处理。

    CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池

    由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。

    混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过
    Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

    优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高
    的任务先执行。

    注意: 如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行

    执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让
    执行时间短的任务先执行。

    依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越
    长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU

    建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点
    儿,比如几千。有一次,我们系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然,我们的系统所有的任务是用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是出现这样问题时也会影响到其他任务。

    线程池的监控

    如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根
    据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池的
    时候可以使用以下属性。
    taskCount:线程池需要执行的任务数量。
    completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
    largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是
    否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
    getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销
    毁,所以这个大小只增不减。
    getActiveCount:获取活动的线程数。

    通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute和terminated方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。

    public static void main(String[] args) {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
                    5,
                    0,
                    TimeUnit.HOURS,
                    new ArrayBlockingQueue<>(3)
            ){
                @Override
                protected void afterExecute(Runnable r, Throwable t) {
                    System.out.println("任务执行之前执行");
                }
    
                @Override
                protected void beforeExecute(Thread t, Runnable r) {
                    System.out.println("任务执行后执行");
                }
    
                @Override
                protected void terminated() {
                    System.out.println("线程池关闭时执行");
                }
            };
            threadPoolExecutor.execute(()->{
                System.out.println("任务执行");
            });
           threadPoolExecutor.shutdown();
        }
    //任务执行后执行
    //任务执行
    //任务执行之前执行
    //线程池关闭时执行
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    线程池的实现原理

    当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?本节来看一下线程池
    的主要处理流程,处理流程图如图9-1所示。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8FkR6rA7-1660577421689)(https://b3logfile.com/file/2021/07/image-ad5167a9.png)]

    从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下。

    1. 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作
      线程(核心线程)来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
    2. 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这
      个工作队列里。如果工作队列满了,则进入下个流程。
    3. 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程(应急线程)来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

    ThreadPoolExecutor执行execute()方法的示意图,如图9-2所示。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6V3edFoi-1660577421690)(https://b3logfile.com/file/2021/07/image-b61c7679.png)]

    ThreadPoolExecutor执行execute方法分下面4种情况。

    1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
    2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
    3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程(应急线程)来处理任务(注意,执行这一步骤需要获取全局锁)。
    4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用
      RejectedExecutionHandler.rejectedExecution()方法。

    ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

    源码分析:上面的流程分析让我们很直观地了解了线程池的工作原理,让我们再通过源代码来看看是如何实现的,线程池执行任务的方法如下

    // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }
    
    private final BlockingQueue<Runnable> workQueue;
    
    public void execute(Runnable command) {
        // 如果任务为null,则抛出异常。
        if (command == null)
            throw new NullPointerException();
        // ctl 中保存的线程池当前的一些状态信息
        int c = ctl.get();
    
        //  下面会涉及到 3 步 操作
        // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
        // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
        // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
            if (!isRunning(recheck) && remove(command))
                reject(command);
                // 如果当前线程池为空就新创建一个线程并执行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
        else if (!addWorker(command, false))
            reject(command);
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    工作线程(核心线程):线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行。我们可以从Worker类的run()方法里看到这点。

    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    ThreadPoolExecutor中线程执行任务的示意图如图9-3所示。

    image.png

    线程池中的线程执行任务分两种情况,如下。

    1. 在execute()方法中创建一个线程时,会让这个线程执行当前任务。
    2. 这个线程执行完上图中1的任务后,会反复从BlockingQueue获取任务来执行。

    ScheduledThreadPoolExecutor任务调度线程池

    ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但
    ScheduledThreadPoolExecutor功能更强大、更灵活。Timer对应的是单个后台线程,而
    ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

    从下图也可以看出,ScheduleThreadPoolExecutor只能指定核心线程数量,线程工厂,拒绝策略。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DXs5AVu8-1660577421693)(https://b3logfile.com/file/2022/03/image-05e2596f.png)]

    简单使用示例:

    ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    // 添加两个任务,希望它们都在 1s 后执行
    executor.schedule(() -> {
        System.out.println("任务1,执行时间:" + new Date());
        try { Thread.sleep(2000); } catch (InterruptedException e) { }
    }, 1000, TimeUnit.MILLISECONDS);
    executor.schedule(() -> {
        System.out.println("任务2,执行时间:" + new Date());
    }, 1000, TimeUnit.MILLISECONDS);
    
    任务1,执行时间:Thu Jan 03 12:45:17 CST 2019
    任务2,执行时间:Thu Jan 03 12:45:17 CST 2019
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    log.debug("start...");
    pool.scheduleAtFixedRate(() -> {
        log.debug("running...");
    }, 1, 1, TimeUnit.SECONDS);//延时1秒,每秒执行一次
    
    21:45:43.167 c.TestTimer [main] - start...
    21:45:44.215 c.TestTimer [pool-1-thread-1] - running...
    21:45:45.215 c.TestTimer [pool-1-thread-1] - running...
    21:45:46.215 c.TestTimer [pool-1-thread-1] - running...
    21:45:47.215 c.TestTimer [pool-1-thread-1] - running...
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    log.debug("start...");
    pool.scheduleAtFixedRate(() -> {
     log.debug("running...");
     sleep(2);
    }, 1, 1, TimeUnit.SECONDS);
    //输出分析:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s
    21:44:30.311 c.TestTimer [main] - start...
    21:44:31.360 c.TestTimer [pool-1-thread-1] - running...
    21:44:33.361 c.TestTimer [pool-1-thread-1] - running...
    21:44:35.362 c.TestTimer [pool-1-thread-1] - running...
    21:44:37.362 c.TestTimer [pool-1-thread-1] - running... 
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    log.debug("start...");
    pool.scheduleWithFixedDelay(()-> {
     log.debug("running...");
     sleep(2);
    }, 1, 1, TimeUnit.SECONDS);
    //输出分析:一开始,延时 1s,scheduleWithFixedDelay 的间隔是 上一个任务结束 <-> 延时 <-> 下一个任务开始 所以间隔都是 3s
    21:40:55.078 c.TestTimer [main] - start...
    21:40:56.140 c.TestTimer [pool-1-thread-1] - running...
    21:40:59.143 c.TestTimer [pool-1-thread-1] - running...
    21:41:02.145 c.TestTimer [pool-1-thread-1] - running...
    21:41:05.147 c.TestTimer [pool-1-thread-1] - running...
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    评价 整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务

    运行机制

    ScheduledThreadPoolExecutor中使用的阻塞队列是DelayQueue是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中没有什么意义(设置maximumPoolSize的大小没有什么效果)。

    ScheduledThreadPoolExecutor的执行主要分为两大部分。

    1)当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFutur接口的ScheduledFutureTask。
    2)线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。

    image.png

    ScheduledThreadPoolExecutor为了实现周期性的执行任务,对ThreadPoolExecutor做了如下的修改。

    1. 使用DelayQueue作为任务队列。
    2. 获取任务的方式不同。
    3. 执行周期任务后,增加了额外的处理。

    实现原理

    ScheduledThreadPoolExecutor会把待调度的任务(ScheduledFutureTask)放到一个DelayQueue中。

    ScheduledFutureTask主要包含3个成员变量,如下。

    1. long型成员变量time,表示这个任务将要被执行的具体时间。
    2. long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor的序号。
    3. long型成员变量period,表示任务执行的间隔周期。

    DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。

    ScheduledThreadPoolExecutor中的线程执行周期任务的过程。如下图:
    ScheduledThreadPoolExecutor中的线程1执行某个周期任务的4个步骤。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RHW5ljFT-1660577421695)(https://b3logfile.com/file/2022/03/image-dd2923b8.png)]

    1)线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask的time大于等于当前时间。

    2)线程1执行这个ScheduledFutureTask。

    3)线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。

    4)线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue(DelayQueue.add())。

    获取任务的过程。DelayQueue.take()方法的代码实现:

    public E take() throws InterruptedException {
    		final ReentrantLock lock = this.lock;
    		lock.lockInterruptibly(); // 1
    		try {
    			for (;;) {
    				E first = q.peek();
    				if (first == null) {
    					available.await(); // 2.1
    				} else {
    					long delay = first.getDelay(TimeUnit.NANOSECONDS);
    					if (delay > 0) {
    						long tl = available.awaitNanos(delay); // 2.2
    					} else {
    						E x = q.poll(); // 2.3.1
    						assert x != null;
    						if (q.size() != 0)
    							available.signalAll(); // 2.3.2
    						return x;
    					}
    				}
    			}
    		} finally {
    			lock.unlock(); // 3
    		}
    	}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    1)获取Lock。
    2)获取周期任务。

    1. 如果PriorityQueue为空,当前线程到Condition中等待;否则执行下面的2.2。
    2. 如果PriorityQueue的头元素的time时间比当前时间大,到Condition中等待到time时间;否则执行下面的2.3。
    3. 获取PriorityQueue的头元素(2.3.1);如果PriorityQueue不为空,则唤醒在Condition中等待的所有线程(2.3.2)。

    3)释放Lock。

    1. ScheduledThreadPoolExecutor在一个循环中执行步骤2,直到线程从PriorityQueue获取到一个元素之后(执行2.3.1之后),才会退出无限循环(结束步骤2)。

    把ScheduledFutureTask放入DelayQueue中的过程。下面是DelayQueue.add()的代码:

    public boolean offer(E e) {
    		final ReentrantLock lock = this.lock;
    		lock.lock(); // 1
    		try {
    			E first = q.peek();
    			q.offer(e); // 2.1
    			if (first == null || e.compareTo(first) < 0)
    				available.signalAll(); // 2.2
    			return true;
    		} finally {
    			lock.unlock(); // 3
    		}
    	}
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    1)获取Lock。
    2)添加任务。

    1. 向PriorityQueue添加任务。
    2. 如果在上面2.1中添加的任务时PriorityQueue中没有任务或者添加的任务比头任务的时间小,那么唤醒在Condition中等待的所有线程。

    3)释放Lock。

    FutureTask介绍

    Future接口和实现Future接口的FutureTask类,代表异步计算的结果。

    FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给
    Executor执行,也可以由调用线程直接执行(FutureTask.run())。根据FutureTask.run()方法被执行的时机,FutureTask可以处于下面3种状态。

    1)未启动。FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。

    2)已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。

    3)已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel(…)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。

    在FutureTask的不同状态执行get(), cancel()方法的情况。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7J3Cc1D0-1660577421696)(https://b3logfile.com/file/2022/03/image-b051f493.png)]

    简单使用

    实现原理

    FutureTask的实现基于AbstractQueuedSynchronizer(以下简称为AQS)

    AQS是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列

    JDK 6中AQS被广泛使用,基于AQS实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch和FutureTask。

    每一个基于AQS实现的同步器都会包含两种类型的操作,如下:

    1. 至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为get()/get(long timeout,TimeUnit unit)方法调用。
    2. 至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞
      线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法。

    基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类
    Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。

    AQS被作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只
    需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。

    具体来说,Sync实现了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通过这两个方法来检查和更新同步状态。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0jGWMoQN-1660577421697)(https://b3logfile.com/file/2022/03/image-e4a61729.png)]

    Sync是FutureTask的内部私有类,它继承自AQS。创建FutureTask时会创建内部私有的成员对象Sync,FutureTask所有的的公有方法都直接委托给了内部私有的Sync。

    FutureTask.get()方法会调用AQS.acquireSharedInterruptibly(int arg) 方法,这个方法的执行过程如下。

    1)调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法首先会回调在子类Sync中实现的tryAcquireShared()方法来判断acquire操作是否可以成功。acquire操作可以成功的条件为:state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null。

    2)如果成功则get()方法立即返回。如果失败则到线程等待队列中去等待其他线程执行release操作。

    3)当其他线程执行release操作(比如FutureTask.run()或FutureTask.cancel(…))唤醒当前线程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤醒它的后继线程(这里会产生级联唤醒的效果,后面会介绍)。

    4)最后返回计算的结果或抛出异常。

    FutureTask.run()的执行过程如下:

    1)执行在构造函数中指定的任务(Callable.call())。

    2)以原子方式来更新同步状态(调用AQS.compareAndSetState(int expect,int update),设置state为执行完成状态RAN)。如果这个原子操作成功,就设置代表计算结果的变量result的值为Callable.call()的返回值,然后调用AQS.releaseShared(int arg)。

    3)AQS.releaseShared(int arg)首先会回调在子类Sync中实现的tryReleaseShared(int arg)来执行release操作(设置运行任务的线程runner为null,然后会返回true)然后唤醒线程等待队列中的第一个线程。

    4)调用FutureTask.done()

    当执行FutureTask.get()方法时,如果FutureTask不是处于执行完成状态RAN或已取消状态
    CANCELLED,当前执行线程将到AQS的线程等待队列中等待(见下图的线程A、B、C,D)。当某个线程执行FutureTask.run()方法或FutureTask.cancel(…)方法时,会唤醒线程等待队列的第一个线程(见图10-16所示的线程E唤醒线程A)。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-orzTiZcl-1660577421698)(https://b3logfile.com/file/2022/03/image-ff056a27.png)]

    假设开始时FutureTask处于未启动状态或已启动状态,等待队列中已经有3个线程(A、B和
    C)在等待。此时,线程D执行get()方法将导致线程D也到等待队列中去等待。

    当线程E执行run()方法时,会唤醒队列中的第一个线程A。线程A被唤醒后,首先把自己从
    队列中删除,然后唤醒它的后继线程B,最后线程A从get()方法返回。线程B、C和D重复A线程的处理流程。最终,在队列中等待的所有线程都被级联唤醒并从get()方法返回。

    Fork/Join

    概念

    Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算

    所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解

    Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率

    Fork/Join 默认会创建与 cpu 核心数大小相同的线程池

    使用

    提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下面定义了一个对 1~n 之间的整数求和的任务

    class AddTask3 extends RecursiveTask<Integer> {
    
        int begin;
        int end;
        public AddTask3(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }
        @Override
        public String toString() {
            return "{" + begin + "," + end + '}';
        }
        @Override
        protected Integer compute() {
            // 5, 5
            if (begin == end) {
                log.debug("join() {}", begin);
                return begin;
            }
            // 4, 5
            if (end - begin == 1) {
                log.debug("join() {} + {} = {}", begin, end, end + begin);
                return end + begin;
            }
    
            // 1 5
            int mid = (end + begin) / 2; // 3
            AddTask3 t1 = new AddTask3(begin, mid); // 1,3
            t1.fork();
            AddTask3 t2 = new AddTask3(mid + 1, end); // 4,5
            t2.fork();
            log.debug("fork() {} + {} = ?", t1, t2);
            int result = t1.join() + t2.join();
            log.debug("join() {} + {} = {}", t1, t2, result);
            return result;
        }
    }
    //结果:
    [ForkJoinPool-1-worker-0] - join() 1 + 2 = 3
    [ForkJoinPool-1-worker-3] - join() 4 + 5 = 9
    [ForkJoinPool-1-worker-0] - join() 3
    [ForkJoinPool-1-worker-1] - fork() {1,3} + {4,5} = ?
    [ForkJoinPool-1-worker-2] - fork() {1,2} + {3,3} = ?
    [ForkJoinPool-1-worker-2] - join() {1,2} + {3,3} = 6
    [ForkJoinPool-1-worker-1] - join() {1,3} + {4,5} = 15
    15
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    其他问题

    实现Runnable和Callable接口的区别

    Runnable自 Java 1.0 以来一直存在,但Callable仅在 Java 1.5 中引入,目的就是为了来处理Runnable不支持的用例。Runnable 接口 不会返回结果或抛出检查异常,但是 Callable 接口 可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口 ,这样代码看起来会更加简洁。

    工具类 Executors 可以实现 Runnable 对象和 Callable 对象之间的相互转换。(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。

    //Runnable.java
    @FunctionalInterface
    public interface Runnable {
       /**
        * 被线程执行,没有返回值也无法抛出异常
        */
        public abstract void run();
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    //Callable.java
    @FunctionalInterface
    public interface Callable<V> {
        /**
         * 计算结果,或在无法这样做时抛出异常。
         * @return 计算得出的结果
         * @throws 如果无法计算结果,则抛出异常
         */
        V call() throws Exception;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    执行execute和submit方法的区别

    1. execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
    2. submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Futureget()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

    我们以 AbstractExecutorService 接口中的一个 submit 方法为例子来看看源代码:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    上面方法调用的 newTaskFor 方法返回了一个 FutureTask 对象。

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    
    
    • 1
    • 2
    • 3
    • 4

    我们再来看看execute()方法:没有返回值

    public void execute(Runnable command) {
    //省略
    }
    
    
    • 1
    • 2
    • 3
    • 4

    为什么我们调用start() 方法时会执行 run() 方法,为什么我们不能直接调用 run() 方法?

    这是另一个非常经典的 Java 多线程面试问题,而且在面试中会经常被问到。很简单,但是很多人都会答不上来!

    new 一个 Thread,线程进入了新建状态。调用 start()方法,会启动一个线程并使线程进入了就绪状态,当分配到时间片后就可以开始运行了。 start() 会执行线程的相应准备工作,然后自动执行 run() 方法的内容,这是真正的多线程工作。 但是,直接执行 run() 方法,会把 run() 方法当成一个 main 线程下的普通方法去执行,并不会在某个线程中执行它,所以这并不是多线程工作。

    总结: 调用 start() 方法方可启动线程并使线程进入就绪状态,直接执行 run() 方法的话不会以多线程的方式执行。

  • 相关阅读:
    Maven学习笔记
    js文字逐个显示
    年耗资百万数据库升级记录
    基于SSM+Vue的咖啡销售系统
    【SSM直击大厂】第十三章:MyBatis 详解
    C# StringBuilder 使用
    目标检测——食品饮料数据集
    WISE 2019 | ML-GCN:多标签图节点分类的半监督图嵌入
    【jvm】虚拟机之本地方法接口与本地方法库
    Java 第二阶段提升编程能力【IO流】
  • 原文地址:https://blog.csdn.net/m0_46669446/article/details/126356863