• DelayQueue源码分析


    底层

    DelayQueue 是 JUC 包(java.util.concurrent)为我们提供的延迟队列,用于实现延时任务比如订单下单 15 分钟未支付直接取消。它是 BlockingQueue 的一种,底层是一个基于 PriorityQueue 实现的一个无界队列,是线程安全的。默认情况下, DelayQueue 会按照到期时间升序编排任务。只有当元素过期时(getDelay()方法返回值小于等于0),才能从队列中取出。

    DelayQueue 常见使用场景示例

    我们这里希望任务可以按照我们预期的时间执行,例如提交 3 个任务,分别要求 1s、2s、3s 后执行,即使是乱序添加,1s 后要求 1s 执行的任务会准时执行。
    对此我们可以使用 DelayQueue 来实现,所以我们首先需要继承 Delayed 实现 DelayedTask,实现 getDelay 方法以及优先级比较 compareTo。

    /**
     * 延迟任务
     */
    public class DelayedTask implements Delayed {
        /**
         * 任务到期时间
         */
        private long executeTime;
        /**
         * 任务
         */
        private Runnable task;
    
        public DelayedTask(long delay, Runnable task) {
            this.executeTime = System.currentTimeMillis() + delay;
            this.task = task;
        }
    
        /**
         * 查看当前任务还有多久到期
         * @param unit
         * @return
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    
        /**
         * 延迟队列需要到期时间升序入队,所以我们需要实现compareTo进行到期时间比较
         * @param o
         * @return
         */
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.executeTime, ((DelayedTask) o).executeTime);
        }
    
        public void execute() {
            task.run();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    完成任务的封装之后,使用就很简单了,设置好多久到期然后将任务提交到延迟队列中即可。

    // 创建延迟队列,并添加任务
    DelayQueue < DelayedTask > delayQueue = new DelayQueue < > ();
    
    //分别添加1s、2s、3s到期的任务
    delayQueue.add(new DelayedTask(2000, () -> System.out.println("Task 2")));
    delayQueue.add(new DelayedTask(1000, () -> System.out.println("Task 1")));
    delayQueue.add(new DelayedTask(3000, () -> System.out.println("Task 3")));
    
    // 取出任务并执行
    while (!delayQueue.isEmpty()) {
      //阻塞获取最先到期的任务
      DelayedTask task = delayQueue.take();
      if (task != null) {
        task.execute();
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    结构

    在这里插入图片描述

    四个核心成员变量

    //可重入锁,实现线程安全的关键
    private final transient ReentrantLock lock = new ReentrantLock();
    //延迟队列底层存储数据的集合,确保元素按照到期时间升序排列
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    
    //指向准备执行优先级最高的线程
    private Thread leader = null;
    //实现多线程之间等待唤醒的交互
    private final Condition available = lock.newCondition();
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    lock : 我们都知道 DelayQueue 存取是线程安全的,所以为了保证存取元素时线程安全,我们就需要在存取时上锁,而 DelayQueue 就是基于 ReentrantLock 独占锁确保存取操作的线程安全。
    q : 延迟队列要求元素按照到期时间进行升序排列,所以元素添加时势必需要进行优先级排序,所以 DelayQueue 底层元素的存取都是通过这个优先队列 PriorityQueue 的成员变量 q 来管理的。
    leader : 延迟队列的任务只有到期之后才会执行,对于没有到期的任务只有等待,为了确保优先级最高的任务到期后可以即刻被执行,设计者就用 leader 来管理延迟任务,只有 leader 所指向的线程才具备定时等待任务到期执行的权限,而其他那些优先级低的任务只能无限期等待,直到 leader 线程执行完手头的延迟任务后唤醒它。
    available : 上文讲述 leader 线程时提到的等待唤醒操作的交互就是通过 available 实现的,假如线程 1 尝试在空的 DelayQueue 获取任务时,available 就会将其放入等待队列中。直到有一个线程添加一个延迟任务后通过 available 的 signal 方法将其唤醒

    构造方法

    相较于其他的并发容器,延迟队列的构造方法比较简单,它只有两个构造方法,因为所有成员变量在类加载时都已经初始完成了,所以默认构造方法什么也没做。

    添加元素

    public boolean offer(E e) {
        //尝试获取lock
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //如果上锁成功,则调q的offer方法将元素存放到优先队列中
            q.offer(e);
            //调用peek方法看看当前队首元素是否就是本次入队的元素,如果是则说明当前这个元素是即将到期的任务(即优先级最高的元素)
            if (q.peek() == e) {
                //将leader设置为空,通知调用取元素方法而阻塞的线程来争抢这个任务
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            //上述步骤执行完成,释放lock
            lock.unlock();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 尝试获取 lock 。
    • 如果上锁成功,则调 q 的 offer 方法将元素存放到优先队列中。
    • 调用 peek 方法看看当前队首元素是否就是本次入队的元素,如果是则说明当前这个元素是即将到期的任务(即优先级最高的元素),于是将 leader 设置为空,通知因为队列为空时调用 take 等方法导致阻塞的线程来争抢元素。
    • 上述步骤执行完成,释放 lock。
    • 返回 true。

    获取元素

    DelayQueue 中获取元素的方式分为阻塞式和非阻塞式

    public E take() throws InterruptedException {
        // 尝试获取可重入锁,将底层AQS的state设置为1,并设置为独占锁
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                //查看队列第一个元素
                E first = q.peek();
                //若为空,则将当前线程放入ConditionObject的等待队列中,并将底层AQS的state设置为0,表示释放锁并进入无限期等待
                if (first == null)
                    available.await();
                else {
                    //若元素不为空,则查看当前元素多久到期
                    long delay = first.getDelay(NANOSECONDS);
                    //如果小于0则说明已到期直接返回出去
                    if (delay <= 0)
                        return q.poll();
                    //如果大于0则说明任务还没到期,首先需要释放对这个元素的引用
                    first = null; // don't retain ref while waiting
                    //判断leader是否为空,如果不为空,则说明正有线程作为leader并等待一个任务到期,则当前线程进入无限期等待
                    if (leader != null)
                        available.await();
                    else {
                        //反之将我们的线程成为leader
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            //并进入有限期等待
                            available.awaitNanos(delay);
                        } finally {
                            //等待任务到期时,释放leader引用,进入下一次循环将任务return出去
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            //收尾逻辑:如果leader不为空且q有元素,则说明有任务没人认领,直接发起通知唤醒因为锁被当前消费者持有而导致阻塞的生产者(即调用put、add、offer的线程)
            if (leader == null && q.peek() != null)
                available.signal();
            //释放锁
            lock.unlock();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    public E poll() {
        //尝试获取可重入锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //查看队列第一个元素,判断元素是否为空
            E first = q.peek();
    
            //若元素为空,或者元素未到期,则直接返回空
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                //若元素不为空且到期了,直接调用poll返回出去
                return q.poll();
        } finally {
            //释放可重入锁lock
            lock.unlock();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    查看元素

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    DelayQueue 的实现原理是什么?

    DelayQueue 底层是使用优先队列 PriorityQueue 来存储元素,而 PriorityQueue 采用二叉小顶堆的思想确保值小的元素排在最前面,这就使得 DelayQueue 对于延迟任务优先级的管理就变得十分方便了。同时 DelayQueue 为了保证线程安全还用到了可重入锁 ReentrantLock,确保单位时间内只有一个线程可以操作延迟队列。最后,为了实现多线程之间等待和唤醒的交互效率,DelayQueue 还用到了 Condition,通过 Condition 的 await 和 signal 方法完成多线程之间的等待唤醒。

    DelayQueue 的实现是否线程安全?

    DelayQueue 的实现是线程安全的,它通过 ReentrantLock 实现了互斥访问和 Condition 实现了线程间的等待和唤醒操作,可以保证多线程环境下的安全性和可靠性。

    DelayQueue 的使用场景有哪些?

    DelayQueue 通常用于实现定时任务调度和缓存过期删除等场景。在定时任务调度中,需要将需要执行的任务封装成延迟任务对象,并将其添加到 DelayQueue 中,DelayQueue 会自动按照剩余延迟时间进行升序排序(默认情况),以保证任务能够按照时间先后顺序执行。对于缓存过期这个场景而言,在数据被缓存到内存之后,我们可以将缓存的 key 封装成一个延迟的删除任务,并将其添加到 DelayQueue 中,当数据过期时,拿到这个任务的 key,将这个 key 从内存中移除

    DelayQueue 中 Delayed 接口的作用是什么?

    Delayed 接口定义了元素的剩余延迟时间(getDelay)和元素之间的比较规则(该接口继承了 Comparable 接口)。若希望元素能够存放到 DelayQueue 中,就必须实现 Delayed 接口的 getDelay() 方法和 compareTo() 方法,否则 DelayQueue 无法得知当前任务剩余时长和任务优先级的比较。

    DelayQueue 和 Timer/TimerTask 的区别是什么?

    DelayQueue 和 Timer/TimerTask 都可以用于实现定时任务调度,但是它们的实现方式不同。DelayQueue 是基于优先级队列和堆排序算法实现的,可以实现多个任务按照时间先后顺序执行;而 Timer/TimerTask 是基于单线程实现的,只能按照任务的执行顺序依次执行,如果某个任务执行时间过长,会影响其他任务的执行。另外,DelayQueue 还支持动态添加和移除任务,而 Timer/TimerTask 只能在创建时指定任务。

    作者声明

    如有问题,欢迎指正!
    
    • 1
  • 相关阅读:
    AI智能监控平台EasyCVR+无人机方案:实时全景无死角全方面助力山区安防系统新升级
    Qml 实现星级评分组件 已发布
    Python入门自学进阶-Web框架——41、初步了解Celery 分布式队列、识堡垒机、自动发布、配置管理系统
    基于CNN的动物识别系统[完整实战源码]
    Microsoft Remote Desktop Mac
    Ajax笔记
    RustGUI学习(iced)之小部件(二):如何使用滑动条部件
    格式字幕格式脚本
    进阶JAVA篇- Lambda 表达式与 Lambda 表达式的省略规则
    直接安装WSL2及安装Ubuntu到F盘
  • 原文地址:https://blog.csdn.net/weixin_45247019/article/details/132873952