DelayQueue 延迟队列使用和延时功能源码分析,先看DelayQueue 的使用
目录
想要实现延时功能,需要实现 Delayed 接口,重写 getDelay 方法,在 getDelay 方法里返回延时时间
笔者定义一个 Order 类
在构造函数中传入延时的时间
- package com.wsjzzcbq.java.queue;
-
- import java.util.concurrent.Delayed;
- import java.util.concurrent.TimeUnit;
-
- /**
- * Order
- *
- * @author wsjz
- * @date 2023/09/22
- */
- public class Order implements Delayed {
-
- /**
- * 延时时长
- */
- private long time;
- /**
- * 延时开始时间
- */
- private long start = System.currentTimeMillis();
-
- public Order(long time) {
- this.time = time;
- }
-
- public Order(long time, long start) {
- this.time = time;
- this.start = start;
- }
-
- @Override
- public long getDelay(TimeUnit unit) {
- return unit.convert((start + time) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
- }
-
- @Override
- public int compareTo(Delayed o) {
- Order order = (Order)o;
- return (int) (this.getDelay(TimeUnit.MILLISECONDS) - order.getDelay(TimeUnit.MILLISECONDS));
- }
- }
延时队列使用
(1)、以现在时间为开始时间,延时获取
- package com.wsjzzcbq.java.queue;
-
- import java.time.LocalDateTime;
- import java.util.concurrent.DelayQueue;
-
- /**
- * DelayQueueLearn
- *
- * @author wsjz
- * @date 2023/09/22
- */
- public class DelayQueueLearn {
-
- public static void main(String[] args) throws InterruptedException {
- DelayQueue
delayQueue = new DelayQueue<>(); - Order order = new Order(5*1000);
-
- System.out.println(LocalDateTime.now());
- delayQueue.add(order);
- Order order1 = delayQueue.take();
- System.out.println(LocalDateTime.now());
- System.out.println(order1);
- }
- }
延时 5 秒钟才能获取
测试运行

添加到队列后5秒钟,获取数据
(2)、以指定时间为开始时间,延时获取
以当前时间加 5 秒为开始时间,延时 5 秒钟获取
- package com.wsjzzcbq.java.queue;
-
- import java.time.LocalDateTime;
- import java.time.ZoneOffset;
- import java.util.concurrent.DelayQueue;
-
- /**
- * DelayQueueLearn
- *
- * @author wsjz
- * @date 2023/09/22
- */
- public class DelayQueueLearn {
-
- public static void main(String[] args) throws InterruptedException {
- DelayQueue
delayQueue = new DelayQueue<>(); -
- //当前时间加5秒为开始时间
- LocalDateTime localDateTime = LocalDateTime.now().plusSeconds(5);
- long start = localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
- System.out.println(start);
- Order order = new Order(5*1000, start);
-
- System.out.println(LocalDateTime.now());
- delayQueue.add(order);
- Order order1 = delayQueue.take();
- System.out.println(LocalDateTime.now());
- System.out.println(order1);
- }
- }
测试运行

一共延时 10 秒钟
DelayQueue 是基于 PriorityQueue(优先队列)实现的,PriorityQueue 默认是最小堆结构

我们先看 add 添加方法
- /**
- * Inserts the specified element into this delay queue.
- *
- * @param e the element to add
- * @return {@code true} (as specified by {@link Collection#add})
- * @throws NullPointerException if the specified element is null
- */
- public boolean add(E e) {
- return offer(e);
- }
offer 方法
- /**
- * Inserts the specified element into this delay queue.
- *
- * @param e the element to add
- * @return {@code true}
- * @throws NullPointerException if the specified element is null
- */
- public boolean offer(E e) {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- q.offer(e);
- if (q.peek() == e) {
- leader = null;
- available.signal();
- }
- return true;
- } finally {
- lock.unlock();
- }
- }
先获取锁,然后调用 PriorityQueue 的 offer 方法,如果此时 PriorityQueue 的头部元素是新添加的元素,则 leader = null,并唤醒等待线程;否则直接返回 true
因为这里的 PriorityQueue 是最小堆结构,所以它能保证延时时间最小的元素最先出队(添加进去的元素 Order 对象实现了 compareTo 方法)
PriorityQueue 的 offer 方法
- /**
- * Inserts the specified element into this priority queue.
- *
- * @return {@code true} (as specified by {@link Queue#offer})
- * @throws ClassCastException if the specified element cannot be
- * compared with elements currently in this priority queue
- * according to the priority queue's ordering
- * @throws NullPointerException if the specified element is null
- */
- public boolean offer(E e) {
- if (e == null)
- throw new NullPointerException();
- modCount++;
- int i = size;
- if (i >= queue.length)
- grow(i + 1);
- size = i + 1;
- if (i == 0)
- queue[0] = e;
- else
- siftUp(i, e);
- return true;
- }
如果超出容量的话,调用 grow 方法扩容
如果是首次添加的话放在数组索引是0的首位
如果队列中有元素的话,调用 siftUp 方法添加
grow 方法
PriorityQueue 基于数组实现
- /**
- * Increases the capacity of the array.
- *
- * @param minCapacity the desired minimum capacity
- */
- private void grow(int minCapacity) {
- int oldCapacity = queue.length;
- // Double size if small; else grow by 50%
- int newCapacity = oldCapacity + ((oldCapacity < 64) ?
- (oldCapacity + 2) :
- (oldCapacity >> 1));
- // overflow-conscious code
- if (newCapacity - MAX_ARRAY_SIZE > 0)
- newCapacity = hugeCapacity(minCapacity);
- queue = Arrays.copyOf(queue, newCapacity);
- }
siftUp 方法
- /**
- * Inserts item x at position k, maintaining heap invariant by
- * promoting x up the tree until it is greater than or equal to
- * its parent, or is the root.
- *
- * To simplify and speed up coercions and comparisons. the
- * Comparable and Comparator versions are separated into different
- * methods that are otherwise identical. (Similarly for siftDown.)
- *
- * @param k the position to fill
- * @param x the item to insert
- */
- private void siftUp(int k, E x) {
- if (comparator != null)
- siftUpUsingComparator(k, x);
- else
- siftUpComparable(k, x);
- }
默认 comparator 是 null,调用 siftUpComparable 方法
- private void siftUpComparable(int k, E x) {
- Comparable super E> key = (Comparable super E>) x;
- while (k > 0) {
- int parent = (k - 1) >>> 1;
- Object e = queue[parent];
- if (key.compareTo((E) e) >= 0)
- break;
- queue[k] = e;
- k = parent;
- }
- queue[k] = key;
- }
siftUpComparable 方法会进行比较,保证延时时间最小的元素在最上面
然后我们直接看 take 方法
- /**
- * Retrieves and removes the head of this queue, waiting if necessary
- * until an element with an expired delay is available on this queue.
- *
- * @return the head of this queue
- * @throws InterruptedException {@inheritDoc}
- */
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- for (;;) {
- //队列头部元素
- E first = q.peek();
- if (first == null)
- //如果头部元素是null 则让当前线程等待
- available.await();
- else {
- //头部元素不为空,获取延时时间
- long delay = first.getDelay(NANOSECONDS);
- if (delay <= 0)
- //延时时间小于等于0,出队返回
- return q.poll();
- first = null; // don't retain ref while waiting
- if (leader != null)
- available.await();
- else {
- Thread thisThread = Thread.currentThread();
- leader = thisThread;
- try {
- //让线程等待延时时间
- available.awaitNanos(delay);
- } finally {
- if (leader == thisThread)
- leader = null;
- }
- }
- }
- }
- } finally {
- if (leader == null && q.peek() != null)
- available.signal();
- lock.unlock();
- }
- }
相关说明在代码注释中,先看队列头部元素是不是null,如果是说明当前队列为空,让线程等待;如果不为空,看头部元素延时时间,如果延时时间小于等于0,则出队返回,leader 默认是null,因此线程等待延时时间的时长,等待时间到达后,重新开始循环,此时延时时间小于等于0,出队返回,达到延时效果
关于leader 的分析,leader 这里使用了 Leader-Follower 模式的变体
- /**
- * Thread designated to wait for the element at the head of
- * the queue. This variant of the Leader-Follower pattern
- * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
- * minimize unnecessary timed waiting. When a thread becomes
- * the leader, it waits only for the next delay to elapse, but
- * other threads await indefinitely. The leader thread must
- * signal some other thread before returning from take() or
- * poll(...), unless some other thread becomes leader in the
- * interim. Whenever the head of the queue is replaced with
- * an element with an earlier expiration time, the leader
- * field is invalidated by being reset to null, and some
- * waiting thread, but not necessarily the current leader, is
- * signalled. So waiting threads must be prepared to acquire
- * and lose leadership while waiting.
- */
- private Thread leader = null;
假设没有 leader,现在有2个线程,线程A 和线程B,线程A 和线程B 都会执行 available.awaitNanos(delay) 进行等待,等待时间结束后,线程A 和线程B中只有一个能拿到元素返回,另外一个将重新等待,对于没拿到元素的线程来说一开始等待,之后等待结束被唤醒,最后再次等待,是一种资源浪费,不如一开始就让它一直等待(如果它不是leader的话)
leader 更详细的分析:https://stackoverflow.com/questions/48493830/what-exactly-is-the-leader-used-for-in-delayqueue
DelayQueue 内部基于优先队列 PriorityQueue(最小堆结构)实现延时时间小的元素总是先出队。延时功能是通过循环加线程等待的方式实现的,先判断 PriorityQueue 中延时时间最小的元素的延时时间是否小于等于0,如果是则直接出队返回;否则让线程等待延时的时长,等待结束后,开始新一轮循环,这时延时时间肯定是小于等于0的,出队返回,达到延时的效果
至此完