• Java并发编程学习六:阻塞队列


    一、阻塞队列

    1. 简介

    阻塞队列,即BlockingQueue,它是一个接口,继承自Queue 接口,是队列的一种。Queue 和 BlockingQueue 都是在 Java 5 中加入的。

    public interface BlockingQueue<E> extends Queue<E>{...}
    
    • 1

    阻塞队列是线程安全的,典型的应用场景是在生产者/消费者模式中,用于存储数据,保证再多线程下的正确运行。

    除了BlockingQueue,Queue接口的实现类和子类还有很多,如下图所示:

    上述实现类和子类中,除了Deque都是线程安全的,而这些线程安全的队列可以分为阻塞队列和非阻塞队列两大类。

    阻塞队列就是BlockingQueue 接口的实现类,主要有6种:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、PriorityBlockingQueue 和 LinkedTransferQueue。

    非阻塞队列就是ConcurrentLinkedQueue,这个类不会让线程阻塞,利用 CAS 保证了线程安全。

    而Deque 是一个双端队列,从头和尾都能添加和删除元素;而普通的 Queue 只能从一端进入,另一端出去。

    2. BlockingQueue的常见方法

    BlockingQueue中和添加、删除相关的方法有8个,它们的区别仅在于特殊情况:当队列满了无法添加元素,或者是队列空了无法移除元素时,不同组的方法对于这种特殊情况会有不同的处理方式:

    • 抛出异常:add、remove、element
    • 返回结果但不抛出异常:offer、poll、peek
    • 阻塞:put、take

    a. add、remove、element方法

    这组方法在处理特殊情况时,会抛出异常

    add方法用于添加元素,如果队列满了,就会抛出异常来提示队列已满

    private static void addTest() {
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
        blockingQueue.add(1);
        blockingQueue.add(1);
        blockingQueue.add(1);
    }
    
    // 运行结果
    Exception in thread "main" java.lang.IllegalStateException:Queue full
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    remove 方法用于删除元素,如果队列为空,抛出异常

    private static void removeTest() {
        ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
        blockingQueue.add(1);
        blockingQueue.add(1);
        blockingQueue.remove();
        blockingQueue.remove();
        blockingQueue.remove();
    }
    
    // 运行结果
    Exception in thread "main" java.util.NoSuchElementException
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    element 方法用于返回队列头结点,但并不删除。和remove 方法一样,如果队列为空,抛出异常

    private static void elementTest() {
        ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
        blockingQueue.element();
    }
    
    // 运行结果
    Exception in thread "main" java.util.NoSuchElementException
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    b. offer、poll、peek方法

    这组方法在处理特殊情况时,会返回一个提示,而不会抛出异常。

    offer 方法用来插入一个元素,并用返回值来提示插入是否成功。如果添加成功会返回 true,而如果队列已经满了,返回false

    private static void offerTest() {
        ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
        System.out.println(blockingQueue.offer(1));
        System.out.println(blockingQueue.offer(1));
        System.out.println(blockingQueue.offer(1));
    }
    
    // 运行结果
    true
    true
    false
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    poll 方法用于移除并返回队列的头节点,如果当队列里面是空的,没有任何东西可以移除的时候,便会返回 null。正因为如此,不允许往队列中插入 null 值,否则没有办法区分返回的 null 是一个提示还是一个真正的元素

    private static void pollTest() {
        ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(3);
        blockingQueue.offer(1);
        blockingQueue.offer(2);
        blockingQueue.offer(3);
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
    }
    
    // 运行结果
    1
    2
    3
    null
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    peek 方法用于返回队列的头元素但并不删除。如果队列里面是空的,会返回 null 作为提示。

    private static void peekTest() {
        ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
        System.out.println(blockingQueue.peek());
    }
    
    // 运行结果
    null
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    另外,offer 和 poll 都有带超时时间的重载方法。

    offer(E e, long timeout, TimeUnit unit)
    
    • 1

    以offer为例,它有三个参数,分别是元素、超时时长和时间单位。插入成功会返回 true;如果队列满了导致插入不成功,则会等待指定的超时时间,如果时间到了依然没有插入成功,就会返回 false。

    c. put、take方法
    这一组方法在处理特殊情况时,会采用阻塞等待的策略,这也是阻塞队列名字的由来

    put 方法用于插入元素。如果队列已满,既不会立刻返回 false 也不会抛出异常,而是让插入的线程陷入阻塞状态,直到队列里有了空闲空间,此时队列就会让之前的线程解除阻塞状态,并把刚才那个元素添加进去。

    take 方法用于获取并移除队列的头结点,当队列为空,则阻塞线程,直到队列里有数据;一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。

    3. 常见的阻塞队列

    a. ArrayBlockingQueue

    一种有界队列,底层基于数组实现,利用 ReentrantLock 实现线程安全。

    构造函数中可以指定队列容量,一旦指定后续不可以扩容。同时可以指定是否公平。

    ArrayBlockingQueue(int capacity, boolean fair)
    
    • 1

    b. LinkedBlockingQueue

    一种近似无界的队列(实际最大容量是整型的最大值 Integer.MAX_VALUE),内部基于链表实现。

    c. SynchronousQueue

    这种队列的容量为0,不能存储元素,每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。它的作用就是直接传递。

    由于SynchronousQueue的特性,它的一些方法返回值很独特

    // peek方法直接返回null
    public E peek() {
        return null;
    }
    
    // size方法直接返回0
    public int size() {
        return 0;
    }
    
    // isEmpty方法直接返回true
    public boolean isEmpty() {
        return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    d. PriorityBlockingQueue

    这种队列可以自定义内部元素的排列顺序,也是一个(可以看做)无界的阻塞队列。通过通过实现compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。同时,插入队列的对象必须是可比较大小的,也就是 Comparable 的,否则会抛出 ClassCastException 异常。

    PriorityBlockingQueue的take方法会阻塞,但是由于无界,put方法永远不会阻塞。

    e. DelayQueue

    这种队列具有“延迟”的功能,可以指定任务延迟多久之后执行。

    同时,它也是一个无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,因此可以比较和排序。

    public interface Delayed extends Comparable<Delayed> {
        long getDelay(TimeUnit unit);
    }
    
    • 1
    • 2
    • 3

    实现Delayed接口需要实现getDelay方法,该方法返回的是“还剩下多长的延迟时间才会被执行”。

    DelayQueue中的元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。其内部复用了PriorityBlockingQueue的逻辑进行排序。

    二、阻塞队列和非阻塞队列的线程安全原理

    无论是阻塞队列还是非阻塞队列,都是可以保证线程安全的。

    1. 阻塞队列的线程安全

    以ArrayBlockingQueue 的源码为例,以下是该类的重要属性:

    final Object[] items;
    int takeIndex;
    int putIndex;
    int count;
    
    // 与线程安全有关
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Object 类型的数组用于存储元素;takeIndex 和 putIndex用来标明下一次读取和写入位置的;count 用来计数,它所记录的就是队列中的元素个数。而剩下的三个属性,一个是 ReentrantLock,另外两个Condition 分别是由 ReentrantLock 产生,这三个属性是实现线程安全最核心的工具。

    以put方法为例:

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length){
            	notFull.await();
    		}
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    该方法内部逻辑是:

    • 首先用checkNotNull 方法去检查插入的元素是不是 null
    • 不为null时,用 ReentrantLock 上锁,并且上锁方法是 lock.lockInterruptibly()。这意味着在尝试获取锁但还没拿到锁的期间可以响应中断
    • 接着是try finally 代码块,finally 中会去解锁,try中的while 循环会会检查当前队列是不是已经满了。如果队列已满,便会进行等待,直到有空余的时候跳出循环,调用 enqueue 方法让元素进入队列,最后用 unlock 方法解锁。

    这就是ArrayBlockingQueue 中put方法的线程安全策略。其实在线程基础中用Condition 实现生产者消费者模式,本质上就是简易版的BlockingQueue。

    类似的,LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue等也是利用了 ReentrantLock 来保证线程安全,只不过细节有差异,比如 LinkedBlockingQueue 的内部有两把锁,分别锁住队列的头和尾,比共用同一把锁的效率更高,不过总体思想都是类似的。

    2. 非阻塞队列的线程安全

    以ConcurrentLinkedQueue为例,查看offer方法的源码:

    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);
    
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t &amp;&amp; t != (t = tail)) ? t : q;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    该方法整体是一个大的for循环,而且是明显的死循环。代码中的 p.casNext 方法,正是利用了 CAS 来操作的,而且这个死循环去配合 CAS 也就是典型的乐观锁的思想。

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }
    
    • 1
    • 2
    • 3

    p.casNext 方法内部,运用了 UNSAFE.compareAndSwapObject 方法来完成 CAS 操作,而 compareAndSwapObject 是一个 native 方法,最终会利用 CPU 的 CAS 指令保证其不可中断。

    三、阻塞队列的选择

    阻塞队列很重要的一个应用场景就是线程池的,常见的线程池有5种,每种线程池的阻塞队列选择不同,具体的情况在线程池中已经进行了较为细致的阐述。

    在其他的应用场景中,选择合适的阻塞队列可以从以下几点考虑:

    • 功能:如,是否需要阻塞队列排序,如优先级排序、延迟执行等
    • 容量:是否有存储的要求,还是只需要“直接传递”
    • 能否扩容:是否需要队列能够动态扩容
    • 内存结构:不同阻塞队列的底层时间不同,如果对性能有要求可以从内存的结构角度去考虑
    • 性能:比如 LinkedBlockingQueue 由于拥有两把锁,并发性能更好;SynchronousQueue只需要“直接传递”,而不需要存储,性能更好
  • 相关阅读:
    基于yolov8的半自动标注
    Dubbo链路追踪——生成全局ID(traceId)
    爬虫过程和反爬
    【mybatis基础】
    “一带一路”十周年:用英语讲好中华传统故事
    基于STM32的DHT11温湿度测量
    纠删码技术在vivo存储系统的演进【上篇】
    C++ 算法教程
    Java三大特征之一——继承
    mybatis单框架实现数据库新增、删除、修改
  • 原文地址:https://blog.csdn.net/weixin_41402069/article/details/126067479