目录
阻塞队列是一种特殊的队列,也遵守"先进先出"的原则。这里注意不要和操作系统内核里,表示阻塞状态的 PCB 的那个链表混为一谈了!!
阻塞队列能够保证线程安全,它有以下特性:
🍃1.当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素。
🍃2.当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素。
什么是生产者消费者模型呢??
🍁 【举例】
按照上图中的方案二包饺子,会出现两种情况:
🍃1.生产者擀皮速度惊人,还没等消费者使用,盖帘就摆满了,所以它就停下来休息一会。
🍃2.生产者生产的速度跟不上消费者消耗擀皮的速度,于是就出现,盖帘里没有擀皮的情况,于是消费者就等待休息一会。
我们把这种模型就叫做生产者消费者模型!!
生产者消费者模型的优点
🍁优点一:可以做到更好的"解耦合"
🍁优点二:能够做到"削峰填谷",提高整个系统的抗风险能力!!
所以我们就需要利用生产者消费者模型来解决这种问题!!
- public class TestDemo1 {
- public static void main(String[] args) {
- BlockingQueue
blockingQueue = new LinkedBlockingQueue<>(); -
- Thread customer = new Thread(() -> {
- while(true) {
- try {
- int value = blockingQueue.take();
- System.out.println("消费元素: " + value);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- customer.start();
- Thread producer = new Thread(() -> {
- int n = 0;
- while(true) {
- try {
- System.out.println("生产元素: " + n);
- blockingQueue.put(n);
- n++;
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- producer.start();
- }
- }
🍃基于数组的方式来实现
🍃两个核心方法:put(入队列)和 take(出队列)
🍁【实现代码】
- public class MyBlockingQueue {
- //假定数组最大容量为 1000
- private int[] elem = new int[1000];
- //队头
- private int front = 0;
- //队尾
- private int rear = 0;
- //阻塞队列中有效数据个数
- volatile private int size = 0;
-
- //入队列
- public void put(int value) throws InterruptedException {
- synchronized (this) {
- while(size == elem.length) {
- //队列满了,入队阻塞等待
- this.wait();
- }
- elem[rear] = value;
- rear++;
- if(rear == elem.length) {
- //如果 rear 到达数组末尾,就让它重新从 0 开始
- rear = 0;
- }
- size++;
- //插入元素后,就可以把 出队列 唤醒了
- this.notify();
- }
- }
- //出队列
- public Integer take() throws InterruptedException {
- int ret = 0;
- synchronized (this) {
- while(size == 0) {
- //队列为空,出队阻塞等待
- this.wait();
- }
- ret = this.elem[front];
- front++;
- if(front == elem.length) {
- front = 0;
- }
- size--;
- //取走元素后,就可以把 入队列 唤醒了
- this.notify();
- }
- return ret;
- }
- }
🍁【基本步骤】
🍃1.通过 "循环队列" 的方式来实现;
🍃2.使用 synchronized 进行加锁控制;
🍃3.put 插入元素的时候, 判定如果队列满了, 就进行 wait;
🍃4.take 取出元素的时候, 判定如果队列为空, 就进行 wait。
🍁【重点分析】
🍃1.出队,入队,以及元素个数 ++,-- 等等都涉及到线程安全问题,判空、判满,里面又用到了 wait(),而 wait() 方法的第一步就是释放锁,所以 synchronized 要将它们全部包裹住。
🍃2.理解队列为空时,出队需等待,插入元素后,即可唤醒出队;队列为满时,插入队列需等待,取出元素后,即可唤醒入队
🍃3.入队和出队操作涉及到了读和写,给 size 变量加上 volatile 防止编译器优化,指令重排序影响 size 的值。
🍃4.为什么判空判满时需要加上循环,结合下图理解:
🍁【测试代码】
- public class Test {
- public static void main(String[] args) {
- MyBlockingQueue queue = new MyBlockingQueue();
- Thread customer = new Thread(() -> {
- while(true) {
- try {
- //消费元素
- int value = queue.take();
- System.out.println("消费: " + value);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- customer.start();
- Thread producer = new Thread(() -> {
- int value = 0;
- while(true) {
- try {
- //生产元素
- queue.put(value);
- System.out.println("生产: " + value);
- value++;
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- producer.start();
- }
- }
🍁【分享掌握技巧】
1.理解如何使用;
2.理解内部的实现原理;
3.能够模拟实现;
4.能给别人清楚的讲出来。(写博客就是在给别人讲)!!
本篇博客就到这里了,谢谢观看!!