• Java多线程及原理


    工作原理

    线程池内部是通过队列结合线程实现的,当我们利用线程池执行任务时:

    1. 如果此时线程池中的线程数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

    2. 如果此时线程池中的线程数量等于corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列。

    3. 如果此时线程池中的线程数量大于等于corePoolSize,缓冲队列workQueue已满,并且线程池中的线程数量小于maximumPoolSize,建新的线程来处理被添加的任务。

    4. 如果此时线裎池中的线数量大于corePoolSize,缓存冲队列workQueue已满, 并且线程池中的数量等于maximumPoolSize,那么过handler所指定的策略来处理此任务。

    5. 当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime, 线将被终止。这样,线程池可以动态的调整池中的线程数。

    相关配置

    corePoolSize:核心线程数

    maximumPoolSize:最大线程数 【包括核心线程数】

    keepAliveTime:生存时间【线程长时间不干活了,归还给操作系统,核心线程不用归还,可以指定是否参与归还过程】

    生存时间单位

    任务队列:等待队列,如果不指定,最大值是Integer.MAX_VALUE【各种各样的BlockingQueue

    线程工厂【默认设置优先级是普通优先级,非守护线程】,最好自定义线程名称,方便回溯

    拒绝策略,包括以下四种:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

    ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。

    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务

    ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务

    执行流程:先占满核心线程-> 再占满任务队列-> 再占满(最大线程数-核心线程数)-> 最后执行拒绝策略
    一般自定义拒绝策略:将相关信息保存到redis,kafka,日志,MySQL记录 实现RejectedExecutionHandler并重写rejectedExecution方法

    自定义拒绝策略代码示例

    1. package git.snippets.juc;
    2. import java.util.concurrent.*;
    3. /**
    4. * 自定义拒绝策略
    5. */
    6. public class MyRejectedHandler {
    7. public static void main(String[] args) {
    8. ExecutorService service = new ThreadPoolExecutor(4, 4,
    9. 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),
    10. Executors.defaultThreadFactory(),
    11. new MyHandler());
    12. }
    13. static class MyHandler implements RejectedExecutionHandler {
    14. @Override
    15. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    16. //log("r rejected")
    17. //save r kafka mysql redis
    18. //try 3 times
    19. if (executor.getQueue().size() < 10000) {
    20. //try put again();
    21. }
    22. }
    23. }
    24. }

    SingleThreadPool

    • 保证线程按顺序执行

    • 为什么要有单线程的线程池?这个主要是用来做任务队列和线程生命周期管理

    • 使用LinkedBlockingQueue作为任务队列,上界为:Integer.MAX_VALUE(2147483647) 约等于无界。

    示例代码:

    1. package git.snippets.juc;
    2. import java.util.concurrent.ExecutorService;
    3. import java.util.concurrent.Executors;
    4. import static java.util.concurrent.TimeUnit.SECONDS;
    5. public class SingleThreadPoolUsage {
    6. public static void main(String[] args) throws InterruptedException {
    7. ExecutorService service = Executors.newSingleThreadExecutor();
    8. for (int i = 0; i < 10; i++) {
    9. final int j = i;
    10. service.submit(() -> System.out.println("current thread " + Thread.currentThread() + " " + j));
    11. }
    12. service.shutdown();
    13. service.awaitTermination(60, SECONDS);
    14. }
    15. }

    CachedThreadPool

    • corePoolSize:0

    • maxiumPoolSize:Integer.MAX_VALUE(2147483647)

    • keepAliveTime 60秒

    • 使用SynchronousQueue作为任务队列 必须马上执行

    使用示例:

    1. package git.snippets.juc;
    2. import java.util.concurrent.ExecutorService;
    3. import java.util.concurrent.Executors;
    4. import java.util.concurrent.TimeUnit;
    5. public class CachedThreadPoolUsage {
    6. public static void main(String[] args) throws InterruptedException {
    7. System.out.println("cached thread pool usage...");
    8. ExecutorService service = Executors.newCachedThreadPool();
    9. System.out.println(service);
    10. for (int i = 0; i < 2; i++) {
    11. service.execute(() -> {
    12. try {
    13. TimeUnit.MILLISECONDS.sleep(500);
    14. } catch (InterruptedException e) {
    15. e.printStackTrace();
    16. }
    17. System.out.println(Thread.currentThread().getName());
    18. });
    19. }
    20. System.out.println(service);
    21. TimeUnit.SECONDS.sleep(80);
    22. System.out.println(service);
    23. }
    24. }

    FixedThreadPool

    • 最大线程数等于核心线程数

    • 使用LinkedBlockingQueue作为任务队列,上界为:Integer.MAX_VALUE(2147483647)

    使用示例见:

    1. package git.snippets.juc;
    2. import java.util.ArrayList;
    3. import java.util.List;
    4. import java.util.concurrent.Callable;
    5. import java.util.concurrent.ExecutionException;
    6. import java.util.concurrent.ExecutorService;
    7. import java.util.concurrent.Executors;
    8. import java.util.concurrent.Future;
    9. import java.util.concurrent.TimeUnit;
    10. /**
    11. * 多线程和单线程计算某个范围内的所有素数
    12. */
    13. public class FixedThreadPoolUsage {
    14. public static void main(String[] args) throws InterruptedException, ExecutionException {
    15. long start = System.currentTimeMillis();
    16. getPrime(1, 200000);
    17. long end = System.currentTimeMillis();
    18. System.out.println("use single thread...cost: " + (end - start));
    19. final int cpuCoreNum = 4;
    20. ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
    21. MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
    22. MyTask t2 = new MyTask(80001, 130000);
    23. MyTask t3 = new MyTask(130001, 170000);
    24. MyTask t4 = new MyTask(170001, 200000);
    25. Future> f1 = service.submit(t1);
    26. Future> f2 = service.submit(t2);
    27. Future> f3 = service.submit(t3);
    28. Future> f4 = service.submit(t4);
    29. System.out.println();
    30. start = System.currentTimeMillis();
    31. f1.get();
    32. f2.get();
    33. f3.get();
    34. f4.get();
    35. end = System.currentTimeMillis();
    36. System.out.println("use fixed thread pool...cost: " + (end - start));
    37. service.shutdown();
    38. service.awaitTermination(1, TimeUnit.MINUTES);
    39. }
    40. static boolean isPrime(int num) {
    41. for (int i = 2; i <= num / 2; i++) {
    42. if (num % i == 0) {
    43. return false;
    44. }
    45. }
    46. return true;
    47. }
    48. static List getPrime(int start, int end) {
    49. List results = new ArrayList<>();
    50. for (int i = start; i <= end; i++) {
    51. if (isPrime(i)) results.add(i);
    52. }
    53. return results;
    54. }
    55. static class MyTask implements Callable> {
    56. int startPos, endPos;
    57. MyTask(int s, int e) {
    58. this.startPos = s;
    59. this.endPos = e;
    60. }
    61. @Override
    62. public List call() {
    63. List r = getPrime(startPos, endPos);
    64. return r;
    65. }
    66. }
    67. }

    ScheduledThreadPool

    使用DelayWorkQueue,包括了如下两个主要方法

    scheduleAtFixedRate()

    当前任务执行时间小于间隔时间,每次到点即执行;

    当前任务执行时间大于等于间隔时间,任务执行后立即执行下一次任务。相当于连续执行了。

    scheduleWithFixedDelay()

    每当上次任务执行完毕后,间隔一段时间执行。不管当前任务执行时间大于、等于还是小于间隔时间,执行效果都是一样的。

    使用示例:

    1. package git.snippets.juc;
    2. import java.util.Date;
    3. import java.util.concurrent.Executors;
    4. import java.util.concurrent.ScheduledExecutorService;
    5. import static java.util.concurrent.TimeUnit.SECONDS;
    6. public class ScheduleThreadPoolUsage {
    7. static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    8. public static void main(String[] args) {
    9. test1();
    10. test2();
    11. test3();
    12. }
    13. /**
    14. * 任务执行时间(8s)小于间隔时间(10s)
    15. */
    16. public static void test1() {
    17. scheduler.scheduleAtFixedRate(() -> {
    18. System.out.println("Start: scheduleAtFixedRate: " + new Date());
    19. try {
    20. Thread.sleep(8000);
    21. } catch (InterruptedException e) {
    22. e.printStackTrace();
    23. }
    24. System.out.println("End : scheduleAtFixedRate: " + new Date());
    25. }, 0, 10, SECONDS);
    26. }
    27. /**
    28. * 任务执行时间(12s)大于间隔时间(10s)
    29. */
    30. public static void test2() {
    31. scheduler.scheduleAtFixedRate(() -> {
    32. System.out.println("Start: scheduleAtFixedRate: " + new Date());
    33. try {
    34. Thread.sleep(12000);
    35. } catch (InterruptedException e) {
    36. e.printStackTrace();
    37. }
    38. System.out.println("End : scheduleAtFixedRate: " + new Date());
    39. }, 0, 10, SECONDS);
    40. }
    41. /**
    42. * 任务执行时间(8s)小于间隔时间(10s)
    43. */
    44. public static void test3() {
    45. scheduler.scheduleWithFixedDelay(() -> {
    46. System.out.println("Start: scheduleWithFixedDelay: " + new Date());
    47. try {
    48. Thread.sleep(12000);
    49. } catch (InterruptedException e) {
    50. e.printStackTrace();
    51. }
    52. System.out.println("End : scheduleWithFixedDelay: " + new Date());
    53. }, 0, 10, SECONDS);
    54. }
    55. }

    ForkJoinPool

    Java SE 1.7 以后新增的线程池,包括以下两个核心类

    第一个是:RecursiveAction

    它是一种没有任何返回值的任务。只是做一些工作,比如写数据到磁盘,然后就退出了。 一个RecursiveAction可以把自己的工作分割成更小的几块, 这样它们可以由独立的线程或者 CPU 执行。
    我们可以通过继承来实现一个RecursiveAction

    第二个是:RecursiveTask

    它是一种会返回结果的任务。可以将自己的工作分割为若干更小任务,并将这些子任务的执行合并到一个集体结果。 可以有几个水平的分割和合并。

    使用示例:

    1. package git.snippets.juc;
    2. import java.util.concurrent.ForkJoinPool;
    3. import java.util.concurrent.RecursiveAction;
    4. import java.util.concurrent.RecursiveTask;
    5. import java.util.concurrent.TimeUnit;
    6. import java.util.stream.LongStream;
    7. public class ForkJoinPoolUsage implements Calculator {
    8. private ForkJoinPool pool;
    9. public ForkJoinPoolUsage() {
    10. // 也可以使用公用的 ForkJoinPool:
    11. // pool = ForkJoinPool.commonPool()
    12. pool = new ForkJoinPool();
    13. }
    14. public static void useRecursiveAction() throws InterruptedException {
    15. // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
    16. ForkJoinPool forkJoinPool = new ForkJoinPool();
    17. // 提交可分解的PrintTask任务
    18. forkJoinPool.submit(new MyRecursiveAction(0, 1000));
    19. while (!forkJoinPool.isTerminated()) {
    20. forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
    21. }
    22. // 关闭线程池
    23. forkJoinPool.shutdown();
    24. }
    25. public static void useRecursiveTask() {
    26. long[] numbers = LongStream.rangeClosed(1, 1000).toArray();
    27. Calculator calculator = new ForkJoinPoolUsage();
    28. System.out.println(calculator.sumUp(numbers)); // 打印结果500500
    29. }
    30. public static void main(String[] args) throws InterruptedException {
    31. useRecursiveTask();
    32. useRecursiveAction();
    33. }
    34. @Override
    35. public long sumUp(long[] numbers) {
    36. return pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
    37. }
    38. private static class MyRecursiveAction extends RecursiveAction {
    39. /**
    40. * 每个"小任务"最多只打印20个数
    41. */
    42. private static final int MAX = 20;
    43. private int start;
    44. private int end;
    45. public MyRecursiveAction(int start, int end) {
    46. this.start = start;
    47. this.end = end;
    48. }
    49. @Override
    50. protected void compute() {
    51. //当end-start的值小于MAX时,开始打印
    52. if ((end - start) < MAX) {
    53. for (int i = start; i < end; i++) {
    54. System.out.println(Thread.currentThread().getName() + "-i的值" + i);
    55. }
    56. } else {
    57. // 将大任务分解成两个小任务
    58. int middle = (start + end) / 2;
    59. MyRecursiveAction left = new MyRecursiveAction(start, middle);
    60. MyRecursiveAction right = new MyRecursiveAction(middle, end);
    61. left.fork();
    62. right.fork();
    63. }
    64. }
    65. }
    66. private static class SumTask extends RecursiveTask {
    67. private long[] numbers;
    68. private int from;
    69. private int to;
    70. public SumTask(long[] numbers, int from, int to) {
    71. this.numbers = numbers;
    72. this.from = from;
    73. this.to = to;
    74. }
    75. @Override
    76. protected Long compute() {
    77. // 当需要计算的数字小于6时,直接计算结果
    78. if (to - from < 6) {
    79. long total = 0;
    80. for (int i = from; i <= to; i++) {
    81. total += numbers[i];
    82. }
    83. return total;
    84. // 否则,把任务一分为二,递归计算
    85. } else {
    86. int middle = (from + to) / 2;
    87. SumTask taskLeft = new SumTask(numbers, from, middle);
    88. SumTask taskRight = new SumTask(numbers, middle + 1, to);
    89. taskLeft.fork();
    90. taskRight.fork();
    91. return taskLeft.join() + taskRight.join();
    92. }
    93. }
    94. }
    95. }
    96. interface Calculator {
    97. long sumUp(long[] numbers);
    98. }

    WorkStealingPool

    每个线程都有单独的队列,每个线程队列执行完毕后,就会去其他的线程队列里面拿过来执行, 底层是ForkJoinPool

    • Java SE 1.8 新增

    • 会自动启动 CPU 核数个线程去执行任务

    使用示例:

    1. /**
    2. *
    3. */
    4. package git.snippets.juc;
    5. import java.io.IOException;
    6. import java.util.concurrent.ExecutorService;
    7. import java.util.concurrent.Executors;
    8. import java.util.concurrent.TimeUnit;
    9. /**
    10. * @since 1.8
    11. */
    12. public class WorkStealingPoolUsage {
    13. public static void main(String[] args) throws IOException {
    14. int core = Runtime.getRuntime().availableProcessors();
    15. // 会自动启动cpu核数个线程去执行任务 ,其中第一个是1s执行完毕,其余都是2s执行完毕,
    16. // 有一个任务会进行等待,当第一个执行完毕后,会再次偷取最后一个任务执行
    17. ExecutorService service = Executors.newWorkStealingPool();
    18. service.execute(new R(1000));
    19. for (int i = 0; i < core; i++) {
    20. service.execute(new R(2000));
    21. }
    22. //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
    23. System.in.read();
    24. }
    25. static class R implements Runnable {
    26. int time;
    27. R(int t) {
    28. this.time = t;
    29. }
    30. @Override
    31. public void run() {
    32. try {
    33. TimeUnit.MILLISECONDS.sleep(time);
    34. } catch (InterruptedException e) {
    35. e.printStackTrace();
    36. }
    37. System.out.println(time + " " + Thread.currentThread().getName());
    38. }
    39. }
    40. }

    CompletableFuture

    • Java SE 1.8 新增

    • anyOf()可以实现“任意个 CompletableFuture 只要一个成功”,allOf()可以实现“所有 CompletableFuture 都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。

    使用示例:

    1. package git.snippets.juc;
    2. import java.util.Random;
    3. import java.util.concurrent.CompletableFuture;
    4. import java.util.concurrent.ExecutionException;
    5. import java.util.concurrent.TimeUnit;
    6. /**
    7. * 假设你能够提供一个服务
    8. * 这个服务查询各大电商网站同一类产品的价格并汇总展示
    9. */
    10. public class CompletableFutureUsage {
    11. public static void main(String[] args) throws ExecutionException, InterruptedException {
    12. way1();
    13. way2();
    14. }
    15. public static void way1() {
    16. long start = System.currentTimeMillis();
    17. System.out.println("p1 " + priceOfJD());
    18. System.out.println("p2 " + priceOfTB());
    19. System.out.println("p3 " + priceOfTM());
    20. long end = System.currentTimeMillis();
    21. System.out.println("串行执行,耗时(ms):" + (end - start));
    22. }
    23. public static void way2() throws ExecutionException, InterruptedException {
    24. long start = System.currentTimeMillis();
    25. CompletableFuture p1 = CompletableFuture.supplyAsync(() -> priceOfJD());
    26. CompletableFuture p2 = CompletableFuture.supplyAsync(() -> priceOfTB());
    27. CompletableFuture p3 = CompletableFuture.supplyAsync(() -> priceOfTM());
    28. CompletableFuture.allOf(p1, p2, p3).join();
    29. System.out.println("p1 " + p1.get());
    30. System.out.println("p2 " + p2.get());
    31. System.out.println("p3 " + p3.get());
    32. long end = System.currentTimeMillis();
    33. System.out.println("使用CompletableFuture并行执行,耗时(ms): " + (end - start));
    34. }
    35. private static double priceOfTM() {
    36. delay();
    37. return 1.00;
    38. }
    39. private static double priceOfTB() {
    40. delay();
    41. return 2.00;
    42. }
    43. private static double priceOfJD() {
    44. delay();
    45. return 3.00;
    46. }
    47. private static void delay() {
    48. int time = new Random().nextInt(500);
    49. try {
    50. TimeUnit.MILLISECONDS.sleep(time);
    51. } catch (InterruptedException e) {
    52. e.printStackTrace();
    53. }
    54. }
    55. }

    证明原子操作类比synchronized更高效

    1. package git.snippets.juc;
    2. import java.util.ArrayList;
    3. import java.util.List;
    4. import java.util.concurrent.atomic.AtomicInteger;
    5. /**
    6. * 证明原子操作类比synchronized更高效
    7. *
    8. * @author Grey
    9. * @date 2021/4/26
    10. */
    11. public class AtomVSSync {
    12. public static void main(String[] args) {
    13. test1();
    14. }
    15. AtomicInteger atomicCount = new AtomicInteger(0);
    16. int count = 0;
    17. final static int TIMES = 80000000;
    18. void m() {
    19. for (int i = 0; i < TIMES; i++) {
    20. atomicCount.incrementAndGet(); //原子操作
    21. }
    22. }
    23. void m2() {
    24. for (int i = 0; i < TIMES; i++) {
    25. synchronized (this) {
    26. count++;
    27. }
    28. }
    29. }
    30. public static void test1() {
    31. AtomVSSync t1 = new AtomVSSync();
    32. AtomVSSync t2 = new AtomVSSync();
    33. long time1 = time(t1::m);
    34. System.out.println("使用原子类得到的结果是:" + t1.atomicCount);
    35. long time2 = time(t2::m2);
    36. System.out.println("使用synchronized得到的结果是:" + t2.count);
    37. System.out.println("使用原子类花费的时间是:" + time1);
    38. System.out.println("使用 synchronized 花费的时间是 :" + time2);
    39. }
    40. private static long time(Runnable runnable) {
    41. List threads = new ArrayList<>();
    42. long startTime = System.currentTimeMillis();
    43. for (int i = 0; i < 10; i++) {
    44. threads.add(new Thread(runnable, "thread-" + i));
    45. }
    46. threads.forEach(Thread::start);
    47. threads.forEach(o -> {
    48. try {
    49. o.join();
    50. } catch (InterruptedException e) {
    51. e.printStackTrace();
    52. }
    53. });
    54. long endTime = System.currentTimeMillis();
    55. return endTime - startTime;
    56. }
    57. }

  • 相关阅读:
    接口自动化Requests+Pytest基础实现
    EasyX趣味化编程note6,图片操作及文字
    Idea创建springboot项目
    使用 Promise.withResolvers() 来简化你将函数 Promise 化的实现~~
    2023-10-17 mysql-innodb-解析write_row的record的一行数据-分析
    flask——数据库连接池、wtfroms、信号、多app应用、flask-script
    里程碑,用自己的编程语言实现了一个网站
    python datetime模块
    C/C++语言的服务器LS调研 (Language Server 实现代码索引 跳转定义 智能提示等功能)
    Spring面试题25:Spring如何控制bean加载先后顺序
  • 原文地址:https://blog.csdn.net/chen978616649/article/details/126812828