• 阻塞队列ArrayBlockingQueue,LinkedBlockingQueue源码剖析


    1.阻塞队列用途以及特征

    1.1 继承结构

    首先我们来看一看阻塞队列家族
    在这里插入图片描述
    BlockingQueue接口定义了以下方法

    • boolean add(E e);
    • boolean offer(E e);
    • boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    • void put(E e) throws InterruptedException;
    • E poll(long timeout, TimeUnit unit) throws InterruptedException;
    • E take() throws InterruptedException;

    无非就是“入队”和“出队”两类动作,全部交由五个不同的实现类实现。

    1.2 用途
    • ArrayBlockingQueue 有界的任务队列
    • LinkedBlockingQeque 无界的任务队列,除非资源耗尽,不会存在任务入队失败的情况,有耗尽内存的风险(也可以指定大小,若不指定为Integer.MAX_VALUE)
    • SynchronousQueue
    • PriorityBlockingQueue
    • DelayedWorkQueue
    1.3 与普通队列的区别

    2.ArrayBlockingQueue源码剖析

    2.1 数据结构与类变量

    听名字我们就知道他的数据结构一定是数组,除此之外,数组上具有两个指针,分别指向头尾,头表示下一个该出队的元素,尾表示下一个该入队的元素位置。

    // 数据结构-数组
    final Object[] items;
    // 下一个该出队的元素 头指针
    int takeIndex;
    // 下一个该入队的元素位置 尾指针
    int putIndex;
    // 数组中存在的元素个数
    int count;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    下一步,我们考虑阻塞的实现,队列存在两种等待条件,一种是队列空时,出队需要等待,一种是队列满时,入队需要等待,synchronized只有一个等待条件,不符合要求,因此使用ReentrantLock。

    // 锁
    final ReentrantLock lock;
    // 队列空时的等待队列
    private final Condition notEmpty;
    // 队列满时的等待队列
    private final Condition notFull;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    2.2 构造方法

    数组构造方法一定需要指定容量,因为数组需要预分配空间

    	public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
    	}
    	public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            // 预分配空间
            this.items = new Object[capacity];
            // 初始化锁与条件
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    2.3 入队

    首先,我们实现在不使用锁的情况下进行入队方法,再在外层套一层壳即可。

    	private void enqueue(E x) {
            final Object[] items = this.items;
            // 直接给尾指针赋值
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            // 队列当前元素加一
            count++;
            // 通知等待队列不空的线程
            notEmpty.signal();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    下面,为确保线程安全,需要使用锁,而有三个操作都能入队,它们的实现也有细微区别,它们分别是:

    • boolean add(E e) 实际调用的还是offer,当offer返回false时,抛出异常
    • boolean offer(E e) 添加成功返回true,失败(队列满)返回false
    • void put(E e) throws InterruptedException 添加失败(队列满)阻塞

    先来看看offer的实现,

    	public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
            	// 队列满时,返回false
                if (count == items.length)
                    return false;
                // 入队
                else {
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在入队之前,先锁住队列实例,除开本线程外,其他线程既不能入队也不能出队。
    add就是在offer基础上套一层判断,

    	public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    而put的实现,无非就是在判断队列满时,当前线程进入notNull等待队列等待,lockInterruptibly是指当线程阻塞时,能响应中断,注意线程等待一般都需要使用while,让线程再度检查。

    	public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
            	// 队列满时,等待
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    2.4 出队

    同样,先看在不加锁时的出队方法,

    	private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            // 环形
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            // 通知等待队列不满的线程
            notFull.signal();
            return x;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    可以发现,由于双指针,入队和出队从逻辑上首位连到了一起,成了环形数组。
    与入队一样,有两个操作都能出队,

    • E poll() 成功返回出队的元素,失败(队列空)返回null
    • E take() 成功返回出队的元素,失败该线程阻塞

    先来看poll的实现,

    	public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
            	// 若队列空,则返回null
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    在出队之前,先锁住队列实例,并且使用try catch来保证锁释放。
    take()的实现已经呼之欲出了,

    	public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在队列空时,进入notEmpty等待,等待队列不空这个条件。

    3.LinkedBlockingQueue源码剖析

    3.1 数据结构与类变量

    看到Linked,就知道它的数据结构一定是链表,链表需要定义节点,

    	static class Node<E> {
            E item;
            Node<E> next;
            Node(E x) { item = x; }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    也就是节点的元素,节点的next指针。
    与数组实现类似,该链表需要头尾指针,链表的最大容量(默认为Integer.MAX_VALUE),和当前链表中存储的元素数量(使用并发安全的Integer,这是由于链表实现使用两把锁,不能保证并发安全)。

    transient Node<E> head;
    private transient Node<E> last;
    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    
    • 1
    • 2
    • 3
    • 4

    阻塞的实现和ArrayBlockingQueue有些许不同,我们知道,数组形式的实现入队出队使用的是同一把锁,而链表实现入队和出队使用两把不同的锁,入队与出队不再互斥,效率更高。

    // 出队锁
    private final ReentrantLock takeLock = new ReentrantLock();
    // 入队锁
    private final ReentrantLock putLock = new ReentrantLock();
    // 两个条件变量
    private final Condition notEmpty = takeLock.newCondition();
    private final Condition notFull = putLock.newCondition();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    3.2 构造方法

    无参构造,调用有参构造,默认链表容量为Integer.MAX_VALUE

    	public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    
    • 1
    • 2
    • 3

    有参构造,初始化容量,头尾指针指向null的头结点。

    	public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            // 带头结点的链表
            last = head = new Node<E>(null);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    3.3 入队

    首先是不使用锁的实现,尾插法,直接将node接到尾节点last后。

    	private void enqueue(Node<E> node) {
            last = last.next = node;
        }
    
    • 1
    • 2
    • 3

    然后加上锁,特别的,这里是入队锁(putLock),有两种入队方式

    • boolean add(E e) 实际调用的还是offer,当offer返回false时,抛出异常
    • boolean offer(E e) 入队成功返回true,失败返回false
    • void put(E e) 入队失败(队列满),阻塞

    首先看一下offer的实现,

    	public boolean offer(E e) {
            if (e == null) throw new NullPointerException();
            final AtomicInteger count = this.count;
            // 容量已满
            if (count.get() == capacity)
                return false;
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
    		// 入队锁
            putLock.lock();
            try {
            	// 再次检查容量(双重检查)
                if (count.get() < capacity) {
                    enqueue(node);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                    	// 通知入队阻塞线程
                        notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            if (c == 0)
            	// 如果队列中加入的元素是第一个,通知出队阻塞线程
                signalNotEmpty();
            return c >= 0;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    可以发现,链表结构的入队与数组结构的入队大不相同,首先,链表入队使用的是入队锁,其次,入队阻塞线程不完全由出队线程唤醒,而是可以由入队线程唤醒。
    如果队列中加入的元素是第一个,则需要去唤醒出队阻塞线程,

    	private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    这是由于使用了两把锁,若入队阻塞线程都由出队线程唤醒,那么,出队线程在出队后,又要去获得入队锁唤醒线程,那么这两把锁就相当于还是一把,因为每次操作都要同时获得两把锁。

    put的实现也就在offer基础上,改变了队满的处理方法而已。

    public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                while (count.get() == capacity) {
                    // 线程阻塞
                    notFull.await();
                }
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    3.4 出队

    同样先看不使用锁的实现,

    	private E dequeue() {
            // assert takeLock.isHeldByCurrentThread();
            // assert head.item == null;
            Node<E> h = head;
            Node<E> first = h.next;
            // 头结点断开
            h.next = h; // help GC
            head = first;
            // 取出第一个结点的元素
            E x = first.item;
            // 第一个结点成为新的头结点
            first.item = null;
            return x;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    出队的实现有些特殊,将头结点断开,取出第一个结点的元素,并让第一个结点成为头节点(置null)。
    现在我们加上出队锁,

    • E poll() 成功返回出队的元素,失败(队列空)返回null
    • E take() throws InterruptedException 成功返回出队的元素,失败该线程阻塞

    首先看poll()方法,

    	public E poll() {
            final AtomicInteger count = this.count;
            // 队列为空
            if (count.get() == 0)
                return null;
            // 返回的元素,默认为空
            E x = null;
            int c = -1;
            final ReentrantLock takeLock = this.takeLock;
            // 出队锁
            takeLock.lock();
            try {
            	// 双重检查,拿到锁后再次检查队列
                if (count.get() > 0) {
                	// 出队
                    x = dequeue();
                    c = count.getAndDecrement();
                    // 若队里还有元素,通知其他出队线程
                    if (c > 1)
                        notEmpty.signal();
                }
            } finally {
                takeLock.unlock();
            }
            // 从满队出队,通知入队线程
            if (c == capacity)
                signalNotFull();
            return x;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    使用出队锁保证与入队动作无阻塞,同样,出队线程去通知另外的出队线程,当从满队出队时,去通知入队线程。

    看懂了poll,take的实现也就小菜一碟了,

    	public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
            	// 当队列为空时,等待队列不空
                while (count.get() == 0) {
                    notEmpty.await();
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    使用while循环,当再度被唤醒时,再次校验当前元素数量。

  • 相关阅读:
    ON CONFLICT语句
    【学习笔记】windows 下的 shared memory(共享内存)
    webapck打包原理--启动过程分析
    Scikit-learn:全面概述
    <四>虚函数 静态绑定 动态绑定
    mysql全量备份及数据恢复实践
    解决Android App 每启动一个Activity就看上去多启动一个应用/进程的问题
    关于小球放箱子的8种组合解法
    14.linux线程
    潮流计算专栏
  • 原文地址:https://blog.csdn.net/Yungang_Young/article/details/126395712