• 【JAVA】并发包中的 ConcurrentLinkedQueue 和 LinkedBlockingQueue 有什么区别?


    前言

    今天介绍一下线程安全队列。Java 标准库提供了非常多的线程安全队列,很容易混淆。

    本篇博文的重点是,并发包中的 ConcurrentLinkedQueue 和 LinkedBlockingQueue 有什么区别?  

    概述

    有时候我们把并发包下面的所有容器都习惯叫作并发容器,但是严格来讲,类似 ConcurrentLinkedQueue 这种 “Concurrent..” 容器,才是真正代表并发。

    关于问题中它们的区别:

    • Concurrent 类型基于 lock-free,在常见的多线程访问场景,一般可以提供较高吞吐量。
    • 而 LinkedBlockingQueue 内部则是基于锁,并提供了 BlockingQueue 的等待性方法。

    不知道你有没有注意到,java.util.concurrent 包提供的容器(Queue、List、Set)、Map,从命名上可以大概区分为 Concurrent*、CopyOnWrite 和 Blocking 等三类,同样是线程安全容器,可以简单认为:

    • Concurrent 类型没有类似 CopyOnWrite 之类容器相对较重的修改开销。
    • 但是,凡事都是有代价的,Concurrent 往往提供了较低的遍历一致性。你可以这样理解所谓的弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历。
    • 与弱一致性对应的,是同步容器常见的行为 “fail-fast”,也就是检测到容器在遍历过程中发生了修改,则抛出 ConcurrentModificationException,不再继续遍历。
    • 弱一致性的另外一个体现是,size 等操作准确性是有限的,未必是 100% 准确。
    • 与此同时,读取的性能具有一定的不确定性。  

    正文

    线程安全队列

    下面这张图是 Java 并发类库提供的各种各样的线程安全队列实现,注意,图中并未将非线程安全部分包含进来。

    我们可以从不同的角度进行分类,从基本的数据结构的角度分析,有两个特别的 Deque 实现,ConcurrentLinkedDeque 和 LinkedBlockingDeque。Deque 的侧重点是支持对队列头尾都进行插入和删除,所以提供了特定的方法,如:

    • 尾部插入时需要的 addLast(e)、offerLast(e)。
    • 尾部删除所需要的 removeLast()、pollLast()。

    从上面这些角度,能够理解 ConcurrentLinkedDeque 和 LinkedBlockingQueue 的主要功能区别,也就足够日常开发的需要了。但是如果我们深入一些,通常会更加关注下面这些方面。

    从行为特征来看,绝大部分 Queue 都是实现了 BlockingQueue 接口。在常规队列操作基础上,Blocking 意味着其提供了特定的等待性操作,获取时(take)等待元素进队,或者插入时(put)等待队列出现空位。

    1. /**
    2. * 获取并移除队列头结点,如果必要,其会等待直到队列出现元素
    3. */
    4. E take() throws InterruptedException;
    5. /**
    6. * 插入元素,如果队列已满,则等待直到队列出现空闲空间
    7. */
    8. void put(E e) throws InterruptedException;
    9. 复制代码

    另一个 BlockingQueue 经常被考察的点,就是是否有界(Bounded、Unbounded),这一点也往往会影响我们在应用开发中的选择,这里简单总结一下。

    • ArrayBlockingQueue 是最典型的的有界队列,其内部以 final 的数组保存数据,数组的大小就决定了队列的边界,所以我们在创建 ArrayBlockingQueue 时,都要指定容量,如
      1. public ArrayBlockingQueue(int capacity, boolean fair)
      2. 复制代码
    • LinkedBlockingQueue,容易被误解为无边界,但其实其行为和内部代码都是基于有界的逻辑实现的,只不过如果我们没有在创建队列时就指定容量,那么其容量限制就自动被设置为 Integer.MAX_VALUE,成为了无界队列。
    • SynchronousQueue,这是一个非常奇葩的队列实现,每个删除操作都要等待插入操作,反之每个插入操作也都要等待删除动作。那么这个队列的容量是多少呢?是 1 吗?其实不是的,其内部容量是 0。
    • PriorityBlockingQueue 是无边界的优先队列,虽然严格意义上来讲,其大小总归是要受系统资源影响。
    • DelayedQueueLinkedTransferQueue 同样是无边界的队列。对于无边界的队列,有一个自然的结果,就是 put 操作永远也不会发生其他 BlockingQueue 的那种等待情况。

    如果我们分析不同队列的底层实现,BlockingQueue 基本都是基于锁实现,一起来看看典型的 LinkedBlockingQueue。

    1. /** Lock held by take, poll, etc */
    2. private final ReentrantLock takeLock = new ReentrantLock();
    3. /** Wait queue for waiting takes */
    4. private final Condition notEmpty = takeLock.newCondition();
    5. /** Lock held by put, offer, etc */
    6. private final ReentrantLock putLock = new ReentrantLock();
    7. /** Wait queue for waiting puts */
    8. private final Condition notFull = putLock.newCondition();
    9. 复制代码

    在介绍 ReentrantLock 的条件变量用法的时候分析过 ArrayBlockingQueue,不知道你有没有注意到,其条件变量与 LinkedBlockingQueue 版本的实现是有区别的。notEmpty、notFull 都是同一个再入锁的条件变量,而 LinkedBlockingQueue 则改进了锁操作的粒度,头、尾操作使用不同的锁,所以在通用场景下,它的吞吐量相对要更好一些。

    下面的 take 方法与 ArrayBlockingQueue 中的实现,也是有不同的,由于其内部结构是链表,需要自己维护元素数量值,请参考下面的代码。

    1. public E take() throws InterruptedException {
    2. final E x;
    3. final int c;
    4. final AtomicInteger count = this.count;
    5. final ReentrantLock takeLock = this.takeLock;
    6. takeLock.lockInterruptibly();
    7. try {
    8. while (count.get() == 0) {
    9. notEmpty.await();
    10. }
    11. x = dequeue();
    12. c = count.getAndDecrement();
    13. if (c > 1)
    14. notEmpty.signal();
    15. } finally {
    16. takeLock.unlock();
    17. }
    18. if (c == capacity)
    19. signalNotFull();
    20. return x;
    21. }
    22. 复制代码

    类似 ConcurrentLinkedQueue 等,则是基于 CAS 的无锁技术,不需要在每个操作时使用锁,所以扩展性表现要更加优异。

    相对比较另类的 SynchronousQueue,在 Java 6 中,其实现发生了非常大的变化,利用 CAS 替换掉了原本基于锁的逻辑,同步开销比较小。它是 Executors.newCachedThreadPool() 的默认队列。  

    队列使用场景与典型用例

    在实际开发中,Queue 被广泛使用在生产者 - 消费者场景,比如利用 BlockingQueue 来实现,由于其提供的等待机制,我们可以少操心很多协调工作,参考下面样例代码:

    1. import java.util.concurrent.ArrayBlockingQueue;
    2. import java.util.concurrent.BlockingQueue;
    3. public class ConsumerProducer {
    4. public static final String EXIT_MSG = "Good bye!";
    5. public static void main(String[] args) {
    6. // 使用较小的队列,以更好地在输出中展示其影响
    7. BlockingQueue queue = new ArrayBlockingQueue<>(3);
    8. Producer producer = new Producer(queue);
    9. Consumer consumer = new Consumer(queue);
    10. new Thread(producer).start();
    11. new Thread(consumer).start();
    12. }
    13. static class Producer implements Runnable {
    14. private BlockingQueue queue;
    15. public Producer(BlockingQueue q) {
    16. this.queue = q;
    17. }
    18. @Override
    19. public void run() {
    20. for (int i = 0; i < 20; i++) {
    21. try{
    22. Thread.sleep(5L);
    23. String msg = "Message" + i;
    24. System.out.println("Produced new item: " + msg);
    25. queue.put(msg);
    26. } catch (InterruptedException e) {
    27. e.printStackTrace();
    28. }
    29. }
    30. try {
    31. System.out.println("Time to say good bye!");
    32. queue.put(EXIT_MSG);
    33. } catch (InterruptedException e) {
    34. e.printStackTrace();
    35. }
    36. }
    37. }
    38. static class Consumer implements Runnable{
    39. private BlockingQueue queue;
    40. public Consumer(BlockingQueue q){
    41. this.queue=q;
    42. }
    43. @Override
    44. public void run() {
    45. try{
    46. String msg;
    47. while(!EXIT_MSG.equalsIgnoreCase( (msg = queue.take()))){
    48. System.out.println("Consumed item: " + msg);
    49. Thread.sleep(10L);
    50. }
    51. System.out.println("Got exit message, bye!");
    52. }catch(InterruptedException e) {
    53. e.printStackTrace();
    54. }
    55. }
    56. }
    57. }
    58. 复制代码

    上面是一个典型的生产者 - 消费者样例,如果使用非 Blocking 的队列,那么我们就要自己去实现轮询、条件判断(如检查 poll 返回值是否 null)等逻辑,如果没有特别的场景要求,Blocking 实现起来代码更加简单、直观。

    前面介绍了各种队列实现,在日常的应用开发中,如何进行选择呢?

    以 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 为例,我们一起来分析一下,根据需求可以从很多方面考量:

    • 考虑应用场景中对队列边界的要求。ArrayBlockingQueue 是有明确的容量限制的,而 LinkedBlockingQueue 则取决于我们是否在创建时指定,SynchronousQueue 则干脆不能缓存任何元素。
    • 从空间利用角度,数组结构的 ArrayBlockingQueue 要比 LinkedBlockingQueue 紧凑,因为其不需要创建所谓节点,但是其初始分配阶段就需要一段连续的空间,所以初始内存需求更大。
    • 通用场景中,LinkedBlockingQueue 的吞吐量一般优于 ArrayBlockingQueue,因为它实现了更加细粒度的锁操作。
    • ArrayBlockingQueue 实现比较简单,性能更好预测,属于表现稳定的 “选手”。
    • 如果我们需要实现的是两个线程之间接力性(handoff)的场景,你可能会选择 CountDownLatch,但是SynchronousQueue 也是完美符合这种场景的,而且线程间协调和数据传输统一起来,代码更加规范。
    • 可能令人意外的是,很多时候 SynchronousQueue 的性能表现,往往大大超过其他实现,尤其是在队列元素较小的场景。  

    后记

    以上就是 【JAVA】并发包中的 ConcurrentLinkedQueue 和 LinkedBlockingQueue 有什么区别? 的所有内容了;

    分析了 Java 中让人眼花缭乱的各种线程安全队列,试图从几个角度,让每个队列的特点更加明确,进而希望减少你在日常工作中使用时的困扰。

     

  • 相关阅读:
    【Vue+ElementUI】Table表格实现自定义表头展示+表头拖拽排序(附源码)
    Everything 一款功能强大的搜索工具
    JVM实战:JVM运行时数据区
    客户案例 | 宝藏平台,企业再也不用担心运营管理那些事儿
    系统编程07-线程的互斥锁、读写锁、条件变量
    高分三号1米分辨率飞机检测识别数据集
    多元函数泰勒公式(含黑塞矩阵)
    关于联网麻将游戏的修改和完善
    Spring中拦截器重复注册的问题排查
    MySql ocp认证之备份与恢复(四)
  • 原文地址:https://blog.csdn.net/m0_71777195/article/details/127441957