阻塞队列(BlockingQueue)
阻塞队列是多线程代码中比较常用的一种数据结构。是一种特殊的队列,带有阻塞特性。
为何说是一种特殊的队列?
1.线程安全
2.带有阻塞特性
- 如果队列为空,继续出队列,就会发生阻塞。阻塞到其他线程往队列里添加元素为止。
- 如果队列为满,继续入队列,就会发生阻塞。阻塞到其他线程从队列中取走元素为止。
在java中标准库中针对zuseduilie提供两种实现方式:
基于数组:
BlockingQueuequeue = new ArrayBlockingQueue<>(5); 基于链表:
BlockingQueuequeue = new LinkedBlockingDeque<>(); 方法:
put() 入队列 带有阻塞性质
take() 出队列 带有阻塞性质
实现基于数组(循环队列)的阻塞队列:
- //实现阻塞队列
- class myBlockingQueue{
-
- //锁对象
- private Object object = new Object();
-
- //队列采用循环队列 数组
- private String[] data = new String[1000];
-
- //队头元素位置 加volatile防止内存可见性问题
- private volatile int head = 0;
- //队尾元素位置
- private volatile int tail = 0;
- //有效长度
- private volatile int size = 0;
-
- //带有阻塞性质的入队操作put
- public void put(String str) throws InterruptedException {
- synchronized(object) {
- //队列满时
- if(size==data.length) {
- //阻塞等待 等待另一个线程调用notify方法唤醒
- object.wait();
- }
- //队列不满 入队列
- data[tail] = str;
- tail++;
- size++;
- object.notify();
-
- //由于数组循环使用 也防止索引出界
- if(tail==data.length) {
- tail = 0;
- }
- }
-
- }
-
- //带有阻塞性质的出队列操作
- public String take() throws InterruptedException {
- synchronized(object) {
- //队列为空
- if(size==0) {
- //阻塞等待
- object.wait();
- }
- //队列不为空
- String tmp = data[head];
- head++;
- if(head==data.length) {
- head = 0;
- }
- size--;
- //唤醒
- object.notify();
- return tmp;
- }
-
- }
-
- }
代码实现中的一些细节:
实现生产者消费者模型
生产者 - 消费者模型( Producer-consumer problem) 是一个非常经典的多线程并发协作的模型,在分布式系统里非常常见。

这个模型由两类线程和一个缓冲区组成来组成
运行流程
实现"生产者消费者模型"好处:
(1)解耦合
两个模块之间联系越紧密,耦合就越高。尤其对于分布式系统来说,十分有意义。
(2)削峰填谷
峰:指短时间内请求多。
比如服务器和客户端之间的请求与响应,当用户量请求增大时,服务器也会受牵连,甚至于 将服务器弄崩溃给挂了,耦合性较高,如果两者之间用一种数据结构如队列存储请求,就不论客户 端用户量请求有多大时,服务器仍然可以按照自己的速度去处理请求。
消息队列:当把阻塞队列封装成单独的服务器程序,部署到特定的机器上,这个时候就把这个队列称为"消息队列"。
实现生产者消费者模型代码:
- //实现阻塞队列
- class myBlockingQueue{
-
- //锁对象
- private Object object = new Object();
-
- //队列采用循环队列 数组
- private String[] data = new String[1000];
-
- //头指针 加volatile防止内存可见性问题
- private volatile int head = 0;
- //尾指针
- private volatile int tail = 0;
- //有效长度
- private volatile int size = 0;
-
- //带有阻塞性质的入队操作put
- public void put(String str) throws InterruptedException {
- synchronized(object) {
- //队列满时
- while (size==data.length) {
- //阻塞等待 等待另一个线程调用notify方法唤醒
- object.wait();
- }
- //队列不满 入队列
- data[tail] = str;
- tail++;
- size++;
- object.notify();
-
- //由于数组循环使用 也防止索引出界
- if(tail==data.length) {
- tail = 0;
- }
- }
-
- }
-
- //带有阻塞性质的出队列操作
- public String take() throws InterruptedException {
- synchronized(object) {
- //队列为空
- while (size==0) {
- //阻塞等待
- object.wait();
- }
- //队列不为空
- String tmp = data[head];
- head++;
- if(head==data.length) {
- head = 0;
- }
- size--;
- //唤醒
- object.notify();
- return tmp;
- }
-
- }
- }
-
-
-
- //借助阻塞队列 实现生产者消费者模型
- public class test {
- public static void main(String[] args) {
- MyBlockingQueue queue = new MyBlockingQueue();
-
- //生产者模型
- Thread t1 = new Thread(()->{
- int num = 1;
- while(true) {
- try {
- queue.put(num);
- System.out.println("生产者生产"+num);
- num++;
- //Thread.sleep(1000); //生产者有节奏生产
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
-
- //消费者模型
- Thread t2 =new Thread(()->{
- while(true) {
- try {
- int tmp = queue.take();
- System.out.println("消费者消费"+tmp);
- Thread.sleep(1000); //消费者有节奏消费
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- t1.start();
- t2.start();
- }
- }