阻塞队列的存取性质与普通队列一致,即先进先出
同时,阻塞队列提供具有下面特性的存取方法
阻塞队列可以在上述条件下自动阻塞(挂起)线程,
随后,在满足条件时自动唤醒线程
编写逻辑时,不用考虑线程相关的问题,而只需要专注于容器
原始多线程实现方式见 基础 | 并发编程 - [Lock 使用 & 对比 synchronized] 中示例
类型一览
类型 | 是否有界 | 数据结构 | 特性 | 备注 |
---|---|---|---|---|
ArrayBlockingQueue | √ | 数组 | ||
LinkedBlockingQueue | √ | 链表 | 虽然有界,但默认情况下大小为 Integer.MAX_VALUE,约等于无界 | |
PriorityBlockingQueue | × | 支持优先级 | 取操作或获取优先级最高或最低的元素,而不完全按顺序 | |
DelayBlockingQueue | × | 支持优先级,延迟 | ||
SynchronousQueue | √ | 不存储元素,一个存或取操作到达此队列就被阻塞,直到另一个与之配对的取或存操作到来,一起放行 | ||
LinkedTransferQueue | × | 链表 | 约等于 SynchronousQueue + LinkedBlockingQueue,无界,所以 put() 并不阻塞,但可以通过 transfer() 阻塞的使消费者消费 | |
LinkedBlockingDeque | √ | 链表 | 双向 |
特性说明
常用 api 一览
作为容器,阻塞队列的核心功能也是 存 和 取
但阻塞队列为存取提供了多个接口,并配合多种反馈效果
element() 和 peek() 会返回队列的第一个元素
但队列为空(empty)时,前者抛异常,后者返回 null
存 | 取 | 查 | |
---|---|---|---|
阻塞 | put(e) | take() | |
超时 | offer(e,time,unit) | poll(e,time,unit) | |
标记值 | offer(e) | poll() | peek() |
异常 | add(e) | remove() | element() |
通用 api 示例
仅以 ArrayBlockingQueue 为例
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
try {
// 阻塞卡死 ============================================
queue.put("a");
System.out.println("put 1 over"); // put 1 over
queue.put("b"); // block here
System.out.println("put 2 over"); //not output
queue.take();
System.out.println("take 1 over"); // take 1 over
queue.take(); // block here
System.out.println("take 2 over"); //not output
// 阻塞后恢复 ============================================
queue.put("c");
System.out.println("put 3 over"); // put 3 over
new Thread(()->{
// block here, until thread named "BBB" is over
try { queue.put("C"); } catch (InterruptedException e) { e.printStackTrace(); }
},"AAA").start();
new Thread(()->{
// take the "c" out
try { TimeUnit.SECONDS.sleep(1); queue.take(); } catch (InterruptedException e) { e.printStackTrace(); }
},"BBB").start();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 先阻塞,超时后返回标记值 ============================================
try {
queue.offer("a",1L,TimeUnit.SECONDS); // true
queue.offer("b",1L,TimeUnit.SECONDS); // 1 second later, false
queue.poll(1L,TimeUnit.SECONDS); // a
queue.poll(1L,TimeUnit.SECONDS); // 1 second later, null
} catch (InterruptedException e) {
e.printStackTrace();
}
// 标记值 ============================================
queue.offer("a"); // true
queue.offer("b"); // false
queue.peek(); // first element
queue.poll(); // a
queue.poll(); // null
queue.peek(); // null
// 一言不合抛异常 ============================================
queue.add("a"); // true
queue.add("b"); // java.lang.IllegalStateException: Queue full
queue.element(); // first element
queue.remove(); // element
queue.remove(); // java.util.NoSuchElementException
queue.element(); // java.util.NoSuchElementException
}
SynchronousQueue 不存储元素,存入一个元素后,此元素不取出,不能存入另一个元素
Synchronous 的意思是指此队列会使一对存取操作同步,如果不能达成同步则会阻塞,直到等到另一个匹配的操作
public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
new Thread(()->{
System.out.println("1 <--");
try { queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("2 <--");
try { queue.put(2); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("3 <--");
try { queue.put(3); } catch (InterruptedException e) { e.printStackTrace(); }
}).start();
new Thread(()->{
try { TimeUnit.SECONDS.sleep(2);queue.take(); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("1 ---->");
try { TimeUnit.SECONDS.sleep(2);queue.take(); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("2 ---->");
try { TimeUnit.SECONDS.sleep(2);queue.take(); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("3 ---->");
}).start();
}