• 延迟任务多种实现姿势--上



    关于延迟任务的所有代码实现均存放在下面这个仓库中:

    https://gitee.com/DaHuYuXiXi/deley-task

    代码仓库中的源码经过后续更新后,对相关实现进行了抽象,因此会与本文展示的代码有稍许不同


    什么是延迟任务

    例如:pdd下单,但是没有付款,那么24小时候,订单会自动取消。收货后,如果一直不进行确认,那么默认七天后自动确认收货等等。

    上面这些场景是我们平日中一直都会遇到的,作为程序员的我们,有没有考虑过该怎么实现这些延迟任务呢?


    一,最简单的延迟队列实现

    DelayQueue是一个无界的BlockingQueue的实现类,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。

    • BlockingQueue即阻塞队列,java提供的面向多线程安全的队列数据结构,当队列内元素数量为0的时候,试图从队列内获取元素的线程将被阻塞或者抛出异常。
    • 这里的“无界”队列,是指队列的元素数量不存在上限,队列的容量会随着元素数量的增加而扩容。

    在这里插入图片描述
    DelayQueue实现了BlockingQueue接口,所以具有无界、阻塞的特点,除此之外它自己的核心特点就是:

    • 放入该队列的延时任务对象,只要到达延时时间之后才能被取到。
    • DelayQueue 不接受null元素。
    • DelayQueue 只接受那些实现了java.util.concurrent.Delayed接口的对象。

    订单延迟任务实现

    package com.delayTask.delayQueue;
    
    
    import com.delayTask.domain.Order;
    import lombok.Data;
    import lombok.ToString;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 延时订单任务
     *
     * @author zdh
     */
    @ToString
    @Data
    public class OrderDelayObject implements Delayed {
    
        /**
         * 延迟任务唯一标识: 这里默认为当前时间戳
         */
        private Long id;
    
        /**
         * 延时时间
         */
        private long delayTime;
    
        /**
         * 订单对象
         */
        private Order order;
    
        public OrderDelayObject(long delayTime, Order order) {
            this.id = System.currentTimeMillis();
            //延时时间加上当前时间
            this.delayTime = System.currentTimeMillis() + delayTime;
            this.order = order;
        }
    
    
        /**
         * 延迟任务是否到期
         */
        @Override
        public long getDelay(TimeUnit unit) {
            long diff = delayTime - System.currentTimeMillis();
            return unit.convert(diff, TimeUnit.MILLISECONDS);
        }
    
        /**
         * 延时任务队列,按照延时时间元素排序,实现Comparable接口
         */
        @Override
        public int compareTo(Delayed obj) {
            return Long.compare(this.delayTime, ((OrderDelayObject) obj).delayTime);
        }
    } 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • Delayed接口继承Comparable接口,所以需要实现compareTo方法,用于延时任务在队列中按照“延时时间”进行排序。
    • getDelay方法是Delayed接口方法,实现该方法提供获取延时任务的倒计时时间

    订单处理

    package com.dhy.delayQueue;
    
    
    import com.delayTask.delayQueue.OrderDelayFactory;
    import com.delayTask.delayQueue.OrderDelayObject;
    import com.delayTask.domain.Order;
    import lombok.extern.slf4j.Slf4j;
    import org.testng.annotations.BeforeTest;
    import org.testng.annotations.Test;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class DelayQueueTest {
        /**
         * 延迟队列
         */
        private final DelayQueue<OrderDelayObject> delayQueue = new DelayQueue<>();
    
        /**
         * 开启线程不断轮询,看是否有延迟任务可以处理
         */
        @BeforeTest
        public void beforeTest() {
            Executors.newSingleThreadExecutor().execute(() -> {
                try {
                    while (true) {
                        //阻塞直到获取到某个到时的延迟任务
                        OrderDelayObject delayObject = delayQueue.take();
                        log.info("延迟任务信息如下: {}",delayObject);
                        Order order = delayObject.getOrder();
                        order.cancelOrderByTimeEnd();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    
        /**
         * 测试下单
         */
        @Test
        public void testOrder() throws InterruptedException {
            OrderDelayObject orderDelay = OrderDelayFactory.newOrderDelay("大忽悠", "小风扇", 13.4, 10L);
            delayQueue.add(orderDelay);
    
            OrderDelayObject orderDelay1 = OrderDelayFactory.newOrderDelay("小朋友", "冰箱", 3000.0, 20L);
            delayQueue.add(orderDelay1);
    
            Thread.sleep(TimeUnit.SECONDS.toMillis(8L));
    
            orderDelay.getOrder().submitOrder();
            delayQueue.remove(orderDelay);
    
            //防止程序结束
            Thread.sleep(TimeUnit.MINUTES.toMillis(10L));
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    在这里插入图片描述


    优缺点

    使用DelayQueue实现延时任务非常简单,而且简便,全部都是标准的JDK代码实现,不用引入第三方依赖(不依赖redis实现、消息队列实现等),非常的轻量级。

    它的缺点就是所有的操作都是基于应用内存的,一旦出现应用单点故障,可能会造成延时任务数据的丢失。如果订单并发量非常大,因为DelayQueue是无界的,订单量越大,队列内的对象就越多,可能造成OOM的风险。所以使用DelayQueue实现延时任务,只适用于任务量较小的情况。


    优化点

    上图中我们使用的是while-true循环同步顺序的处理延迟任务:

    在这里插入图片描述
    这里建议将订单处理的业务逻辑放到单独一个线程池中进行处理,而非在这里同步进行处理,因为这样可能会导致部分到期的延迟任务无法得到及时的处理。


    二,上点档次,基于Netty时间轮算法实现

    时间轮算法

    在这里插入图片描述
    时间轮算法名副其实,时间轮就是一个环形的数据结构,类似于表盘,将时间轮分成多个bucket(比如:0-8)。假设每个时间轮轮片的分隔时间段tickDuration=1s(即:指针经过每个格子花费时间是 1 s),当前的时间bucket=3,那么在18秒后需要被执行的任务需要落到((3+18)%8=5取余运算)的5号bucket上。假如有多个需要在该时间段内执行的任务,就会组成一个双向链表。另外针对时间轮我们要有下面的几个认知:

    • 时间轮指针是一个Worker线程,在时间轮整点的时候执行双向链表中的任务。
    • 时间轮算法的并不是精准的延时,它的执行精度取决于每个时间轮轮片的分隔时间段tickDuration
    • Worker线程是单线程,一个bucket、一个bucket的顺序处理任务。「所以我们的延时任务一定要做成异步任务,否则会影响时间轮后续任务的执行时间。」

    更加详细介绍,可以参考此篇文章


    订单延迟任务实现

    这里商品订单到时取消对时间精确度的要求并不是特别高,因此可以选择采用时间轮算法进行处理。

    首先通过maven坐标引入netty

            <dependency>
                <groupId>io.nettygroupId>
                <artifactId>netty-allartifactId>
                <version>5.0.0.Alpha2version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 对netty时间轮的使用,进行一层简单的封装
    package com.delayTask.wheelTimer;
    
    import com.delayTask.DelayTaskEvent;
    import io.netty.util.HashedWheelTimer;
    import io.netty.util.Timeout;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 
     *     时间轮工厂
     * 
     * @author 大忽悠
     * @create 2022/9/17 17:17
     */
    public class WheelTimerHelper {
        /**
         * 处理订单任务的线程池
         */
        private static final ExecutorService THREAD_POOL= Executors.newCachedThreadPool();
    
        /**
         * 时间轮
         */
        private static HashedWheelTimer wheelTimer;
    
        /**
         * 生产一个时间轮,默认的bucket数量为512个
         */
        public static HashedWheelTimer newWheelTimer(Long duration){
            wheelTimer=new HashedWheelTimer(duration, TimeUnit.MILLISECONDS, 512);
            return wheelTimer;
        }
    
        /**
         * @param delayTaskEvent 延迟任务事件
         */
        public static Timeout addNewTask(DelayTaskEvent delayTaskEvent){
            //延迟任务,延迟时间,时间单位
            return wheelTimer.newTimeout(delayTask -> {
                delayTaskEvent.handleDelayEvent();
            }, delayTaskEvent.getDelay(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 测试
    package com.dhy.wheelTimer;
    
    import com.delayTask.delayQueue.OrderDelayEvent;
    import com.delayTask.delayQueue.OrderDelayFactory;
    import com.delayTask.wheelTimer.WheelTimerHelper;
    import io.netty.util.Timeout;
    import org.testng.annotations.BeforeTest;
    import org.testng.annotations.Test;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author 大忽悠
     * @create 2022/9/17 17:19
     */
    public class WheelTimerTest {
        @BeforeTest
        public void beforeTest(){
            WheelTimerHelper.newWheelTimer(100L);
        }
    
        @Test
        public void testWheelTimer() throws InterruptedException {
            OrderDelayEvent orderDelay =  OrderDelayFactory.newOrderDelay("大忽悠", "小风扇", 13.4, 10L);
            OrderDelayEvent orderDelay1 = OrderDelayFactory.newOrderDelay("小朋友", "冰箱", 3000.0, 20L);
            Timeout timeout = WheelTimerHelper.addNewTask(orderDelay);
            Timeout timeout1 = WheelTimerHelper.addNewTask(orderDelay1);
    
            //订单二在到期前成功结算,因此不需要取消
            orderDelay1.getOrder().submitOrder();
            //取消延迟任务二
            timeout1.cancel();
    
            //阻塞,防止程序结束
            Thread.sleep(TimeUnit.SECONDS.toMillis(100L));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    在这里插入图片描述

    详细代码实现,可以fork仓库看源码


    优缺点

    时间轮算法实现延时任务的优点就是,相对于使用JDK的DelayQueue,其算法上具有优势,执行性能相对好一些。其缺点就是所有的延时任务以及延时触发的管理,都是在单个应用服务的内存中进行的,一旦该应用服务发生故障重启服务,时间轮任务数据将全部丢失。这一缺点和DelayQueue是一样的。为了解决这个问题,我们可以使用redis、RocketMQ等分布式中间件来管理延时任务消息的方式来实现延时任务。


    小结

    本文主要对延迟任务基于内存的单体应用实现给出了两种解决策略,下一篇文章中,我们将针对基于内存的单体解决方法缺陷,给出基于redis和mq实现介绍。

  • 相关阅读:
    Java数据结构-二叉树
    Ubuntu 20.04 下编译 Lego Loam 踩过的一些坑
    python案例:百钱买鸡
    Elasticsearch(一):ES简介及其发展历史与ELK
    如何使用ArcGIS Pro提取河网水系
    【活动总结】0730-COC深圳社区AI●CMeetup第4期——畅谈AI+智能制造与机器人的现状与未来
    ChatGPT AIGC 完成超炫酷的大屏可视化
    [Vulnhub] lazysysadmin
    你应该知道的vue3.0优势对比
    声纹技术(七):声纹技术的未来
  • 原文地址:https://blog.csdn.net/m0_53157173/article/details/126902327