顾名思义:
当写入时:如果队列满了,则必须阻塞等待
当读取时,如果队列为空,则必须阻塞等待
List、Set、BlockingQueue同级,都继承于Collection,且BlockingQueue下级有ArrayBlockingQueue以及LinkBlockingQueue,即BlockingQueue有两种表达形式,数组型以及链表型
什么情况下我们会使用阻塞队列:多线程并发处理,线程池
添加、移除
方式 | 抛出异常 | 不抛出异常,有返回值 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add | offer | take | offer(“a”,2,TimeUnit.SECONDS) |
移除 | remove | poll | put | poll(2,TimeUnit.SECONDS); |
判断队列首部 | element | peek | - | - |
1、抛出异常
package bq;
import java.util.concurrent.ArrayBlockingQueue;
public class Test01 {
public static void main(String[] args) {
test1();
}
/**
* 抛出异常
*/
public static void test1(){
// capacity:队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// IllegalStateException 队列已满,抛出异常
// System.out.println(blockingQueue.add("d"));
// 查看队首元素
System.out.println(blockingQueue.element());
System.out.println("==================================");
// FIFO,弹出队头
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
// NoSuchElementException 队列为空,抛出异常
//System.out.println(blockingQueue.remove());
}
}
2、不会抛出异常
package bq;
import java.sql.Array;
import java.util.concurrent.ArrayBlockingQueue;
public class Test01 {
public static void main(String[] args) {
test2();
}
/**
* 有返回值
* 不抛出异常
*/
public static void test2(){
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
// 队满,不抛出异常,返回false
System.out.println(blockingQueue.offer("d"));
// 取出队首元素
System.out.println(blockingQueue.peek());
System.out.println("=================================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
// 队空,不抛出异常,返回null
System.out.println(blockingQueue.poll());
}
}
3、阻塞等待
package bq;
import java.sql.Array;
import java.util.concurrent.ArrayBlockingQueue;
public class Test01 {
public static void main(String[] args) throws InterruptedException {
test3();
}
/**
* 等待,阻塞(一直阻塞)
*
*/
public static void test3() throws InterruptedException {
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// 一直阻塞
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// blockingQueue.put("d"); // 队列没有位置了,一直阻塞
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
// 队列为空,会一直等待,等待元素进来然后被取出
// System.out.println(blockingQueue.take());
}
}
4、超时等待
package bq;
import java.sql.Array;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Test01 {
public static void main(String[] args) throws InterruptedException {
test4();
}
/**
* 等待,阻塞(等待超时)
*/
public static void test4() throws InterruptedException {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
// 队满,等待两秒,若两秒内出现空位,则入队,否则直接放弃入队
blockingQueue.offer("d",2, TimeUnit.SECONDS);
System.out.println("=======================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
// 队空,等待两秒,若两秒内出现元素,则出队,否则直接放弃
blockingQueue.poll(2,TimeUnit.SECONDS);
}
}
没有容量,必须等待取出来之后,才能再往里面放一个元素
下面来自文心一言
SynchronousQueue是Java中的一种特殊队列,它是一个线程安全的队列,不存储任何元素,每次插入操作必须等待其他线程的删除操作,反之亦然。
它的内部只能够容纳单个元素,如果队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。
SynchronousQueue常用于实现生产者-消费者模型,生产者和消费者在同一池中协调工作。在这种情况下,SynchronousQueue可以避免生产者和消费者之间的竞争条件和死锁问题。
很多小伙伴在写狂神说的代码时(包括我自己)会发现,有时候put了2次才take1次,实际上并不是synchronousQueue出现了问题。 问题是因为put语句在打印语句之后,在并发下,有可能先打印了“T1 put 2”,但实际上并没有put操作,等到take了之后,才put,也就是说,实际上运行顺序有可能是这样的:
打印put -----> take语句执行 -----> put语句执行
因此,只要将put语句和打印put的语句顺序调换即可,如下所示:
package bq;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* 同步队列
*/
public class SynchronousQueueDemo {
public static void main(String[] args) {
// SynchronousQueue 继承于BlockingQueue
BlockingQueue<String> blockingQueue = new SynchronousQueue<String>();
new Thread(()->{
try {
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+" put 1");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+" put 2");
blockingQueue.put("3");
System.out.println(Thread.currentThread().getName()+" put 3");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"T2").start();
}
}