今天介绍一下线程安全队列。Java 标准库提供了非常多的线程安全队列,很容易混淆。
本篇博文的重点是,并发包中的 ConcurrentLinkedQueue 和 LinkedBlockingQueue 有什么区别?
有时候我们把并发包下面的所有容器都习惯叫作并发容器,但是严格来讲,类似 ConcurrentLinkedQueue 这种 “Concurrent..” 容器,才是真正代表并发。
关于问题中它们的区别:
不知道你有没有注意到,java.util.concurrent 包提供的容器(Queue、List、Set)、Map,从命名上可以大概区分为 Concurrent*、CopyOnWrite 和 Blocking 等三类,同样是线程安全容器,可以简单认为:
下面这张图是 Java 并发类库提供的各种各样的线程安全队列实现,注意,图中并未将非线程安全部分包含进来。

我们可以从不同的角度进行分类,从基本的数据结构的角度分析,有两个特别的 Deque 实现,ConcurrentLinkedDeque 和 LinkedBlockingDeque。Deque 的侧重点是支持对队列头尾都进行插入和删除,所以提供了特定的方法,如:
从上面这些角度,能够理解 ConcurrentLinkedDeque 和 LinkedBlockingQueue 的主要功能区别,也就足够日常开发的需要了。但是如果我们深入一些,通常会更加关注下面这些方面。
从行为特征来看,绝大部分 Queue 都是实现了 BlockingQueue 接口。在常规队列操作基础上,Blocking 意味着其提供了特定的等待性操作,获取时(take)等待元素进队,或者插入时(put)等待队列出现空位。
- /**
- * 获取并移除队列头结点,如果必要,其会等待直到队列出现元素
- …
- */
- E take() throws InterruptedException;
-
- /**
- * 插入元素,如果队列已满,则等待直到队列出现空闲空间
- …
- */
- void put(E e) throws InterruptedException;
- 复制代码
另一个 BlockingQueue 经常被考察的点,就是是否有界(Bounded、Unbounded),这一点也往往会影响我们在应用开发中的选择,这里简单总结一下。
ArrayBlockingQueue 是最典型的的有界队列,其内部以 final 的数组保存数据,数组的大小就决定了队列的边界,所以我们在创建 ArrayBlockingQueue 时,都要指定容量,如 - public ArrayBlockingQueue(int capacity, boolean fair)
- 复制代码
LinkedBlockingQueue,容易被误解为无边界,但其实其行为和内部代码都是基于有界的逻辑实现的,只不过如果我们没有在创建队列时就指定容量,那么其容量限制就自动被设置为 Integer.MAX_VALUE,成为了无界队列。SynchronousQueue,这是一个非常奇葩的队列实现,每个删除操作都要等待插入操作,反之每个插入操作也都要等待删除动作。那么这个队列的容量是多少呢?是 1 吗?其实不是的,其内部容量是 0。PriorityBlockingQueue 是无边界的优先队列,虽然严格意义上来讲,其大小总归是要受系统资源影响。DelayedQueue 和 LinkedTransferQueue 同样是无边界的队列。对于无边界的队列,有一个自然的结果,就是 put 操作永远也不会发生其他 BlockingQueue 的那种等待情况。如果我们分析不同队列的底层实现,BlockingQueue 基本都是基于锁实现,一起来看看典型的 LinkedBlockingQueue。
- /** Lock held by take, poll, etc */
- private final ReentrantLock takeLock = new ReentrantLock();
-
- /** Wait queue for waiting takes */
- private final Condition notEmpty = takeLock.newCondition();
-
- /** Lock held by put, offer, etc */
- private final ReentrantLock putLock = new ReentrantLock();
-
- /** Wait queue for waiting puts */
- private final Condition notFull = putLock.newCondition();
- 复制代码
在介绍 ReentrantLock 的条件变量用法的时候分析过 ArrayBlockingQueue,不知道你有没有注意到,其条件变量与 LinkedBlockingQueue 版本的实现是有区别的。notEmpty、notFull 都是同一个再入锁的条件变量,而 LinkedBlockingQueue 则改进了锁操作的粒度,头、尾操作使用不同的锁,所以在通用场景下,它的吞吐量相对要更好一些。
下面的 take 方法与 ArrayBlockingQueue 中的实现,也是有不同的,由于其内部结构是链表,需要自己维护元素数量值,请参考下面的代码。
- public E take() throws InterruptedException {
- final E x;
- final int c;
- final AtomicInteger count = this.count;
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lockInterruptibly();
- try {
- while (count.get() == 0) {
- notEmpty.await();
- }
- x = dequeue();
- c = count.getAndDecrement();
- if (c > 1)
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
- if (c == capacity)
- signalNotFull();
- return x;
- }
- 复制代码
类似 ConcurrentLinkedQueue 等,则是基于 CAS 的无锁技术,不需要在每个操作时使用锁,所以扩展性表现要更加优异。
相对比较另类的 SynchronousQueue,在 Java 6 中,其实现发生了非常大的变化,利用 CAS 替换掉了原本基于锁的逻辑,同步开销比较小。它是 Executors.newCachedThreadPool() 的默认队列。
在实际开发中,Queue 被广泛使用在生产者 - 消费者场景,比如利用 BlockingQueue 来实现,由于其提供的等待机制,我们可以少操心很多协调工作,参考下面样例代码:
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
-
- public class ConsumerProducer {
- public static final String EXIT_MSG = "Good bye!";
-
- public static void main(String[] args) {
- // 使用较小的队列,以更好地在输出中展示其影响
- BlockingQueue
queue = new ArrayBlockingQueue<>(3); - Producer producer = new Producer(queue);
- Consumer consumer = new Consumer(queue);
- new Thread(producer).start();
- new Thread(consumer).start();
- }
-
-
- static class Producer implements Runnable {
- private BlockingQueue
queue; - public Producer(BlockingQueue
q) { - this.queue = q;
- }
-
- @Override
- public void run() {
- for (int i = 0; i < 20; i++) {
- try{
- Thread.sleep(5L);
- String msg = "Message" + i;
- System.out.println("Produced new item: " + msg);
- queue.put(msg);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- try {
- System.out.println("Time to say good bye!");
- queue.put(EXIT_MSG);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- static class Consumer implements Runnable{
- private BlockingQueue
queue; - public Consumer(BlockingQueue
q) { - this.queue=q;
- }
-
- @Override
- public void run() {
- try{
- String msg;
- while(!EXIT_MSG.equalsIgnoreCase( (msg = queue.take()))){
- System.out.println("Consumed item: " + msg);
- Thread.sleep(10L);
- }
- System.out.println("Got exit message, bye!");
- }catch(InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- 复制代码
上面是一个典型的生产者 - 消费者样例,如果使用非 Blocking 的队列,那么我们就要自己去实现轮询、条件判断(如检查 poll 返回值是否 null)等逻辑,如果没有特别的场景要求,Blocking 实现起来代码更加简单、直观。
前面介绍了各种队列实现,在日常的应用开发中,如何进行选择呢?
以 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 为例,我们一起来分析一下,根据需求可以从很多方面考量:
以上就是 【JAVA】并发包中的 ConcurrentLinkedQueue 和 LinkedBlockingQueue 有什么区别? 的所有内容了;
分析了 Java 中让人眼花缭乱的各种线程安全队列,试图从几个角度,让每个队列的特点更加明确,进而希望减少你在日常工作中使用时的困扰。