• 【Lilishop商城】No2-7.确定软件架构搭建六(本篇包括延时任务,会用到rocketmq、redis)


      仅涉及后端,全部目录看顶部专栏,代码、文档、接口路径在:

    【Lilishop商城】记录一下B2B2C商城系统学习笔记~_清晨敲代码的博客-CSDN博客


     全篇只介绍重点架构逻辑,具体编写看源代码就行,读起来也不复杂~

    谨慎:源代码中有一些注释是错误的,有的注释意思完全相反,有的注释对不上号,我在阅读过程中就顺手更新了,并且在我不会的地方添加了新的注释,所以在读源代码过程中一定要谨慎啊!

    目录

    A1.延时任务模块

     B1.延时任务模块的逻辑

    PS:怕忘记的内容并且也是重点

     B2.延时任务执行模块基本搭建

    C1.业务系统的延时任务模块搭建(产生延时任务)

    C2.消费系统的延时任务模块搭建(执行延时任务)

    B2.测试

    C1.测试产生延时任务

    C2.测试执行延时任务

    剩余内容:暂时没有了


    A1.延时任务模块

    延时任务的需求,例如:

    1、生成订单30分钟未支付,则自动取消订单
    2、快递签收后未点击订单的确认收货,则7天后默认确认收货

    定时任务的区别在于:

    1、定时任务有明确的触发时间,延时任务没有;
    2、定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期;
    3、定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务

    说明:shop系统中使用的是多个技术框架结合实现的延时逻辑(redis+rocketmq),并没有使用专门的延时框架。我在网上搜索延时框架也没有找到合适的,几乎都是在现有技术框架基础上搭建的。

    其他延时实现方式可看这篇:你可见过如此细致的延时任务详解_Java程序V的博客-CSDN博客

    【rocketmq本身也支持延时消费哦,但延迟时长不支持随意时长的延迟【RocketMQ 二十】RocketMQ应用之延时消息】 

     B1.延时任务模块的逻辑

    我们来说一下shop里面实现的延时模块,逻辑是有点复杂的,用到了redis+rocketmq,redis主要是存储延时任务,rocketmp主要是用来监听任务并执行的。

    如果项目规模中不使用到消息中间件,也可以不用rocketmq。rocketmp在该项目中一部分功能是降低耦合、提高性能(我是这样理解的)。


    我一开始看帮助文档的延时框架介绍:延时任务架构 · GitBook

    里面的介绍比较简单,就对重点的类进行了说明,有点不太容易理解,我就又重新画了一个类关系图,格式不是很严谨(图3)...然后发现根据类名不容易理解,于是就用文字描述了一版(图2)...最终,为了更容易理解又画了一个简版逻辑(图1)

    (图1)--> (图2)--> (图3),当然可以直接看图3源码,就是源码的类名字有点混淆了(对于我来说是混淆,也可能是我的问题...)

     

    上三图解释的就已经很简单了,流程就不再文字重复了,下面记录一下我怕忘记的内容并且也是重点:

    PS:怕忘记的内容并且也是重点

    1.框架中是将同一类延时任务放到了同一个队列里面,等待执行,所以队列监听工厂类是获取的任务列表~所以本质上还是拿到一批任务进行执行。

    2.框架中的队列监听工厂类是每秒轮询,不断获取在当秒执行的任务列表,然后开启异步线程执行;

    3.因为延时任务类型不只一种,所以在执行框架中涉及到的核心类(除延时任务触发消费者类外),其余的核心类都需要实现类,每类延时任务各干各的事儿互不影响~

    4.图中,以rocketmq和redis为连线,左半部分是属于业务系统的(在framework模块),右半部分是属于延时/消费系统的(在consumer模块,但也依赖framework,因为消费中需要业务类)。这样系统耦合度就降低了,例如,当consumer模块出故障后,不影响framwork模块添加任务,当consumer模块正常后就会从任务队列中继续获取任务进行执行~

    5.任务类会从创建后保存到redis里面,然后被拿出来后又转化成mq的消息存储到mq里面,最后被mq监听到然后交给执行器执行。执行器里面是具体执行任务的逻辑,而流转的任务类里面的是需要到的参数~

    6.延时执行接口是专门负责管理任务的,本来任务队列的管理也可以交给他,但是他又把延时队列生产工厂抽象出来了,应该是怕耦合。

    7.RocketmqTimerTrigger具体延时任务实现,这个类的名字 RocketmqTimerTrigger 很迷惑,我的理解是促销类型的任务队列实现类,而这个名字范围太大了。如果我理解错了,那这个类里面又是只针对于 PromotionDelayQueue 队列的操作。所以最终我把他定位为促销类型的任务队列实现类,感觉这个理解是对的。

    8.任务的修改逻辑是先删除任务然后又新增任务;任务的删除逻辑是先只删除任务的唯一标识,然后在消费端从队列中拿到并清除任务的消息时会判断任务唯一标识是否存在,存在则执行,不存在则跳过,所以并不是直接通过任务是否存在而判断的。


    接下来就开始搭建 

     B2.延时任务执行模块基本搭建

    消费系统的模块我们还在上篇新创建的consumer-test-xxl-job中添加,业务模块我们创建一个新的模块 framework-test 来搭建,这样更清晰。(以后用起来也方便,但要注意由于是我们自己创建的模块,有些工具类、参数类是没有的,到时候会自行添加的~)

    C1.业务系统的延时任务模块搭建(产生延时任务)

    创建新的模块framework-test,因为会用到 redis和rocketmq,我们按照之前文章里的的逻辑搭建就可以啦,在这就不重复了。

    下面看开始搭建业务类的延时任务模块搭建:

    1.延时任务消息类,专门存放任务信息;添加具体任务信息;

    2.延时队列生产工厂抽象类,管理延时任务队列模块,向队列增加任务;管理延时任务队列模块实现类

    3.延时执行接口,管理延时执行模块,增删改任务、执行任务;管理延时执行模块的实现类;

    4.会使用到枚举类、常量接口、util工具类;

    5.yml文件配置,添加rocketmq主题类型;(记得修改本模块pom类)

    1. // 1.延时任务消息类,专门存放任务信息;添加具体任务信息;
    2. //详见:cn.lili.trigger.model.TimeTriggerMsg
    3. @Data
    4. @AllArgsConstructor
    5. @NoArgsConstructor
    6. public class TimeTriggerMsg implements Serializable {
    7. private static final long serialVersionUID = 8897917127201859535L;
    8. /**
    9. * 执行器beanId,用于获取对应执行器对象
    10. */
    11. private String triggerExecutor;
    12. /**
    13. * 执行时间
    14. */
    15. private Long triggerTime;
    16. /**
    17. * 执行器参数,就是任务内容
    18. */
    19. private Object param;
    20. /**
    21. * 唯一KEY
    22. */
    23. private String uniqueKey;
    24. /**
    25. * 信息队列主题,用与mq发送和接收消息
    26. */
    27. private String topic;
    28. }
    29. /**
    30. * @author QingChen
    31. * @Description 具体任务信息,一般是所需要的参数属性,这里就随便测试了
    32. * @date 2022-12-02 15:43
    33. * @Version 1.0
    34. */
    35. @Data
    36. @NoArgsConstructor
    37. public class Test1Message {
    38. /**
    39. * name
    40. */
    41. private String name;
    42. /**
    43. * id
    44. */
    45. private String id;
    46. /**
    47. * 开始时间
    48. */
    49. private Date startTime;
    50. /**
    51. * 结束时间
    52. */
    53. private Date endTime;
    54. }
    1. //2.延时队列生产工厂抽象类,管理延时任务队列模块,向队列增加任务;管理延时任务队列模块实现类
    2. //抽象类,详见:cn.lili.trigger.delay.AbstractDelayQueueMachineFactory
    3. //实现类,详见:cn.lili.trigger.delay.queue.Test1DelayQueue
    4. @Slf4j
    5. public abstract class AbstractDelayQueueMachineFactory {
    6. @Autowired
    7. private Cache cache;
    8. /**
    9. * 插入任务到redis里面,记住 jodid 是任务对象json转string
    10. *
    11. * @param jobId 任务id(队列内唯一)
    12. * @param triggerTime 执行时间 时间戳(毫秒)
    13. * @return 是否插入成功
    14. */
    15. public boolean addJob(String jobId, Long triggerTime) {
    16. /**
    17. * 设置在 redis 中的排序 score,设置为任务执行时间/1000(也就是秒级别的,别忘了在延时队列里面也/1000)
    18. * 因为延时任务按照疫每秒轮询判断,所以队列的 score 按照秒级别存放,方便获取
    19. */
    20. long delaySeconds = triggerTime / 1000;
    21. //增加延时任务到 ZSet 里面,参数依次为:队列名称、执行时间score、任务id
    22. boolean result = cache.zAdd(this.setDelayQueueName(), delaySeconds, jobId);
    23. log.info("增加延时任务, 缓存key {}, 执行时间 {},任务id {}", setDelayQueueName(), DateUtil.toString(triggerTime), jobId);
    24. return result;
    25. }
    26. /**
    27. * 要实现延时队列的名字,实现类需要实现
    28. * @return 延时队列的名字
    29. */
    30. public abstract String setDelayQueueName();
    31. }
    32. /**
    33. * @author QingChen
    34. * @Description 管理延时任务队列模块实现类
    35. * @date 2022-12-02 17:10
    36. * @Version 1.0
    37. */
    38. @Component
    39. public class Test1DelayQueue extends AbstractDelayQueueMachineFactory {
    40. @Override
    41. public String setDelayQueueName() {
    42. return DelayQueueEnums.TEST_1_DELAYQUEUE.name();
    43. }
    44. }
    1. //3.延时执行接口,管理延时执行模块,增删改任务、执行任务;管理延时执行模块的实现类;
    2. //接口类,详见:cn.lili.trigger.interfaces.TimeTrigger
    3. //实现类,详见:cn.lili.trigger.interfaces.impl.Test1RocketmqTimerTrigger
    4. public interface TimeTrigger {
    5. /**
    6. * 添加延时任务
    7. *
    8. * @param timeTriggerMsg 延时任务信息
    9. */
    10. void addDelay(TimeTriggerMsg timeTriggerMsg);
    11. /**
    12. * 执行延时任务
    13. *
    14. * @param timeTriggerMsg 延时任务信息
    15. */
    16. void execute(TimeTriggerMsg timeTriggerMsg);
    17. /**
    18. * 修改延时任务
    19. *
    20. * @param executorName 执行器beanId
    21. * @param param 执行参数
    22. * @param triggerTime 执行时间 时间戳 秒为单位
    23. * @param oldTriggerTime 旧的任务执行时间
    24. * @param uniqueKey 添加任务时的唯一凭证
    25. * @param delayTime 延时时间(秒)
    26. * @param topic rocketmq topic
    27. */
    28. void edit(String executorName, Object param, Long oldTriggerTime, Long triggerTime, String uniqueKey, int delayTime, String topic);
    29. /**
    30. * 删除延时任务
    31. *
    32. * @param executorName 执行器
    33. * @param triggerTime 执行时间
    34. * @param uniqueKey 添加任务时的唯一凭证
    35. * @param topic rocketmq topic
    36. */
    37. void delete(String executorName, Long triggerTime, String uniqueKey, String topic);
    38. }
    39. @Component
    40. @Slf4j
    41. public class Test1RocketmqTimerTrigger implements TimeTrigger {
    42. @Autowired
    43. private RocketMQTemplate rocketMQTemplate;
    44. @Autowired
    45. private Cache cache;
    46. @Autowired
    47. private Test1DelayQueue delayQueue;
    48. @Override
    49. public void addDelay(TimeTriggerMsg timeTriggerMsg) {
    50. //拿到执行器唯一key
    51. String uniqueKey = timeTriggerMsg.getUniqueKey();
    52. if (StringUtils.isEmpty(uniqueKey)) {
    53. uniqueKey = StringUtils.getRandStr(10);
    54. }
    55. //生成执行任务key
    56. String generateKey = DelayQueueTools.generateKey(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getTriggerTime(), uniqueKey);
    57. //在redis中添加一个唯一标识的来标识当前延时任务的唯一性,该标识根据任务key生成的
    58. this.cache.put(generateKey, 1);
    59. //设置延时任务,注意哦,这里将延时消息本身(json)作为了 jobid
    60. if (Boolean.TRUE.equals(delayQueue.addJob(JSONUtil.toJsonStr(timeTriggerMsg), timeTriggerMsg.getTriggerTime()))) {
    61. log.info("延时任务标识: {}", generateKey);
    62. log.info("定时执行在【" + DateUtil.toString(timeTriggerMsg.getTriggerTime(), "yyyy-MM-dd HH:mm:ss") + "】,消费【" + timeTriggerMsg.getParam().toString() + "】");
    63. } else {
    64. log.error("延时任务添加失败:{}", timeTriggerMsg);
    65. }
    66. }
    67. @Override
    68. public void execute(TimeTriggerMsg timeTriggerMsg) {
    69. this.addExecute(timeTriggerMsg.getTriggerExecutor(),
    70. timeTriggerMsg.getParam(),
    71. timeTriggerMsg.getTriggerTime(),
    72. timeTriggerMsg.getUniqueKey(),
    73. timeTriggerMsg.getTopic()
    74. );
    75. }
    76. /**
    77. * 将任务添加到mq,mq异步队列执行。
    78. *

    79. * 本系统中redis相当于延时任务吊起机制,而mq才是实际的业务消费,执行任务的存在
    80. *
    81. * @param executorName 执行器beanId
    82. * @param param 执行参数
    83. * @param triggerTime 执行时间 时间戳 秒为单位
    84. * @param uniqueKey 如果是一个 需要有 修改/取消 延时任务功能的延时任务,
    85. * 请填写此参数,作为后续删除,修改做为唯一凭证
    86. * 建议参数为:COUPON_{ACTIVITY_ID} 例如 coupon_123
    87. * 业务内全局唯一
    88. * @param topic rocketmq topic
    89. */
    90. private void addExecute(String executorName, Object param, Long triggerTime, String uniqueKey, String topic) {
    91. TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(executorName, triggerTime, param, uniqueKey, topic);
    92. Message message = MessageBuilder.withPayload(timeTriggerMsg).build();
    93. log.info("延时任务发送信息:{}", message);
    94. this.rocketMQTemplate.asyncSend(topic, message, RocketmqSendCallbackBuilder.commonCallback());
    95. }
    96. @Override
    97. public void edit(String executorName, Object param, Long oldTriggerTime, Long triggerTime, String uniqueKey, int delayTime, String topic) {
    98. this.delete(executorName, oldTriggerTime, uniqueKey, topic);
    99. this.addDelay(new TimeTriggerMsg(executorName, triggerTime, param, uniqueKey, topic));
    100. }
    101. @Override
    102. public void delete(String executorName, Long triggerTime, String uniqueKey, String topic) {
    103. String generateKey = DelayQueueTools.generateKey(executorName, triggerTime, uniqueKey);
    104. log.info("删除延时任务{}", generateKey);
    105. this.cache.remove(generateKey);
    106. }
    107. }
    1. //4.会使用到枚举类、常量接口、util工具类
    2. //都有:
    3. 延时任务工具类,专门生成key:cn.lili.trigger.util.DelayQueueTools
    4. 延时任务执行器常量:cn.lili.trigger.model.TimeExecuteConstant
    5. 队列枚举:cn.lili.trigger.enums.DelayQueueEnums
    6. 延时任务类型:cn.lili.trigger.enums.DelayTypeEnums
    7. cn.lili.util.DateUtil
    8. cn.lili.util.StringUtils
    1. //5.yml文件配置,添加rocketmq主题类型;
    2. //详见:/lilishop-master/framework-test/src/main/resources/application.yml
    3. lili:
    4. data:
    5. rocketmq:
    6. test1Topic: lili_test1Topic
    7. test1Group: lili_test1Group

    C2.消费系统的延时任务模块搭建(执行延时任务)

    在上次创建的consumer-test-xxl-job模块中,搭建延时任务框架,由于要搭配新创建的framework模块使用,所以需要将 pom 文件中的framework依赖注释掉,并且引入framework-test模块;

    下面看开始搭建消费类的延时任务模块搭建

    1.延时队列工厂监听抽象类,延时队列工厂监听实现类;

    2.延时任务事件消息触发消费者;

    3.延时任务执行器接口,延时任务执行器实现类;

    4.会使用到枚举类、常量接口、util工具类;(记得修改本模块pom类)

    1. //1.延时队列工厂监听抽象类,延时队列工厂监听实现类;
    2. //抽象类,详见:cn.lili.trigger.listen.queue.AbstractDelayQueueListen
    3. //实现类,详见:cn.lili.trigger.listen.queue.impl.Test1DelayQueueListen
    4. /**
    5. * @author QingChen
    6. * @Description 延时队列工厂监听抽象类
    7. * springBoot项目启动时候,有时候需要再启动之后直接执行某一段代码。这个时候就用到了 ApplicationRunner 这个类。
    8. * @date 2022-12-05 11:34
    9. * @Version 1.0
    10. */
    11. @Slf4j
    12. public abstract class AbstractDelayQueueListen implements ApplicationRunner {
    13. @Autowired
    14. private Cache cache;
    15. /**
    16. * 延时队列机器开始运作
    17. */
    18. private void startDelayQueueMachine() {
    19. log.info("延时队列机器{}开始运作", setDelayQueueName());
    20. //监听redis队列
    21. while (true) {
    22. try {
    23. //获取当前时间的时间戳,并拿到秒,与
    24. long now = System.currentTimeMillis() / 1000;
    25. //获取当前监听任务类型中,当前时间前需要执行的任务列表,(score是以时间秒设置的)
    26. //【不会移除任务哦】
    27. Set tuples = cache.zRangeByScore(setDelayQueueName(), 0, now);
    28. //如果任务不为空
    29. if (!CollectionUtils.isEmpty(tuples)) {
    30. log.info("执行任务:{}", JSONUtil.toJsonStr(tuples));
    31. for (DefaultTypedTuple tuple : tuples) {
    32. //循环拿到jobid
    33. String jobId = (String) tuple.getValue();
    34. //移除缓存,如果移除成功则表示当前线程处理了延时任务,则执行延时任务
    35. //【在这里移除任务哦】
    36. Long num = cache.zRemove(setDelayQueueName(), jobId);
    37. //如果移除成功, 则执行
    38. if (num > 0) {
    39. //创建新线程并run,run里面执行 invoke 方法
    40. ThreadPoolUtil.execute(() -> {this.invoke(jobId);});
    41. }
    42. }
    43. }
    44. } catch (Exception e) {
    45. log.error("处理延时任务发生异常,异常原因为{}", e.getMessage(), e);
    46. } finally {
    47. //间隔一秒钟搞一次
    48. try {
    49. TimeUnit.SECONDS.sleep(5L);
    50. } catch (InterruptedException e) {
    51. e.printStackTrace();
    52. }
    53. }
    54. }
    55. }
    56. /**
    57. * 最终执行的任务方法
    58. *
    59. * @param jobId 任务id
    60. */
    61. public abstract void invoke(String jobId);
    62. /**
    63. * 要实现延时队列的执行器名字
    64. * @return
    65. */
    66. public abstract String setDelayQueueName();
    67. /**
    68. * 初始化监听队列
    69. */
    70. public void init() {
    71. ThreadPoolUtil.getPool().execute(this::startDelayQueueMachine);
    72. }
    73. }
    74. @Component
    75. public class Test1DelayQueueListen extends AbstractDelayQueueListen {
    76. @Autowired
    77. private Test1RocketmqTimerTrigger timeTrigger;
    78. /**
    79. * @Description: 调用具体的 TimeTrigger 执行任务的方法
    80. * @param: [jobId]
    81. * @return: void
    82. **/
    83. @Override
    84. public void invoke(String jobId) {
    85. //Json转对象
    86. timeTrigger.execute(JSONUtil.toBean(jobId, TimeTriggerMsg.class));
    87. }
    88. /**
    89. * 要实现延时队列的名字
    90. * @return 促销延时队列名称
    91. */
    92. @Override
    93. public String setDelayQueueName() {
    94. return DelayQueueEnums.TEST_1_DELAYQUEUE.name();
    95. }
    96. /**
    97. * @Description: ApplicationRunner 的重写方法
    98. * @param: [args]
    99. * @return: void
    100. **/
    101. @Override
    102. public void run(ApplicationArguments args) throws Exception {
    103. this.init();
    104. }
    105. }
    1. //2.延时任务事件消息触发消费者;
    2. //详见:cn.lili.trigger.listen.trigger.TimeTriggerConsumer
    3. /**
    4. * @author QingChen
    5. * @Description 延时任务事件消息触发消费者
    6. * 从rocketmq拦截的topic类型来讲,这个是只针对指定主题promotion-topic的消息监听器。
    7. * 但是由于延时任务消息的执行器只跟 TimeTriggerMsg 的属性有关,所以这个消费类可以作为所有延时任务消息的监听器。
    8. * (由于这个监听的 topic 指定了促销类型,所以我不确定系统开发者的逻辑,而且这个类的名字看起来也是总得消费类,所以比较迷惑)
    9. * (如果是我的话,会将这个作为所有延时任务的消费中心)
    10. *
    11. * @date 2022-12-05 13:32
    12. * @Version 1.0
    13. */
    14. @Component
    15. @Slf4j
    16. @RocketMQMessageListener(topic = "${lili.data.rocketmq.test1Topic}", consumerGroup = "${lili.data.rocketmq.test1Group}")
    17. public class TimeTriggerConsumer implements RocketMQListener {
    18. @Autowired
    19. private Cache cache;
    20. @Override
    21. public void onMessage(TimeTriggerMsg timeTriggerMsg) {
    22. try {
    23. //生成执行任务key
    24. String key = DelayQueueTools.generateKey(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getTriggerTime(), timeTriggerMsg.getUniqueKey());
    25. if (cache.get(key) == null) {
    26. log.info("执行器执行被取消:{} | 任务标识:{}", timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getUniqueKey());
    27. return;
    28. }
    29. log.info("执行器执行:" + timeTriggerMsg.getTriggerExecutor());
    30. log.info("执行器参数:" + JSONUtil.toJsonStr(timeTriggerMsg.getParam()));
    31. cache.remove(key);
    32. //拿到任务所指定执行器的bean进行执行
    33. TimeTriggerExecutor executor = (TimeTriggerExecutor) SpringContextUtil.getBean(timeTriggerMsg.getTriggerExecutor());
    34. executor.execute(timeTriggerMsg.getParam());
    35. } catch (Exception e) {
    36. log.error("mq延时任务异常", e);
    37. }
    38. }
    39. }
    1. //3.延时任务执行器接口,延时任务执行器实现类;
    2. //接口类,详见:cn.lili.trigger.executor.TimeTriggerExecutor
    3. //实现类,详见:Test1TimeTriggerExecutor
    4. /**
    5. * @author QingChen
    6. * @Description 延时任务执行器接口
    7. * @date 2022-12-05 13:37
    8. * @Version 1.0
    9. */
    10. public interface TimeTriggerExecutor {
    11. /**
    12. * 执行任务
    13. *
    14. * @param object 任务参数
    15. */
    16. void execute(Object object);
    17. }
    18. /**
    19. * @author QingChen
    20. * @Description 延时任务执行器实现类
    21. * @date 2022-12-05 13:38
    22. * @Version 1.0
    23. */
    24. @Slf4j
    25. @Component(TimeExecuteConstant.TEST_1_EXECUTOR)
    26. public class Test1TimeTriggerExecutor implements TimeTriggerExecutor {
    27. @Override
    28. public void execute(Object object) {
    29. Test1Message message = JSONUtil.toBean(JSONUtil.parseObj(object), Test1Message.class);
    30. log.info("执行器:{"+TimeExecuteConstant.TEST_1_EXECUTOR+"}");
    31. log.info("执行内容:{"+message+"}");
    32. }
    33. }
    1. //4.会使用到枚举类、常量接口、util工具类;
    2. cn.lili.util.SpringContextUtil
    3. cn.lili.util.ThreadPoolUtil

    注意,这个模块只是我用来测试的,里面没有任何和shop系统业务相关的,最终的目录是这样的:

    B2.测试

    C1.测试产生延时任务

    我们直接就在framework-test模块添加controller接口,来创建任务,一个是创建任务接口,通过入参的延时任务类型,来判断具体调用哪个任务调度器添加任务。

    生产代码的时候,记得要清楚的添加各个类型哦~

    1. //我这里就添加了两个接口,一个添加任务,一个删除任务
    2. 详见:cn.lili.controller.TestTriggerController
    3. @RestController
    4. @RequestMapping("/trigger")
    5. public class TestTriggerController {
    6. /**
    7. * 延时任务1
    8. */
    9. @Autowired
    10. private Test1RocketmqTimerTrigger test1RocketmqTimerTrigger;
    11. /**
    12. * 延时任务2
    13. */
    14. @Autowired
    15. private Test2RocketmqTimerTrigger test2RocketmqTimerTrigger;
    16. /**
    17. * rocketMq配置
    18. */
    19. @Autowired
    20. private RocketmqCustomProperties rocketmqCustomProperties;
    21. /**
    22. * @Description: type 是指延时任务类型,正式开发时是不会这样用的哦,我仅仅是测试
    23. * @param: [type]
    24. * @return: java.lang.String
    25. **/
    26. @PostMapping(value = "/trigger")
    27. public String addTrigger(String type) {
    28. //获取时间偏移(向前或向后)
    29. long startTime = DateUtil.offsetMinute(new Date(), 2).getTime();
    30. switch (type){
    31. case "1":
    32. Test1Message test1Message = new Test1Message();
    33. test1Message.setId(StringUtils.getRandStr(10));
    34. test1Message.setName("任务name");
    35. test1Message.setStartTime(new Date());
    36. test1Message.setEndTime(new Date());
    37. TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.TEST_1_EXECUTOR,
    38. startTime,
    39. test1Message,
    40. DelayQueueTools.wrapperUniqueKey(DelayTypeEnums.TEST_1_DELAYTYPE.name(), (test1Message.getId())),
    41. rocketmqCustomProperties.getTest1Topic());
    42. test1RocketmqTimerTrigger.addDelay(timeTriggerMsg);
    43. break;
    44. case "2":
    45. Test2Message test2Message = new Test2Message();
    46. test2Message.setNum(StringUtils.getRandStr(10));
    47. test2Message.setSize("大小size");
    48. test2Message.setStartTime(new Date());
    49. timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.TEST_2_EXECUTOR,
    50. startTime,
    51. test2Message,
    52. DelayQueueTools.wrapperUniqueKey(DelayTypeEnums.TEST_2_DELAYTYPE.name(), (test2Message.getNum())),
    53. rocketmqCustomProperties.getTest1Topic());
    54. test2RocketmqTimerTrigger.addDelay(timeTriggerMsg);
    55. break;
    56. default:
    57. break;
    58. }
    59. return "SUCCESSFUL";
    60. }
    61. /**
    62. * @Description: TODO
    63. * @param: executorName 执行器名字, triggerTime 执行时间, idORnum 唯一标识(我将test1 2写一起的), topic 任务主题
    64. * @return: java.lang.String
    65. **/
    66. @DeleteMapping(value = "/trigger")
    67. public String deleteTrigger(String executorName, Long triggerTime, String idORnum, String topic) {
    68. switch (executorName){
    69. case TimeExecuteConstant.TEST_1_EXECUTOR:
    70. test1RocketmqTimerTrigger.delete(executorName,
    71. triggerTime,
    72. DelayQueueTools.wrapperUniqueKey(DelayTypeEnums.TEST_1_DELAYTYPE.name(), idORnum),
    73. rocketmqCustomProperties.getTest1Topic());
    74. break;
    75. case TimeExecuteConstant.TEST_2_EXECUTOR:
    76. test2RocketmqTimerTrigger.delete(executorName,
    77. triggerTime,
    78. DelayQueueTools.wrapperUniqueKey(DelayTypeEnums.TEST_2_DELAYTYPE.name(), idORnum),
    79. rocketmqCustomProperties.getTest1Topic());
    80. break;
    81. default:
    82. break;
    83. }
    84. return "SUCCESSFUL";
    85. }
    86. }

     

    C2.测试执行延时任务

    这个直接运行consumer,搭配上面的C1就是测试了呀,嘿嘿,可以看到就是在执行时间的范围内执行的

     

    剩余内容:暂时没有了

    后面就开始详细设计了,再详细设计中有可能会在涉及到系统架构改动,到时会再度说明。

  • 相关阅读:
    OpenAI 开发者大会 Sam Altman 45分演讲带来哪些干货和狠货
    基于STM32+腾讯云IO+微信小程序设计的混凝土智能养护系统
    SpringBoot整合XXL-JOB详解
    [SCTF 2021]rceme
    【Cherno的OpenGL视频】How to make your uniform faster in OpenGL
    C++算法:给表达式添加运算符
    什么是PolarDB
    RK3568驱动指南|第六篇-平台总线-第55章 初识设备树
    OSPF虚拟链路以及选路
    day13--JDK的安装和环境变量的配置
  • 原文地址:https://blog.csdn.net/vaevaevae233/article/details/128136037