• DelayQueue 使用和延时功能源码分析


    DelayQueue 延迟队列使用和延时功能源码分析,先看DelayQueue 的使用

    目录

    1、基本使用

    2、延时功能源码分析

    3、总结


    1、基本使用

    想要实现延时功能,需要实现 Delayed 接口,重写 getDelay 方法,在 getDelay 方法里返回延时时间

    笔者定义一个 Order 类

    构造函数中传入延时的时间

    1. package com.wsjzzcbq.java.queue;
    2. import java.util.concurrent.Delayed;
    3. import java.util.concurrent.TimeUnit;
    4. /**
    5. * Order
    6. *
    7. * @author wsjz
    8. * @date 2023/09/22
    9. */
    10. public class Order implements Delayed {
    11. /**
    12. * 延时时长
    13. */
    14. private long time;
    15. /**
    16. * 延时开始时间
    17. */
    18. private long start = System.currentTimeMillis();
    19. public Order(long time) {
    20. this.time = time;
    21. }
    22. public Order(long time, long start) {
    23. this.time = time;
    24. this.start = start;
    25. }
    26. @Override
    27. public long getDelay(TimeUnit unit) {
    28. return unit.convert((start + time) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    29. }
    30. @Override
    31. public int compareTo(Delayed o) {
    32. Order order = (Order)o;
    33. return (int) (this.getDelay(TimeUnit.MILLISECONDS) - order.getDelay(TimeUnit.MILLISECONDS));
    34. }
    35. }

    延时队列使用

    (1)、以现在时间为开始时间,延时获取

    1. package com.wsjzzcbq.java.queue;
    2. import java.time.LocalDateTime;
    3. import java.util.concurrent.DelayQueue;
    4. /**
    5. * DelayQueueLearn
    6. *
    7. * @author wsjz
    8. * @date 2023/09/22
    9. */
    10. public class DelayQueueLearn {
    11. public static void main(String[] args) throws InterruptedException {
    12. DelayQueue delayQueue = new DelayQueue<>();
    13. Order order = new Order(5*1000);
    14. System.out.println(LocalDateTime.now());
    15. delayQueue.add(order);
    16. Order order1 = delayQueue.take();
    17. System.out.println(LocalDateTime.now());
    18. System.out.println(order1);
    19. }
    20. }

    延时 5 秒钟才能获取

    测试运行

    添加到队列后5秒钟,获取数据

    (2)、以指定时间为开始时间,延时获取

    以当前时间加 5 秒为开始时间,延时 5 秒钟获取

    1. package com.wsjzzcbq.java.queue;
    2. import java.time.LocalDateTime;
    3. import java.time.ZoneOffset;
    4. import java.util.concurrent.DelayQueue;
    5. /**
    6. * DelayQueueLearn
    7. *
    8. * @author wsjz
    9. * @date 2023/09/22
    10. */
    11. public class DelayQueueLearn {
    12. public static void main(String[] args) throws InterruptedException {
    13. DelayQueue delayQueue = new DelayQueue<>();
    14. //当前时间加5秒为开始时间
    15. LocalDateTime localDateTime = LocalDateTime.now().plusSeconds(5);
    16. long start = localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
    17. System.out.println(start);
    18. Order order = new Order(5*1000, start);
    19. System.out.println(LocalDateTime.now());
    20. delayQueue.add(order);
    21. Order order1 = delayQueue.take();
    22. System.out.println(LocalDateTime.now());
    23. System.out.println(order1);
    24. }
    25. }

    测试运行

    一共延时 10 秒钟

    2、延时功能源码分析

    DelayQueue 是基于 PriorityQueue(优先队列)实现的,PriorityQueue 默认是最小堆结构

    我们先看 add 添加方法

    1. /**
    2. * Inserts the specified element into this delay queue.
    3. *
    4. * @param e the element to add
    5. * @return {@code true} (as specified by {@link Collection#add})
    6. * @throws NullPointerException if the specified element is null
    7. */
    8. public boolean add(E e) {
    9. return offer(e);
    10. }

    offer 方法

    1. /**
    2. * Inserts the specified element into this delay queue.
    3. *
    4. * @param e the element to add
    5. * @return {@code true}
    6. * @throws NullPointerException if the specified element is null
    7. */
    8. public boolean offer(E e) {
    9. final ReentrantLock lock = this.lock;
    10. lock.lock();
    11. try {
    12. q.offer(e);
    13. if (q.peek() == e) {
    14. leader = null;
    15. available.signal();
    16. }
    17. return true;
    18. } finally {
    19. lock.unlock();
    20. }
    21. }

    先获取锁,然后调用 PriorityQueue 的 offer 方法,如果此时 PriorityQueue 的头部元素是新添加的元素,则 leader = null,并唤醒等待线程;否则直接返回 true

    因为这里的 PriorityQueue 是最小堆结构,所以它能保证延时时间最小的元素最先出队(添加进去的元素 Order 对象实现了 compareTo 方法)

    PriorityQueue 的 offer 方法

    1. /**
    2. * Inserts the specified element into this priority queue.
    3. *
    4. * @return {@code true} (as specified by {@link Queue#offer})
    5. * @throws ClassCastException if the specified element cannot be
    6. * compared with elements currently in this priority queue
    7. * according to the priority queue's ordering
    8. * @throws NullPointerException if the specified element is null
    9. */
    10. public boolean offer(E e) {
    11. if (e == null)
    12. throw new NullPointerException();
    13. modCount++;
    14. int i = size;
    15. if (i >= queue.length)
    16. grow(i + 1);
    17. size = i + 1;
    18. if (i == 0)
    19. queue[0] = e;
    20. else
    21. siftUp(i, e);
    22. return true;
    23. }

    如果超出容量的话,调用 grow 方法扩容

    如果是首次添加的话放在数组索引是0的首位

    如果队列中有元素的话,调用 siftUp 方法添加

    grow 方法

    PriorityQueue 基于数组实现

    1. /**
    2. * Increases the capacity of the array.
    3. *
    4. * @param minCapacity the desired minimum capacity
    5. */
    6. private void grow(int minCapacity) {
    7. int oldCapacity = queue.length;
    8. // Double size if small; else grow by 50%
    9. int newCapacity = oldCapacity + ((oldCapacity < 64) ?
    10. (oldCapacity + 2) :
    11. (oldCapacity >> 1));
    12. // overflow-conscious code
    13. if (newCapacity - MAX_ARRAY_SIZE > 0)
    14. newCapacity = hugeCapacity(minCapacity);
    15. queue = Arrays.copyOf(queue, newCapacity);
    16. }

    siftUp 方法

    1. /**
    2. * Inserts item x at position k, maintaining heap invariant by
    3. * promoting x up the tree until it is greater than or equal to
    4. * its parent, or is the root.
    5. *
    6. * To simplify and speed up coercions and comparisons. the
    7. * Comparable and Comparator versions are separated into different
    8. * methods that are otherwise identical. (Similarly for siftDown.)
    9. *
    10. * @param k the position to fill
    11. * @param x the item to insert
    12. */
    13. private void siftUp(int k, E x) {
    14. if (comparator != null)
    15. siftUpUsingComparator(k, x);
    16. else
    17. siftUpComparable(k, x);
    18. }

    默认 comparator 是 null,调用 siftUpComparable 方法

    1. private void siftUpComparable(int k, E x) {
    2. Comparablesuper E> key = (Comparablesuper E>) x;
    3. while (k > 0) {
    4. int parent = (k - 1) >>> 1;
    5. Object e = queue[parent];
    6. if (key.compareTo((E) e) >= 0)
    7. break;
    8. queue[k] = e;
    9. k = parent;
    10. }
    11. queue[k] = key;
    12. }

    siftUpComparable 方法会进行比较,保证延时时间最小的元素在最上面

    然后我们直接看 take 方法

    1. /**
    2. * Retrieves and removes the head of this queue, waiting if necessary
    3. * until an element with an expired delay is available on this queue.
    4. *
    5. * @return the head of this queue
    6. * @throws InterruptedException {@inheritDoc}
    7. */
    8. public E take() throws InterruptedException {
    9. final ReentrantLock lock = this.lock;
    10. lock.lockInterruptibly();
    11. try {
    12. for (;;) {
    13. //队列头部元素
    14. E first = q.peek();
    15. if (first == null)
    16. //如果头部元素是null 则让当前线程等待
    17. available.await();
    18. else {
    19. //头部元素不为空,获取延时时间
    20. long delay = first.getDelay(NANOSECONDS);
    21. if (delay <= 0)
    22. //延时时间小于等于0,出队返回
    23. return q.poll();
    24. first = null; // don't retain ref while waiting
    25. if (leader != null)
    26. available.await();
    27. else {
    28. Thread thisThread = Thread.currentThread();
    29. leader = thisThread;
    30. try {
    31. //让线程等待延时时间
    32. available.awaitNanos(delay);
    33. } finally {
    34. if (leader == thisThread)
    35. leader = null;
    36. }
    37. }
    38. }
    39. }
    40. } finally {
    41. if (leader == null && q.peek() != null)
    42. available.signal();
    43. lock.unlock();
    44. }
    45. }

    相关说明在代码注释中,先看队列头部元素是不是null,如果是说明当前队列为空,让线程等待;如果不为空,看头部元素延时时间,如果延时时间小于等于0,则出队返回,leader 默认是null,因此线程等待延时时间的时长,等待时间到达后,重新开始循环,此时延时时间小于等于0,出队返回,达到延时效果

    关于leader 的分析,leader 这里使用了 Leader-Follower 模式的变体

    1. /**
    2. * Thread designated to wait for the element at the head of
    3. * the queue. This variant of the Leader-Follower pattern
    4. * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
    5. * minimize unnecessary timed waiting. When a thread becomes
    6. * the leader, it waits only for the next delay to elapse, but
    7. * other threads await indefinitely. The leader thread must
    8. * signal some other thread before returning from take() or
    9. * poll(...), unless some other thread becomes leader in the
    10. * interim. Whenever the head of the queue is replaced with
    11. * an element with an earlier expiration time, the leader
    12. * field is invalidated by being reset to null, and some
    13. * waiting thread, but not necessarily the current leader, is
    14. * signalled. So waiting threads must be prepared to acquire
    15. * and lose leadership while waiting.
    16. */
    17. private Thread leader = null;

    假设没有 leader,现在有2个线程,线程A 和线程B,线程A 和线程B 都会执行 available.awaitNanos(delay) 进行等待,等待时间结束后,线程A 和线程B中只有一个能拿到元素返回,另外一个将重新等待,对于没拿到元素的线程来说一开始等待,之后等待结束被唤醒,最后再次等待,是一种资源浪费,不如一开始就让它一直等待(如果它不是leader的话)

    leader 更详细的分析:https://stackoverflow.com/questions/48493830/what-exactly-is-the-leader-used-for-in-delayqueue

    3、总结

    DelayQueue 内部基于优先队列 PriorityQueue(最小堆结构)实现延时时间小的元素总是先出队。延时功能是通过循环加线程等待的方式实现的,先判断 PriorityQueue 中延时时间最小的元素的延时时间是否小于等于0,如果是则直接出队返回;否则让线程等待延时的时长,等待结束后,开始新一轮循环,这时延时时间肯定是小于等于0的,出队返回,达到延时的效果

    至此完

  • 相关阅读:
    常坐飞机的你,为什么老惦记着“升舱”?
    SpringBoot+Redis BitMap 实现签到与统计功能
    ABP中的数据过滤器
    Vue中的异步组件
    C++pimer第2章基础
    基于ReadWriteLock的全局安全缓存实现
    模拟退火算法(SA)求解旅行商问题(TSP)python
    PDF文件如何设置密码保护?
    油罐清洗抽吸系统设计
    Linux磁盘分区中物理卷(PV)、卷组(VG)、逻辑卷(LV)创建和(LVM)管理
  • 原文地址:https://blog.csdn.net/wsjzzcbq/article/details/133182414