• Java EE——阻塞队列


    队列

    我们之前在数据结构讲过,队列是一种先进先出的存储数据的结构。然而事实上并不是所有的队列都是先进先出的,比如优先级队列,消息队列

    消息队列就是可以指定出队列的数据的类型,例如我们在医院排队并不是所有人按先后顺序看病的,而是要看指定的科室的医生有没有把上一个病人看完。等到看完了以后,再指定从队列中取出属于自己这个科室的队列中的第一个人

    中间件

    像消息队列这种我们在工作中各处都有可能用到的结构,我们就会把他单独实现成一个程序,然后部署到服务器上,这样的话需要用的的时候就可以直接使用,这样的程序称之为“中间件”
    比如我们之前讲的MySQL就是一个中间件

    阻塞队列

    阻塞队列是一种先进先出的数据结构,和普通队列不同的是,当队列是空的,我们这时候取队列元素的时候,就会阻塞等待,直到队列不为空。同样的,当队列是满的时候,我们入队列,也会阻塞等待直到队列有位置了

    并且我们的阻塞队列是线程安全的

    功能

    生产者消费者模型

    指的是多线程协同工作的方式。
    例如我们一个线程是生产辣条,一个线程是吃辣条
    当我们的生产的辣条到一定数量的时候,生产者就会停止生产,防止过剩。
    当我们的消费者把辣条都吃完的时候,消费者就会停止吃,等待生产者生产。

    解耦合

    阻塞队列可以让代码解耦合

    例如我们的两个服务器之间需要通信,a服务器如果直接给b服务器发送消息,两个服务器中就都有关于对方的代码

    如果我们在a服务器和b服务器之间加上阻塞队列,这样的话,我们的a服务器和b服务器的耦合度就降低了。

    耦合度低了就可以防止我们如果加入了另一个服务器c,我们就要大幅度调整a服务器的代码,如果有阻塞队列的话,就可以直接让c从阻塞队列中读取数据了

    削峰填谷

    例如我们的两个服务器之间需要通信,当a服务器给b服务器发送的请求过多时,b服务器就有可能瘫痪
    如果我们在a服务器和b服务器之间加上阻塞队列,当a服务器传入的消息到达一定数量时,就不能再传入了。而当阻塞队列是空的,b服务器就不读取消息了

    服务器瘫痪的原因:
    服务器在处理请求时会消耗很多资源,例如内存,cpu,硬盘,带宽
    如果请求过多(例如双十一),那么资源消耗的越多,资源消耗没了,服务器就瘫痪了

    java标准库下的阻塞队列

    我们java标准库的阻塞队列的类是BlockingQueue
    其有多种实现,例如数组实现,链表实现,优先级队列实现…

    demo

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class BlockingQueueDemo {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue blockingQueue = new ArrayBlockingQueue(1000);
            blockingQueue.put(1);
            blockingQueue.put(2);
            System.out.println(blockingQueue.take());
            System.out.println(blockingQueue.take());
            System.out.println(blockingQueue.take());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    取出元素是take方法
    存入元素是put方法
    当我们取出元素次数比存入还多,那么这个代码就会一直卡在这个take上

    MyBlockingQueue

    我们可以自己实现一个基于数组实现的阻塞队列

    public class MyBlockingQueue {
        private int [] elem = new int[1000];
        private volatile int head = 0;
        private volatile int tail = 0;
        private volatile int size = 0;
    
        public void put(int value) throws InterruptedException {
            synchronized (this){
                while (size == elem.length){
                    this.wait();
                }
                elem[tail] = value;
                tail++;
                if(tail >= size){
                    tail = 0;
                }
                size++;
                this.notify();
            }
        }
    
        public int take() throws InterruptedException {
            synchronized (this){
                while (size == 0){
                    this.wait();
                }
                int ret = elem[head];
                head++;
                if(head >= size){
                    head = 0;
                }
                size--;
                this.notify();
                return ret;
            }
        }
    
        public static void main(String[] args) {
            MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
            Thread producer = new Thread(() -> {
                int n = 1;
                while(true){
                    try {
                        myBlockingQueue.put(1);
                        n++;
                        System.out.println("生产元素" + n);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            Thread customer = new Thread(() -> {
                int n = 1;
                while(true){
                    try {
                        myBlockingQueue.take();
                        n++;
                        System.out.println("消费元素" + n);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            producer.start();
            customer.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    volatile写法

    我们的volatile将队列中的元素进行修饰,防止了内存可见性问题和指令重排序问题

    wait和notify写法

    我们的take和put中都各有一个wait和一个notify
    事实上,他们的wait都是让自己阻塞等待,而notify是使对方唤醒
    也就是说,当a线程调用put方法,如果队列满了,那么就会触发wait,而只有在b线程调用了take方法,执行到notify时,才会把a线程唤醒,继续执行下面的操作

    同样的,当a线程调用take方法,如果队列满了,那么就会触发wait,而只有在b线程调用了put方法,执行到notify时,才会把a线程唤醒,继续执行下面的操作

    为什么是while

    而之所以我们的wait条件判定是while而不是if,这是因为我们希望线程在唤醒时再判定一下队列是否真的是不满了/不空了

    因为我们的wait不一定是被对方的notify唤醒的,也有可能是interrupt唤醒的

    还有一种可能,当a线程调用take方法,发现队列空的,于是释放锁,阻塞等待,然后b线程调用put方法,给队列中放了一个数据,然后释放锁。这时还有一个线程c,也想调用take方法,由于我们的锁释放后并不是先来后到,因此可能线程c抢到了这把锁,取走了队列中的元素,这时线程a才从wait中唤醒,重新获得锁,但是又没有元素可以取走了

  • 相关阅读:
    树上启发式合并小结
    分析各大常用的JS加密优缺点
    手握Python这柄宝剑,我一举拿下4个offer
    Apache Bench(ab )压力测试
    使用Kohya_ss训练Stable Diffusion Lora
    关于Linux Shell 脚本的制作
    Tugraph图学习技术详解
    华为路由器升级系统文件
    Transformers基本组件(二)快速入门Datasets、Evaluate、Trainer
    如何实现制造业信息化转型?
  • 原文地址:https://blog.csdn.net/m0_60867520/article/details/126916588