• 图解java.util.concurrent并发包源码系列——各种各样的阻塞队列BlockingQueue一套带走(ノ`Д)ノ!!!


    图解java.util.concurrent并发包源码系列——各种阻塞队列BlockingQueue一套带走

    BlockingQueue是java.util.concurrent并发包提供的一个可以在高并发场景下使用的阻塞队列的接口,它的实现类包括ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue等等。

    在这里插入图片描述

    BlockingQueue在各种使用Java编写的分布式或微服务中间件中使用得比较多。为了提升性能,这些中间件一般会把一些比较耗时的操作滞后,封装一个任务对象放入到BlockingQueue中,用一个后台线程去轮询这个BlockingQueue,而前方负责接收请求的接口把任务放入BlockingQueue后就响应客户端操作成功。

    在这里插入图片描述

    ArrayBlockingQueue

    成员变量:

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
    	// 存放队列元素的数组
        final Object[] items;
    
    	// 下一个从队列中获取元素的数组下标
        int takeIndex;
    
    	// 下一个放入队列元素的是数组下标
        int putIndex;
    
    	// 队列元素个数
        int count;
    
    	// 重入锁,保证线程安全
        final ReentrantLock lock;
    
    	// 条件队列,当从队列中获取元素时取不到元素,线程会在notEmpty中等待
        private final Condition notEmpty;
    
    	// 条件队列,当往队列中放入元素时没有空位,线程会在notFull中等待
        private final Condition notFull;
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在这里插入图片描述

    ArrayBlockingQueue用一个数组保存放入到队列中的元素,然后有两个int类型的变量takeIndex、putIndex,分别记录下一次从数组中获取元素、放入元素的位置。

    ArrayBlockingQueue有一个ReentrantLock,当从队列中获取元素或放入元素时,要先获取到ReentrantLock锁。

    如果一个线程从ArrayBlockingQueue中获取元素时,发现ArrayBlockingQueue已经空了,那么这个线程会在notEmpty条件队列中等待。

    如果一个线程往ArrayBlockingQueue中放入元素时,发现ArrayBlockingQueue已经满了,那么这个线程会在notFull条件队列中等待。

    java.util.concurrent.ArrayBlockingQueue#put:

        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            // 加ReentrantLock锁
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
            	// 如果队列满了,在notFull中等待
                while (count == items.length)
                    notFull.await();
                // 插入元素
                enqueue(e);
            } finally {
            	// 释放锁
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    java.util.concurrent.ArrayBlockingQueue#enqueue:

        private void enqueue(E x) {
        	// 往数组下标putIndex的位置放入新元素
            final Object[] items = this.items;
            items[putIndex] = x;
            // 数组下标putIndex加1,如果越界了则回到0
            if (++putIndex == items.length)
                putIndex = 0;
            // 队列元素个数加1
            count++;
            // 唤醒notEmpty中等待的线程
            notEmpty.signal();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述

    java.util.concurrent.ArrayBlockingQueue#take:

        public E take() throws InterruptedException {
        	// 加ReentrantLock锁
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
            	// 如果队列空了,在notEmpty中等待
                while (count == 0)
                    notEmpty.await();
                // 从队列取元素
                return dequeue();
            } finally {
            	// 释放锁
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    java.util.concurrent.ArrayBlockingQueue#dequeue

        private E dequeue() {
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            // 获取下标为takeIndex的元素
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            // takeIndex下标加1,如果越界了则回到0
            if (++takeIndex == items.length)
                takeIndex = 0;
            // 队列中元素个数减1
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            // 唤醒在notFull中等待的线程
            notFull.signal();
            return x;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在这里插入图片描述

    LinkedBlockingQueue

    成员变量:

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
    	// LinkedBlocking的节点内部类
        static class Node<E> {
    		
    		// 节点中的元素
            E item;
    
    		// 下一个节点(单向链表)
            Node<E> next;
    
            Node(E x) { item = x; }
        }
    
    	// 队列容量
        private final int capacity;
    
    	// 当前队列大小
        private final AtomicInteger count = new AtomicInteger();
    
    	// 链表头指针
        transient Node<E> head;
    
    	// 链表尾指针
        private transient Node<E> last;
    
    	// 从队列中获取元素时要加的锁
        private final ReentrantLock takeLock = new ReentrantLock();
    
    	// 从队列中获取元素时队列空了,会在notEmpty条件队列中等待
        private final Condition notEmpty = takeLock.newCondition();
    
    	// 往队列中放入元素时要加的锁
        private final ReentrantLock putLock = new ReentrantLock();
    
    	// 往队列中放入元素时队列满了,会在notFull条件队列中等待
        private final Condition notFull = putLock.newCondition();
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    LinkedBlockingQueue使用单向链表保存放入到队列中的元素,元素会被包装成一个Node节点,放入到链表中。

    Node是定义在LinkedBlockingQueue中的一个内部类,有一个next指针指向下一个节点。

    LinkedBlockingQueue有一个头指针head和一个尾指针last,获取元素时元素从头部出队并修改head指针,放入元素时元素从尾部入队并修改last指针。

    LinkedBlockingQueue有两个ReentrantLock,分表是takeLock和putLock,takeLock在从队列中获取元素时使用,putLock在往队列中放入元素时使用。

    LinkedBlockingQueue也有两个Condition,notEmpty和notFull。当一个线程从队列中获取元素时,队列是空的,那么线程要在notEmpty条件队列中等待。当一个线程往队列中放入元素时,队列是满的,那么线程要在notFull条件队列中等待。

    在这里插入图片描述

    java.util.concurrent.LinkedBlockingQueue#put:

        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            // 加putLock锁
            putLock.lockInterruptibly();
            try {
            	// 如果队列满了,在notFull中等待
                while (count.get() == capacity) {
                    notFull.await();
                }
                // 元素入队列
                enqueue(node);
                // count加1然后返回count原来的值
                c = count.getAndIncrement();
                // 队列还未满,唤醒notFull中等待的线程
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
            	// 释放锁
                putLock.unlock();
            }
            // 如果原来的队列是空的,唤醒在notEmpty中等待的线程
            if (c == 0)
                signalNotEmpty();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    java.util.concurrent.LinkedBlockingQueue#enqueue:

        private void enqueue(Node<E> node) {
            // node放入队列尾部,更新last指针
            last = last.next = node;
        }
    
    • 1
    • 2
    • 3
    • 4

    java.util.concurrent.LinkedBlockingQueue#signalNotEmpty:

        private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            // 加takeLock锁
            takeLock.lock();
            try {
            	// 唤醒notEmpty中等待的线程
                notEmpty.signal();
            } finally {
            	// 释放锁
                takeLock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述

    java.util.concurrent.LinkedBlockingQueue#take:

        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            // 获取takeLock锁
            takeLock.lockInterruptibly();
            try {
            	// 如果队列空了,在notEmpty中等待
                while (count.get() == 0) {
                    notEmpty.await();
                }
                // 从队列取元素
                x = dequeue();
                // count减1,然后返回原来的count
                c = count.getAndDecrement();
                // 队列还没空,唤醒在notEmpty中等待的线程
                if (c > 1)
                    notEmpty.signal();
            } finally {
            	// 释放锁
                takeLock.unlock();
            }
            // 如果原来队列是满的,唤醒在notFull等待的线程
            if (c == capacity)
                signalNotFull();
            return x;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    java.util.concurrent.LinkedBlockingQueue#dequeue:

        private E dequeue() {
    		// 取出头节点
            Node<E> h = head;
            // 头节点的后继节点
            Node<E> first = h.next;
            h.next = h;
            // 更新头指针
            head = first;
            // 返回节点中的元素item
            E x = first.item;
            first.item = null;
            return x;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    java.util.concurrent.LinkedBlockingQueue#signalNotFull:

        private void signalNotFull() {
            final ReentrantLock putLock = this.putLock;
            // 获取putLock
            putLock.lock();
            try {
            	// 唤醒在notFull中等待的线程
                notFull.signal();
            } finally {
            	// 释放锁
                putLock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述

    PriorityBlockingQueue

    成员变量:

    public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
    	// 堆数组(小顶堆)
    	private transient Object[] queue;
    
    	// 队列大小
        private transient int size;
    
    	// 比较器
        private transient Comparator<? super E> comparator;
    
    	// 从队列中存取元素,都要加ReentrantLock锁
        private final ReentrantLock lock;
    
    	// 从队列中取元素,队列空了,线程要在notEmpty条件队列中等待
        private final Condition notEmpty;
    
    	// 自旋锁,数组扩容时使用
        private transient volatile int allocationSpinLock;
    
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    PriorityBlockingQueue是优先级阻塞队列,在java.util包中还有一个PriorityQueue优先级队列,PriorityBlockingQueue是它的升级版。当一个元素放入优先级队列中,元素会自动根据预定义的优先级比较策略进行排序,排在队列头部的元素是优先级最高的,每次从队列中获取元素时都会从头部中获取。PriorityBlockingQueue相当于是并发场景下线程安全的PriorityQueue。

    PriorityBlockingQueue用一个数组queue保存放入到队列中的元素,数组queue是一个堆数组,也就是用一个数组去替代二叉堆,如果一个节点是 queue[n],它的左子节点和右子节点就分别是 queue[2n+1] 和 queue[2(n+1)]。

    PriorityBlockingQueue每次存取元素后,都要做堆调整,以满足预定义的优先级比较策略下元素的排序。

    PriorityBlockingQueue可以接收一个比较器Comparator,我们可以通过自定义比较器的比较逻辑来自定义PriorityBlockingQueue中元素的排序策略。如果不传递比较器给PriorityBlockingQueue,PriorityBlockingQueue就使用Comparable接口的compareTo(T o)方法进行排序,此时如果放入PriorityBlockingQueue的元素没有实现Comparable接口,就会抛出ClassCastException异常。

    从PriorityBlockingQueue中存取元素,都要加ReentrantLock锁。

    如果一个线程要从PriorityBlockingQueue中获取元素,而此时正好PriorityBlockingQueue中没有元素,那么该线程要在notEmpty条件队列中等待。

    PriorityBlockingQueue要进行queue数组的扩容时,会加自旋锁,通过CAS修改allocationSpinLock变量为1(原来是0),加锁成功才能进行数组扩容操作。

    在这里插入图片描述
    java.util.concurrent.PriorityBlockingQueue#put:

        public void put(E e) {
            offer(e);
        }
    
    • 1
    • 2
    • 3

    java.util.concurrent.PriorityBlockingQueue#offer(E):

        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            // 加ReentrantLock锁
            final ReentrantLock lock = this.lock;
            lock.lock();
            int n, cap;
            Object[] array;
            // 如果队列满了,要进行扩容
            while ((n = size) >= (cap = (array = queue).length))
                tryGrow(array, cap);
            try {
                Comparator<? super E> cmp = comparator;
    			// 比较器是空,使用Comparable接口进行元素比较,做堆的向上调整
                if (cmp == null)
                    siftUpComparable(n, e, array);
                // 比较器不为空,使用比较器进行元素比较,做堆的向上调整
                else
                    siftUpUsingComparator(n, e, array, cmp);
                // 队列元素个数加1
                size = n + 1;
                // 唤醒在notEmpty中等待的线程
                notEmpty.signal();
            } finally {
            	// 释放锁
                lock.unlock();
            }
            return true;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    java.util.concurrent.PriorityBlockingQueue#tryGrow:

        private void tryGrow(Object[] array, int oldCap) {
        	// 先释放锁,等扩容完了重新获取
            lock.unlock();
            Object[] newArray = null;
    		// CAS获取自旋锁
            if (allocationSpinLock == 0 &&
                UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                         0, 1)) {
                try {
                	// 扩容,如果老数组长度小于64,新数组长度加2,否则扩容1.5倍
                    int newCap = oldCap + ((oldCap < 64) ?
                                           (oldCap + 2) :
                                           (oldCap >> 1));
                    // 如果新数组长度超过最大限制,那么修改扩容后长度为老数组长度加1,如果还是超了,那么抛异常
                    if (newCap - MAX_ARRAY_SIZE > 0) {
                        int minCap = oldCap + 1;
                        if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                            throw new OutOfMemoryError();
                        newCap = MAX_ARRAY_SIZE;
                    }
                    // 创建新数组
                    if (newCap > oldCap && queue == array)
                        newArray = new Object[newCap];
                } finally {
                	// 释放自旋锁
                    allocationSpinLock = 0;
                }
            }
    		
    		// 扩容完,还没完成数组元素拷贝,其他线程要让出时间片
            if (newArray == null)
                Thread.yield();
                
            // 重新获取锁
            lock.lock();
            if (newArray != null && queue == array) {
            	// 新数组赋值到queue
                queue = newArray;
                // 完成数组元素拷贝
                System.arraycopy(array, 0, newArray, 0, oldCap);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    java.util.concurrent.PriorityBlockingQueue#siftUpComparable:

        private static <T> void siftUpComparable(int k, T x, Object[] array) {
        	// 类型强制转换,转成Comparable
            Comparable<? super T> key = (Comparable<? super T>) x;
            // 向上调整,直到来到顶部,或者找到合适的位置
            while (k > 0) {
            	// 当前位置k的父元素位置
                int parent = (k - 1) >>> 1;
                // 获取父元素
                Object e = array[parent];
                // 利用Comparable接口的compareTo方法进行比较
                if (key.compareTo((T) e) >= 0)
                	// 因为是小顶堆,如果key.compareTo((T) e)返回大于等于0,那么当前位置k就是元素x要放入的位置
                    break;
                // 把父元素往下拽
                array[k] = e;
                // 更新下标k为父元素的下标,用于下一轮比较
                k = parent;
            }
            // 把当前元素放入位置k
            array[k] = key;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    java.util.concurrent.PriorityBlockingQueue#siftUpUsingComparator:

        private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                           Comparator<? super T> cmp) {
            // 向上调整,直到来到顶部,或者找到合适的位置                     
            while (k > 0) {
            	// 当前位置k的父元素位置
                int parent = (k - 1) >>> 1;
                // 获取父元素
                Object e = array[parent];
                // 利用比较器进行比较
                if (cmp.compare(x, (T) e) >= 0)
                	// 因为是小顶堆,如果cmp.compare(x, (T) e)返回大于等于0,那么当前位置k就是元素x要放入的位置
                    break;
                // 把父元素往下拽
                array[k] = e;
                // 更新下标k为父元素的下标,用于下一轮比较
                k = parent;
            }
            // 把当前元素放入位置k
            array[k] = x;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在这里插入图片描述

    java.util.concurrent.PriorityBlockingQueue#take:

        public E take() throws InterruptedException {
        	// 加ReentrantLock锁
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            E result;
            try {
            	// 从队列中获取元素,没有元素就在notEmpty中等待
                while ( (result = dequeue()) == null)
                    notEmpty.await();
            } finally {
            	// 释放锁
                lock.unlock();
            }
            return result;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
        private E dequeue() {
            int n = size - 1;
            // 队列没有元素,返回null
            if (n < 0)
                return null;
            else {
                Object[] array = queue;
                // 取出堆顶元素,作为返回结果
                E result = (E) array[0];
                // 取出堆底元素
                E x = (E) array[n];
                // 堆底置为null
                array[n] = null;
                Comparator<? super E> cmp = comparator;
                // 如果比较器为空,用Comparable进行元素比较,做堆的向下调整
                if (cmp == null)
                    siftDownComparable(0, x, array, n);
    			// 如果比较器不为空,用比较器进行元素比较,做堆的向下调整
                else
                    siftDownUsingComparator(0, x, array, n, cmp);
                // 更新队列大小
                size = n;
               	// 返回结果
                return result;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    java.util.concurrent.PriorityBlockingQueue#siftDownComparable:

        private static <T> void siftDownComparable(int k, T x, Object[] array,
                                                   int n) {
            if (n > 0) {
            	// 类型强转,转成Comparable
                Comparable<? super T> key = (Comparable<? super T>)x;
                // 计算堆底位置
                int half = n >>> 1;
                // 循环进行堆的向下调整,直到来到堆底
                while (k < half) {
                	// 左孩子位置
                    int child = (k << 1) + 1;
                    // 获取左孩子
                    Object c = array[child];
                    // 右孩子位置
                    int right = child + 1;
                    // 两个孩子先pk,保存胜出的孩子和胜出孩子的位置
                    if (right < n &&
                        ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                        c = array[child = right];
                    // 当前元素跟胜出的孩子比,如果比胜出的孩子还小(更优先),那么位置k就是目标位置
                    if (key.compareTo((T) c) <= 0)
                        break;
                    // 把胜出孩子往上提
                    array[k] = c;
                    // 更新位置k,用于下一轮比较
                    k = child;
                }
                // 当前元素放入目标位置
                array[k] = key;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    java.util.concurrent.PriorityBlockingQueue#siftDownUsingComparator:

        private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                        int n,
                                                        Comparator<? super T> cmp) {
            if (n > 0) {
            	// 计算堆底位置
                int half = n >>> 1;
                // 循环进行堆的向下调整,直到来到堆底
                while (k < half) {
                	// 左孩子位置
                    int child = (k << 1) + 1;
                    // 获取左孩子
                    Object c = array[child];
                    // 右孩子位置
                    int right = child + 1;
                    // 两个孩子先pk,保存胜出的孩子和胜出孩子的位置
                    if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                        c = array[child = right];
                    // 当前元素跟胜出的孩子比,如果比胜出的孩子还小(更优先),那么位置k就是目标位置
                    if (cmp.compare(x, (T) c) <= 0)
                        break;
                    // 把胜出孩子往上提
                    array[k] = c;
                    k = child;
                }
                // 当前元素放入目标位置
                array[k] = x;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    在这里插入图片描述

    DelayQueue

    成员变量:

    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E> {
     	// 每一个从队列中存取元素的线程,都要先获取lock锁
        private final transient ReentrantLock lock = new ReentrantLock();
        // 优先级队列,根据元素到期时间从小到大排序
        private final PriorityQueue<E> q = new PriorityQueue<E>();
        // leader线程,等待PriorityQueue中堆顶元素到期的线程
        private Thread leader = null;
        // 条件队列,队列为空或者堆顶元素未到期,都会在这里等待
        private final Condition available = lock.newCondition();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    DelayQueue是延时队列,每个放入DelayQueue中的元素都有一个到期时间,只有元素到期了才能从DelayQueue中取走,否则线程要在条件队列available中进行等待。

    DelayQueue有一个ReentrantLock锁,每个从队列中存取元素的线程,都要先获取ReentrantLock锁。

    DelayQueue使用一个PriorityQueue保存放入到队列中的元素,PriorityQueue根据元素的到期时间从小到大进行排序,堆顶元素是最快到期的元素。

    DelayQueue中有一个leader线程,保存等待堆顶元素到期取走的线程。当leader线程成功获取到堆顶元素会,会唤醒等待在available中的线程。

    在这里插入图片描述

    java.util.concurrent.DelayQueue#put:

        public void put(E e) {
            offer(e);
        }
    
    • 1
    • 2
    • 3

    java.util.concurrent.DelayQueue#offer(E):

        public boolean offer(E e) {
        	// 获取ReentrantLock锁
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
            	// 元素放入PriorityQueue
                q.offer(e);
                // PriorityQueue中堆顶元素更新为了当前元素,leader线程置空,唤醒available中等待的线程
                if (q.peek() == e) {
                    leader = null;
                    available.signal();
                }
                return true;
            } finally {
            	// 释放锁
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    在这里插入图片描述

    java.util.concurrent.DelayQueue#take:

        public E take() throws InterruptedException {
        	// 获取ReentrantLock锁
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                	// 堆顶元素
                    E first = q.peek();
                    // 队列为空,线程在available中等待
                    if (first == null)
                        available.await();
                    else {
                    	// 堆顶元素的到期时间
                        long delay = first.getDelay(NANOSECONDS);
                        // 堆顶元素到期,取走
                        if (delay <= 0)
                            return q.poll();
                        first = null;
                        // 堆顶元素未到期,又有leader线程在等着,当前线程在available中等待
                        if (leader != null)
                            available.await();
                        else {
                        	// leader线程为空,更新当前线程为leader线程
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                            	// 在available等待到堆顶元素到期
                                available.awaitNanos(delay);
                            } finally {
                            	// leader线程是当前线程,置空leader
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
            	// leader线程为null,堆顶又有元素,唤醒在available中等待的线程
                if (leader == null && q.peek() != null)
                    available.signal();
                // 释放锁
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    在这里插入图片描述

    SynchronousQueue

    SynchronousQueue比较特别,它是一个不存储元素的队列。每个往SynchronousQueue放入元素的线程,都要阻塞等待直到有线程从SynchronousQueue中取走该元素。每个从SynchronousQueue中获取元素的线程都要阻塞等待,直到有线程往SynchronousQueue中放入元素。

    在这里插入图片描述

  • 相关阅读:
    SpringBoot自定义banner,如何定制炫酷的banner提升项目B格?
    vue3 | HighCharts实战自定义封装之径向条形图
    设计师都有些常用的组件库?
    Vue3+Vite+ElementPlus管理系统常见问题
    【Unity好插件之PlayMaker系列一上半部分】如何只用一个插件和一个脚本完成制作一个简易的游戏
    16 Linux之JavaEE定制篇-搭建JavaEE环境
    HiveQL
    PyQt5 QTableWidget
    解决QT信号在信号和槽连接前发出而导致槽函数未调用问题
    pytest并发执行用例方案
  • 原文地址:https://blog.csdn.net/weixin_43889578/article/details/132259400