线程池内部是通过队列结合线程实现的,当我们利用线程池执行任务时:
如果此时线程池中的线程数量小于corePoolSize
,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的线程数量等于corePoolSize
,但是缓冲队列workQueue
未满,那么任务被放入缓冲队列。
如果此时线程池中的线程数量大于等于corePoolSize
,缓冲队列workQueue
已满,并且线程池中的线程数量小于maximumPoolSize
,建新的线程来处理被添加的任务。
如果此时线裎池中的线数量大于corePoolSize
,缓存冲队列workQueue
已满, 并且线程池中的数量等于maximumPoolSize
,那么过handler
所指定的策略来处理此任务。
当线程池中的线程数量大于corePoolSize
时,如果某线程空闲时间超过keepAliveTime
, 线将被终止。这样,线程池可以动态的调整池中的线程数。
corePoolSize
:核心线程数
maximumPoolSize
:最大线程数 【包括核心线程数】
keepAliveTime
:生存时间【线程长时间不干活了,归还给操作系统,核心线程不用归还,可以指定是否参与归还过程】
生存时间单位
任务队列:等待队列,如果不指定,最大值是Integer.MAX_VALUE
【各种各样的BlockingQueue
】
线程工厂【默认设置优先级是普通优先级,非守护线程】,最好自定义线程名称,方便回溯
拒绝策略,包括以下四种:
ThreadPoolExecutor.AbortPolicy
:丢弃任务并抛出RejectedExecutionException
异常。
ThreadPoolExecutor.DiscardPolicy
:丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy
:丢弃队列最前面的任务,然后重新提交被拒绝的任务
ThreadPoolExecutor.CallerRunsPolicy
:由调用线程(提交任务的线程)处理该任务
执行流程:先占满核心线程-> 再占满任务队列-> 再占满(最大线程数-核心线程数)-> 最后执行拒绝策略
一般自定义拒绝策略:将相关信息保存到redis,kafka,日志,MySQL记录 实现RejectedExecutionHandler并重写rejectedExecution方法
自定义拒绝策略代码示例
- package git.snippets.juc;
-
- import java.util.concurrent.*;
-
- /**
- * 自定义拒绝策略
- */
- public class MyRejectedHandler {
- public static void main(String[] args) {
- ExecutorService service = new ThreadPoolExecutor(4, 4,
- 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),
- Executors.defaultThreadFactory(),
- new MyHandler());
- }
-
- static class MyHandler implements RejectedExecutionHandler {
-
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- //log("r rejected")
- //save r kafka mysql redis
- //try 3 times
- if (executor.getQueue().size() < 10000) {
- //try put again();
- }
- }
- }
- }
-
保证线程按顺序执行
为什么要有单线程的线程池?这个主要是用来做任务队列和线程生命周期管理
使用LinkedBlockingQueue作为任务队列,上界为:Integer.MAX_VALUE(2147483647) 约等于无界。
示例代码:
- package git.snippets.juc;
-
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- import static java.util.concurrent.TimeUnit.SECONDS;
-
- public class SingleThreadPoolUsage {
- public static void main(String[] args) throws InterruptedException {
- ExecutorService service = Executors.newSingleThreadExecutor();
- for (int i = 0; i < 10; i++) {
- final int j = i;
- service.submit(() -> System.out.println("current thread " + Thread.currentThread() + " " + j));
- }
- service.shutdown();
- service.awaitTermination(60, SECONDS);
- }
- }
-
corePoolSize:0
maxiumPoolSize:Integer.MAX_VALUE(2147483647)
keepAliveTime 60秒
使用SynchronousQueue
作为任务队列 必须马上执行
使用示例:
- package git.snippets.juc;
-
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
-
- public class CachedThreadPoolUsage {
- public static void main(String[] args) throws InterruptedException {
- System.out.println("cached thread pool usage...");
- ExecutorService service = Executors.newCachedThreadPool();
- System.out.println(service);
- for (int i = 0; i < 2; i++) {
- service.execute(() -> {
- try {
- TimeUnit.MILLISECONDS.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName());
- });
- }
- System.out.println(service);
- TimeUnit.SECONDS.sleep(80);
- System.out.println(service);
- }
- }
-
最大线程数等于核心线程数
使用LinkedBlockingQueue
作为任务队列,上界为:Integer.MAX_VALUE(2147483647)
使用示例见:
- package git.snippets.juc;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
-
- /**
- * 多线程和单线程计算某个范围内的所有素数
- */
- public class FixedThreadPoolUsage {
- public static void main(String[] args) throws InterruptedException, ExecutionException {
- long start = System.currentTimeMillis();
- getPrime(1, 200000);
- long end = System.currentTimeMillis();
- System.out.println("use single thread...cost: " + (end - start));
-
- final int cpuCoreNum = 4;
-
- ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
-
- MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
- MyTask t2 = new MyTask(80001, 130000);
- MyTask t3 = new MyTask(130001, 170000);
- MyTask t4 = new MyTask(170001, 200000);
-
- Future
> f1 = service.submit(t1);
- Future
> f2 = service.submit(t2);
- Future
> f3 = service.submit(t3);
- Future
> f4 = service.submit(t4);
- System.out.println();
- start = System.currentTimeMillis();
- f1.get();
- f2.get();
- f3.get();
- f4.get();
- end = System.currentTimeMillis();
- System.out.println("use fixed thread pool...cost: " + (end - start));
- service.shutdown();
- service.awaitTermination(1, TimeUnit.MINUTES);
- }
-
- static boolean isPrime(int num) {
- for (int i = 2; i <= num / 2; i++) {
- if (num % i == 0) {
- return false;
- }
- }
- return true;
- }
-
- static List
getPrime(int start, int end) { - List
results = new ArrayList<>(); - for (int i = start; i <= end; i++) {
- if (isPrime(i)) results.add(i);
- }
-
- return results;
- }
-
- static class MyTask implements Callable
> {
- int startPos, endPos;
-
- MyTask(int s, int e) {
- this.startPos = s;
- this.endPos = e;
- }
-
- @Override
- public List
call() { - List
r = getPrime(startPos, endPos); - return r;
- }
-
- }
- }
使用DelayWorkQueue
,包括了如下两个主要方法
scheduleAtFixedRate()
当前任务执行时间小于间隔时间,每次到点即执行;
当前任务执行时间大于等于间隔时间,任务执行后立即执行下一次任务。相当于连续执行了。
scheduleWithFixedDelay()
每当上次任务执行完毕后,间隔一段时间执行。不管当前任务执行时间大于、等于还是小于间隔时间,执行效果都是一样的。
使用示例:
- package git.snippets.juc;
-
- import java.util.Date;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
-
- import static java.util.concurrent.TimeUnit.SECONDS;
-
- public class ScheduleThreadPoolUsage {
- static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-
-
- public static void main(String[] args) {
- test1();
- test2();
- test3();
- }
-
- /**
- * 任务执行时间(8s)小于间隔时间(10s)
- */
- public static void test1() {
- scheduler.scheduleAtFixedRate(() -> {
- System.out.println("Start: scheduleAtFixedRate: " + new Date());
- try {
- Thread.sleep(8000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("End : scheduleAtFixedRate: " + new Date());
- }, 0, 10, SECONDS);
- }
-
- /**
- * 任务执行时间(12s)大于间隔时间(10s)
- */
- public static void test2() {
- scheduler.scheduleAtFixedRate(() -> {
- System.out.println("Start: scheduleAtFixedRate: " + new Date());
- try {
- Thread.sleep(12000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("End : scheduleAtFixedRate: " + new Date());
- }, 0, 10, SECONDS);
- }
-
- /**
- * 任务执行时间(8s)小于间隔时间(10s)
- */
- public static void test3() {
- scheduler.scheduleWithFixedDelay(() -> {
- System.out.println("Start: scheduleWithFixedDelay: " + new Date());
- try {
- Thread.sleep(12000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("End : scheduleWithFixedDelay: " + new Date());
- }, 0, 10, SECONDS);
- }
- }
-
Java SE 1.7 以后新增的线程池,包括以下两个核心类
第一个是:RecursiveAction
它是一种没有任何返回值的任务。只是做一些工作,比如写数据到磁盘,然后就退出了。 一个RecursiveAction
可以把自己的工作分割成更小的几块, 这样它们可以由独立的线程或者 CPU 执行。
我们可以通过继承来实现一个RecursiveAction
。
第二个是:RecursiveTask
它是一种会返回结果的任务。可以将自己的工作分割为若干更小任务,并将这些子任务的执行合并到一个集体结果。 可以有几个水平的分割和合并。
使用示例:
- package git.snippets.juc;
-
- import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.RecursiveAction;
- import java.util.concurrent.RecursiveTask;
- import java.util.concurrent.TimeUnit;
- import java.util.stream.LongStream;
-
-
- public class ForkJoinPoolUsage implements Calculator {
- private ForkJoinPool pool;
-
- public ForkJoinPoolUsage() {
- // 也可以使用公用的 ForkJoinPool:
- // pool = ForkJoinPool.commonPool()
- pool = new ForkJoinPool();
- }
-
- public static void useRecursiveAction() throws InterruptedException {
- // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
- ForkJoinPool forkJoinPool = new ForkJoinPool();
- // 提交可分解的PrintTask任务
- forkJoinPool.submit(new MyRecursiveAction(0, 1000));
-
- while (!forkJoinPool.isTerminated()) {
- forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
- }
- // 关闭线程池
- forkJoinPool.shutdown();
- }
-
- public static void useRecursiveTask() {
- long[] numbers = LongStream.rangeClosed(1, 1000).toArray();
- Calculator calculator = new ForkJoinPoolUsage();
- System.out.println(calculator.sumUp(numbers)); // 打印结果500500
- }
-
- public static void main(String[] args) throws InterruptedException {
- useRecursiveTask();
- useRecursiveAction();
- }
-
- @Override
- public long sumUp(long[] numbers) {
- return pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
- }
-
- private static class MyRecursiveAction extends RecursiveAction {
-
- /**
- * 每个"小任务"最多只打印20个数
- */
- private static final int MAX = 20;
-
- private int start;
- private int end;
-
- public MyRecursiveAction(int start, int end) {
- this.start = start;
- this.end = end;
- }
-
- @Override
- protected void compute() {
- //当end-start的值小于MAX时,开始打印
- if ((end - start) < MAX) {
- for (int i = start; i < end; i++) {
- System.out.println(Thread.currentThread().getName() + "-i的值" + i);
- }
- } else {
- // 将大任务分解成两个小任务
- int middle = (start + end) / 2;
- MyRecursiveAction left = new MyRecursiveAction(start, middle);
- MyRecursiveAction right = new MyRecursiveAction(middle, end);
- left.fork();
- right.fork();
- }
- }
-
-
- }
-
- private static class SumTask extends RecursiveTask
{ - private long[] numbers;
- private int from;
- private int to;
-
- public SumTask(long[] numbers, int from, int to) {
- this.numbers = numbers;
- this.from = from;
- this.to = to;
- }
-
-
- @Override
- protected Long compute() {
-
- // 当需要计算的数字小于6时,直接计算结果
- if (to - from < 6) {
- long total = 0;
- for (int i = from; i <= to; i++) {
- total += numbers[i];
- }
- return total;
- // 否则,把任务一分为二,递归计算
- } else {
- int middle = (from + to) / 2;
- SumTask taskLeft = new SumTask(numbers, from, middle);
- SumTask taskRight = new SumTask(numbers, middle + 1, to);
- taskLeft.fork();
- taskRight.fork();
- return taskLeft.join() + taskRight.join();
- }
- }
- }
- }
-
- interface Calculator {
- long sumUp(long[] numbers);
- }
每个线程都有单独的队列,每个线程队列执行完毕后,就会去其他的线程队列里面拿过来执行, 底层是
ForkJoinPool
Java SE 1.8 新增
会自动启动 CPU 核数个线程去执行任务
使用示例:
- /**
- *
- */
- package git.snippets.juc;
-
- import java.io.IOException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
-
- /**
- * @since 1.8
- */
- public class WorkStealingPoolUsage {
- public static void main(String[] args) throws IOException {
- int core = Runtime.getRuntime().availableProcessors();
- // 会自动启动cpu核数个线程去执行任务 ,其中第一个是1s执行完毕,其余都是2s执行完毕,
- // 有一个任务会进行等待,当第一个执行完毕后,会再次偷取最后一个任务执行
- ExecutorService service = Executors.newWorkStealingPool();
- service.execute(new R(1000));
- for (int i = 0; i < core; i++) {
- service.execute(new R(2000));
- }
- //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
- System.in.read();
- }
-
- static class R implements Runnable {
-
- int time;
-
- R(int t) {
- this.time = t;
- }
-
- @Override
- public void run() {
- try {
- TimeUnit.MILLISECONDS.sleep(time);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- System.out.println(time + " " + Thread.currentThread().getName());
- }
- }
- }
-
Java SE 1.8 新增
anyOf()
可以实现“任意个 CompletableFuture 只要一个成功”,allOf()
可以实现“所有 CompletableFuture 都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。
使用示例:
- package git.snippets.juc;
-
- import java.util.Random;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.TimeUnit;
-
- /**
- * 假设你能够提供一个服务
- * 这个服务查询各大电商网站同一类产品的价格并汇总展示
- */
- public class CompletableFutureUsage {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- way1();
- way2();
- }
-
- public static void way1() {
- long start = System.currentTimeMillis();
- System.out.println("p1 " + priceOfJD());
- System.out.println("p2 " + priceOfTB());
- System.out.println("p3 " + priceOfTM());
- long end = System.currentTimeMillis();
- System.out.println("串行执行,耗时(ms):" + (end - start));
- }
-
- public static void way2() throws ExecutionException, InterruptedException {
- long start = System.currentTimeMillis();
- CompletableFuture
p1 = CompletableFuture.supplyAsync(() -> priceOfJD()); - CompletableFuture
p2 = CompletableFuture.supplyAsync(() -> priceOfTB()); - CompletableFuture
p3 = CompletableFuture.supplyAsync(() -> priceOfTM()); - CompletableFuture.allOf(p1, p2, p3).join();
- System.out.println("p1 " + p1.get());
- System.out.println("p2 " + p2.get());
- System.out.println("p3 " + p3.get());
- long end = System.currentTimeMillis();
- System.out.println("使用CompletableFuture并行执行,耗时(ms): " + (end - start));
- }
-
- private static double priceOfTM() {
- delay();
- return 1.00;
- }
-
- private static double priceOfTB() {
- delay();
- return 2.00;
- }
-
- private static double priceOfJD() {
- delay();
- return 3.00;
- }
-
- private static void delay() {
- int time = new Random().nextInt(500);
- try {
- TimeUnit.MILLISECONDS.sleep(time);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- package git.snippets.juc;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.atomic.AtomicInteger;
-
- /**
- * 证明原子操作类比synchronized更高效
- *
- * @author Grey
- * @date 2021/4/26
- */
- public class AtomVSSync {
- public static void main(String[] args) {
- test1();
- }
-
- AtomicInteger atomicCount = new AtomicInteger(0);
- int count = 0;
- final static int TIMES = 80000000;
-
- void m() {
- for (int i = 0; i < TIMES; i++) {
- atomicCount.incrementAndGet(); //原子操作
- }
- }
-
- void m2() {
- for (int i = 0; i < TIMES; i++) {
- synchronized (this) {
- count++;
- }
- }
- }
-
-
- public static void test1() {
- AtomVSSync t1 = new AtomVSSync();
- AtomVSSync t2 = new AtomVSSync();
- long time1 = time(t1::m);
- System.out.println("使用原子类得到的结果是:" + t1.atomicCount);
- long time2 = time(t2::m2);
- System.out.println("使用synchronized得到的结果是:" + t2.count);
-
- System.out.println("使用原子类花费的时间是:" + time1);
- System.out.println("使用 synchronized 花费的时间是 :" + time2);
- }
-
- private static long time(Runnable runnable) {
- List
threads = new ArrayList<>(); - long startTime = System.currentTimeMillis();
- for (int i = 0; i < 10; i++) {
- threads.add(new Thread(runnable, "thread-" + i));
- }
- threads.forEach(Thread::start);
- threads.forEach(o -> {
- try {
- o.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- long endTime = System.currentTimeMillis();
- return endTime - startTime;
- }
- }
-