• 【JAVA基础】多线程与线程池


    多线程与线程池

    1. 相关概念

    1.1 线程调度

    • 分时调度:所有线程轮流使用 CPU 的使用权,并且平均分配每个线程占用 CPU 的时间。
    • 抢占式调度:让优先级高的线程以较大的概率优先使用 CPU。如果线程的优先级相同,那么会随机选择一个(线程随机性),Java 使用的为抢占式调度。

    每个线程都有一定的优先级,同优先级线程组成先进先出队列(先到先服务),使用分时调度策略。优先级高的线程采用抢占式策略,获得较多的执行
    机会。每个线程默认的优先级都与创建它的父线程具有相同的优先级。:

    • Thread 类的三个优先级常量:

      • MAX_PRIORITY(10):最高优先级
      • MIN _PRIORITY (1):最低优先级
      • NORM_PRIORITY (5):普通优先级,默认情况下 main 线程具有普通优先级

    1.2 守护线程

    有一种线程,它是在后台运行的,它的任务是为其他线程提供服务的,这种线程被称为“守护线程”。JVM 的垃圾回收线程就是典型的守护线程。守护线程有个特点,就是如果所有非守护线程都死亡,那么守护线程自动死亡。形象理解:兔死狗烹,鸟尽弓藏

    2. 生命周期

    在 java.lang.Thread.State 的枚举类中这样定义:

    public enum State {
        NEW,
        RUNNABLE,
        BLOCKED,
        WAITING,
        TIMED_WAITING,
        TERMINATED;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • NEW(新建):线程刚被创建,但是并未启动。还没调用 start 方法。

    • RUNNABLE(可运行):这里没有区分就绪和运行状态。因为对于 Java 对象来说,只能标记为可运行,至于什么时候运行,不是 JVM 来控制的了,是 OS 来进行调度的,而且时间非常短暂,因此对于 Java 对象的状态来说,无法区分。

    • BLOCKED(锁阻塞):在 API 中的介绍为:一个正在阻塞、等待一个监视器锁(锁对象)的线程处于这一状态。只有获得锁对象的线程才能有执行机会。

      比如,线程 A 与线程 B 代码中使用同一锁,如果线程 A 获取到锁,线程 A 进入到 Runnable 状态,那么线程 B 就进入到 Blocked锁阻塞状态。

    • WAITING(无限等待):在 API 中介绍为:一个正在无限期等待另一个线程执行一个特别的(唤醒)动作的线程处于这一状态。

      当前线程执行过程中遇到遇到 Object 类的 wait,Thread 类的join,LockSupport 类的 park 方法,并且在调用这些方法时,没有指定时间,那么当前线程会进入 WAITING 状态,直到被唤醒。

    • TIMED_WAITING(计时等待):在 API 中的介绍为:一个正在限时等待另一个线程执行一个(唤醒)动作的线程处于这一状态。

      当前线程执行过程中遇到 Thread 类的 sleep 或 join,Object 类的 wait,LockSupport 类的 park 方法,并且在调用这些方法时,设置了时间,那么当前线程会进入 TIMED_WAITING,直到时间到,或被中断。

    • TERMINATED(被终止):表明此线程已经结束生命周期,终止运行。

    说明:当从 WAITING 或 TIMED_WAITING 恢复到 Runnable 状态时,如果发现当前线程没有得到监视器锁,那么会立刻转入 BLOCKED 状态。

    在这里插入图片描述

    3. 同步机制/同步锁

    3.1 synchronized

    同步锁对象可以是任意类型,但是必须保证竞争“同一个共享资源”的多个线程必须使用同一个“同步锁对象”。

    3.2 lock

    • 保证线程的安全。与采用 synchronized 相比,Lock 可提供多种锁方案,更灵活、更强大。Lock 通过显式定义同步锁对象来实现同步。同步锁使用Lock 对象充当。

    • java.util.concurrent.locks.Lock 接口是控制多个线程对共享资源进行访问的工具。锁提供了对共享资源的独占访问,每次只能有一个线程对 Lock 对象加锁,线程开始访问共享资源之前应先获得 Lock 对象。

    • 在实现线程安全的控制中,比较常用的是 ReentrantLock,可以显式加锁、释放锁。

      ReentrantLock 类实现了 Lock 接口,它拥有与 synchronized 相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。

    3.3 synchronized 与 Lock 的对比

    1. Lock 是显式锁(手动开启和关闭锁,别忘记关闭锁),synchronized 是隐式锁,出了作用域、遇到异常等自动解锁

    2. Lock 只有代码块锁,synchronized 有代码块锁和方法锁

    3. 使用 Lock 锁,JVM 将花费较少的时间来调度线程,性能更好。并且具有更好的扩展性(提供更多的子类,读写锁等),更体现面向对象。

    说明:开发建议中处理线程安全问题优先使用顺序为:Lock ----> 同步代码块 ----> 同步方法

    4. 死锁

    同步机制带来的死锁问题:不同的线程分别占用对方需要的同步资源不放弃,都在等待对方放弃自己需要的同步资源,就形成了线程的死锁。

    • 互斥条件:进程要求对所分配的资源进行排它性控制,即在一段时间内某资源仅为一进程所占用。
    • 请求和保持条件:当进程因请求资源而阻塞时,对已获得的资源保持不放。
    • 不剥夺条件:进程已获得的资源在未使用完之前,不能剥夺,只能在使用完时由自己释放。
    • 循环等待:存在一个进程或线程的资源申请序列,使得每个进程或线程都在等待下一个进程或线程所持有的资源。

    解决死锁:死锁一旦出现,基本很难人为干预,只能尽量规避。可以考虑打破上面的诱发条件。

    • 针对互斥条件:互斥条件基本上无法被破坏。因为线程需要通过互斥解决安全问题。
    • 针对请求和保持条件:可以考虑一次性申请所有所需的资源,这样就不存在等待的问题。
    • 针对不剥夺条件:占用部分资源的线程在进一步申请其他资源时,如果申请不到,就主动释放掉已经占用的资源。
    • 针对循环等待:可以将资源改为线性顺序。申请资源时,先申请序号较小的,这样避免循环等待问题。

    5. 线程通信

    5.1 线程间的通信

    当我们需要多个线程来共同完成一件任务,并且我们希望他们有规律的执行,那么多线程之间需要一些通信机制,可以协调它们的工作,以此实现多线程共同操作一份数据。

    比如:线程 A 用来生产包子的,线程 B 用来吃包子的,包子可以理解为同一资源,线程 A 与线程 B 处理的动作,一个是生产,一个是消费,此时 B 线程必须等到 A 线程完成后才能执行,那么线程 A 与线程 B 之间就需要线程通信,即—— 等待唤醒机制。

    5.2 等待唤醒机制

    这是多个线程间的一种协作机制。谈到线程我们经常想到的是线程间的竞争(race),比如去争夺锁,但这并不是故事的全部,线程间也会有协作机制。在一个线程满足某个条件时,就进入等待状态(wait() / wait(time)), 等待其他线程执行完他们的指定代码过后再将其唤醒(notify());或可以指定wait 的时间,等时间到了自动唤醒;在有多个线程进行等待时,如果需要,可以使用 notifyAll()来唤醒所有的等待线程。wait/notify 就是线程间的一种协作机制。

    1. wait:线程不再活动,不再参与调度,进入 wait set 中,因此不会浪费 CPU 资源,也不会去竞争锁了,这时的线程状态是 WAITING 或 TIMED_WAITING。它
      还要等着别的线程执行一个特别的动作,也即“通知(notify)”或者等待时间到,在这个对象上等待的线程从 wait set 中释放出来,重新进入到调度队(ready queue)中
    2. notify:则选取所通知对象的 wait set 中的一个线程释放;
    3. notifyAll:则释放所通知对象的 wait set 上的全部线程。

    注意:被通知的线程被唤醒后也不一定能立即恢复执行,因为它当初中断的地方是在同步块内,而此刻它已经不持有锁,所以它需要再次尝试去获取锁(很可能面临其它线程的竞争),成功后才能在当初调用 wait 方法之后的地方恢复执行。

    • 如果能获取锁,线程就从 WAITING 状态变成 RUNNABLE(可运行)状态;
    • 否则,线程就从 WAITING 状态又变成 BLOCKED(等待锁) 状态

    注意:在JUC中将会学到更多有关等待唤醒机制的类与方法,建议使用JUC中学习到的

    5.3 举例

    例题:使用两个线程打印 1-100。线程 1, 线程 2 交替打印

    class Communication implements Runnable {
    	int i = 1;
    	public void run() {
    		while (true) {
    			synchronized (this) {
    				notify();
    				if (i <= 100) {
    					System.out.println(Thread.currentThread().getName() + ":" + i++);
    				} else break;
    				try {
    					wait();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    5.4 调用 wait 和 notify 需注意的细节

    1. wait 方法与 notify 方法必须要由同一个锁对象调用。因为:对应的锁对象可以通过 notify 唤醒使用同一个锁对象调用的 wait 方法后的线程。
    2. wait 方法与 notify 方法是属于 Object 类的方法的。因为:锁对象可以是任意对象,而任意对象的所属类都是继承了 Object 类的。
    3. wait 方法与 notify 方法必须要在同步代码块或者是同步函数中使用。因为:必须要通过锁对象调用这 2 个方法。否则会报 java.lang.IllegalMonitorStateException 异常。

    5.5 生产者消费者问题

    等待唤醒机制可以解决经典的“生产者与消费者”的问题。生产者与消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个(多个)共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。

    生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

    举例:

    生产者(Productor)将产品交给店员(Clerk),而消费者(Customer)从店员处取走产品,店员一次只能持有固定数量的产品(比如:20),如果生产者试图生产更多的产品,店员会叫生产者停一下,如果店中有空位放产品了再通知生产者继续生产;如果店中没有产品了,店员会告诉消费者等一下,如果店中有产品了再通知消费者来取走产品。

    生产者与消费者问题中其实隐含了两个问题:

    • 线程安全问题:因为生产者与消费者共享数据缓冲区,产生安全问题。不过这个问题可以使用同步解决。
    • 线程的协调工作问题:要解决该问题,就必须让生产者线程在缓冲区满时等待(wait),暂停进入阻塞状态,等到下次消费者消耗了缓冲区中的数据的时候,通知(notify)正在等待的线程恢复到就绪状态,重新开始往缓冲区添加数据。同样,也可以让消费者线程在缓冲区空时进入等待(wait),暂停进入阻塞状态,等到生产者往缓冲区添加数据之后,再通知(notify)正在等待的线程恢复到就绪状态。通过这样的通信机制来解决此类问题。

    代码实现:

    public class ConsumerProducerTest {
        public static void main(String[] args) {
            Clerk clerk = new Clerk();
            Producer p1 = new Producer(clerk);
            Consumer c1 = new Consumer(clerk);
            Consumer c2 = new Consumer(clerk);
            p1.setName("生产者 1");
            c1.setName("消费者 1");
            c2.setName("消费者 2");
            p1.start();
            c1.start();
            c2.start();}
    }
    
    //生产者
    class Producer extends Thread{
        private Clerk clerk;
        public Producer(Clerk clerk){
            this.clerk = clerk;
        }
        @Override
        public void run() {
            System.out.println("=========生产者开始生产产品========");
            while(true){
                try {
                    Thread.sleep(40);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //要求 clerk 去增加产品
                clerk.addProduct();
            }
        }
    }
    
    //消费者
    class Consumer extends Thread{
        private Clerk clerk;
        public Consumer(Clerk clerk){
            this.clerk = clerk;
        }
        @Override
        public void run() {
            System.out.println("=========消费者开始消费产品========");
            while(true){
                try {
                    Thread.sleep(90);
                } catch (InterruptedException e) {e.printStackTrace();
                }
                //要求 clerk 去减少产品
                clerk.minusProduct();
            }
        }
    }
    
    //资源类,缓冲区
    class Clerk {
        private int productNum = 0;//产品数量
        private static final int MAX_PRODUCT = 20;
        private static final int MIN_PRODUCT = 1;
        //增加产品
        public synchronized void addProduct() {
            if(productNum < MAX_PRODUCT){
                productNum++;
                System.out.println(Thread.currentThread().getName() +
                        "生产了第" + productNum + "个产品");
                //唤醒消费者
                this.notifyAll();
            }else{
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        //减少产品
        public synchronized void minusProduct() {
            if(productNum >= MIN_PRODUCT){
                System.out.println(Thread.currentThread().getName() +
                        "消费了第" + productNum + "个产品");
                productNum--;
                //唤醒生产者
                this.notifyAll();
            }else{try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92

    6. 线程池

    如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要的代价较高。

    思路:提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。可以避免频繁创建销毁、实现重复利用。类似生活中的公共交通工具。

    注意:线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。

    引用阿里《Java开发手册》中的一段描述:

    【强制】线程池不允许使用Executors创建,建议通过ThreadPoolExecutor的方式创建,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

    说明:Executors返回的线程池对象的弊端如下:

    1.FixedThreadPool和SingleThreadPool:

    允许的请求队列长度为Integet.MAX_VALUE,可能会堆积大量的请求从而导致OOM;

    2.CachedThreadPool:

    允许创建线程数量为Integet.MAX_VALUE,可能会创建大量的线程,从而导致OOM.

    6.1七大核心属性

    1. corePoolSize(int):核心线程数量。默认情况下,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到任务队列当中。线程池将长期保证这些线程处于存活状态,即使线程已经处于闲置状态。除非配置了allowCoreThreadTimeOut=true,核心线程数的线程也将不再保证长期存活于线程池内,在空闲时间超过keepAliveTime后被销毁。
    2. workQueue:阻塞队列,存放等待执行的任务,线程从workQueue中取任务,若无任务将阻塞等待。当线程池中线程数量达到corePoolSize后,就会把新任务放到该队列当中。JDK提供了四个可直接使用的队列实现,分别是:基于数组的有界队列ArrayBlockingQueue、基于链表的无界队列LinkedBlockingQueue、只有一个元素的同步队列SynchronousQueue、优先级队列PriorityBlockingQueue。在实际使用时一定要设置队列长度
    3. maximumPoolSize(int):线程池内的最大线程数量,线程池内维护的线程不得超过该数量,大于核心线程数量小于最大线程数量的线程将在空闲时间超过keepAliveTime后被销毁。当阻塞队列存满后,将会创建新线程执行任务,线程的数量不会大于maximumPoolSize。
    4. keepAliveTime(long):线程存活时间,若线程数超过了corePoolSize,线程闲置时间超过了存活时间,该线程将被销毁。除非配置了allowCoreThreadTimeOut=true,核心线程数的线程也将不再保证长期存活于线程池内,在空闲时间超过keepAliveTime后被销毁。
    5. TimeUnit unit:线程存活时间的单位,例如TimeUnit.SECONDS表示秒。
    6. RejectedExecutionHandler:拒绝策略,当任务队列存满并且线程池个数达到maximunPoolSize后采取的策略。ThreadPoolExecutor中提供了四种拒绝策略,分别是:抛RejectedExecutionException异常的AbortPolicy(默认策略)、使用调用者所在线程来运行任务CallerRunsPolicy、丢弃一个等待执行的任务,然后尝试执行当前任务DiscardOldestPolicy、不动声色的丢弃并且不抛异常DiscardPolicy。项目中如果为了更多的用户体验,可以自定义拒绝策略。
    7. threadFactory:创建线程的工厂,虽说JDK提供了线程工厂的默认实现DefaultThreadFactory,但还是建议自定义实现最好,这样可以自定义线程创建的过程,例如线程分组、自定义线程名称等以进行线程监控

    6.2线程池处理流程图

    6.2.1 线程池处理

    在这里插入图片描述

    拒绝策略:RejectedExecutionHandler

    当任务队列和线程池都满了时所采取的应对策略,默认是AbordPolicy,表示无法处理新任务,并抛出RejectedExecutionException异常。此外还有3种策略:

    • CallerRunsPolicy:用调用者所在的线程处理任务。此策略提供简单的反馈机制,能够减缓新任务的提交速度。
    • DiscardPolicy:不能执行任务,并将任务删除。
    • DiscardOldestPolicy:丢弃队列最近的任务,并执行当前的任务。
    6.2.2 addWorker方法

    该方法返回false则会执行拒绝策略方法

    主流程图:

    在这里插入图片描述

    流程中去除一些异常情况,只留了主要流程,流程中有一步验证线程数大于核心线程或者最大线程数,如果传递的参数core等于true那么运行线程数量不能大于核心线程数量,如果为false则当前线程数量不能大于最大。

    addWorker只有两个作用:增加工作线程数量、创建一个Worker并加到工作线程集合中。

  • 相关阅读:
    数智随行 | 探想未来工厂数字化,强化智能设备管理
    k8s+springcloud+nacos部署配置
    计算机二级知识点整理
    CURL简单使用
    Linux下安装DM8
    数仓开发之DWD层(三)
    Spring Security常见过滤器
    Java学习总结(答案版)
    Java面试题初级准备
    数据库安全-Redis&Hadoop&Mysql&未授权访问&RCE
  • 原文地址:https://blog.csdn.net/xxx1276063856/article/details/134073550